rds_ib_sysctl_exit();
rds_ib_recv_exit();
rds_trans_unregister(&rds_ib_transport);
+ rds_ib_fmr_exit();
}
struct rds_transport rds_ib_transport = {
INIT_LIST_HEAD(&rds_ib_devices);
- ret = ib_register_client(&rds_ib_client);
+ ret = rds_ib_fmr_init();
if (ret)
goto out;
+ ret = ib_register_client(&rds_ib_client);
+ if (ret)
+ goto out_fmr_exit;
+
ret = rds_ib_sysctl_init();
if (ret)
goto out_ibreg;
- rds_ib_recv_init();
+ ret = rds_ib_recv_init();
+ if (ret)
+ goto out_sysctl;
ret = rds_trans_register(&rds_ib_transport);
if (ret)
out_recv:
rds_ib_recv_exit();
+out_sysctl:
rds_ib_sysctl_exit();
out_ibreg:
rds_ib_unregister_client();
+out_fmr_exit:
+ rds_ib_fmr_exit();
out:
return ret;
}
void rds_ib_sync_mr(void *trans_private, int dir);
void rds_ib_free_mr(void *trans_private, int invalidate);
void rds_ib_flush_mrs(void);
+int __init rds_ib_fmr_init(void);
+void __exit rds_ib_fmr_exit(void);
/* ib_recv.c */
-void __init rds_ib_recv_init(void);
+int __init rds_ib_recv_init(void);
void rds_ib_recv_exit(void);
int rds_ib_recv(struct rds_connection *conn);
int rds_ib_recv_alloc_caches(struct rds_ib_connection *ic);
return ret;
}
+/* Dedicated workqueue for flushing FMR pools (see rds_ib_fmr_init()). */
+struct workqueue_struct *rds_ib_fmr_wq;
+
+/*
+ * Create the workqueue used to flush FMR pools.  Giving pool flushing its
+ * own queue keeps it from competing with (or deadlocking against) other
+ * work items on the generic rds_wq.  Returns 0 on success or -ENOMEM if
+ * the workqueue cannot be created.
+ */
+int __init rds_ib_fmr_init(void)
+{
+ rds_ib_fmr_wq = create_workqueue("rds_fmr_flushd");
+ if (!rds_ib_fmr_wq)
+ return -ENOMEM;
+ return 0;
+}
+
+/*
+ * By the time this is called all the IB devices should have been torn down and
+ * had their pools freed. As each pool is freed its work struct is waited on,
+ * so the pool flushing work queue should be idle by the time we get here.
+ */
+void __exit rds_ib_fmr_exit(void)
+{
+ destroy_workqueue(rds_ib_fmr_wq);
+}
+
static void rds_ib_mr_pool_flush_worker(struct work_struct *work)
{
struct rds_ib_mr_pool *pool = container_of(work, struct rds_ib_mr_pool, flush_worker.work);
/* If we've pinned too many pages, request a flush */
if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned
|| atomic_read(&pool->dirty_count) >= pool->max_items / 10)
- queue_delayed_work(rds_wq, &pool->flush_worker, 10);
+ queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
if (invalidate) {
if (likely(!in_interrupt())) {
} else {
/* We get here if the user created a MR marked
* as use_once and invalidate at the same time. */
- queue_delayed_work(rds_wq, &pool->flush_worker, 10);
+ queue_delayed_work(rds_ib_fmr_wq,
+ &pool->flush_worker, 10);
}
}
ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
- /*
- * Also process recvs in connecting state because it is possible
- * to get a recv completion _before_ the rdmacm ESTABLISHED
- * event is processed.
- */
- if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
+ if (wc->status == IB_WC_SUCCESS) {
+ rds_ib_process_recv(conn, recv, wc->byte_len, state);
+ } else {
/* We expect errors as the qp is drained during shutdown */
- if (wc->status == IB_WC_SUCCESS) {
- rds_ib_process_recv(conn, recv, wc->byte_len, state);
- } else {
+ if (rds_conn_up(conn) || rds_conn_connecting(conn))
rds_ib_conn_error(conn, "recv completion on "
"%pI4 had status %u, disconnecting and "
"reconnecting\n", &conn->c_faddr,
wc->status);
- }
}
+ /*
+ * It's very important that we only free this ring entry if we've truly
+ * freed the resources allocated to the entry. The refilling path can
+ * leak if we don't.
+ */
rds_ib_ring_free(&ic->i_recv_ring, 1);
/* If we ever end up with a really empty receive ring, we're
return ret;
}
-void __init rds_ib_recv_init(void)
+/*
+ * Allocate the receive-path slab caches.  Allocation failure is now
+ * reported to the caller as -ENOMEM instead of panicking (SLAB_PANIC
+ * dropped), so the module init path can unwind cleanly on error.
+ */
+int __init rds_ib_recv_init(void)
{
struct sysinfo si;
rds_ib_incoming_slab = kmem_cache_create("rds_ib_incoming",
sizeof(struct rds_ib_incoming),
- SLAB_HWCACHE_ALIGN|SLAB_PANIC, 0, NULL);
+ SLAB_HWCACHE_ALIGN, 0, NULL);
+ if (!rds_ib_incoming_slab)
+ return -ENOMEM;
+
rds_ib_frag_slab = kmem_cache_create("rds_ib_frag",
sizeof(struct rds_page_frag),
- SLAB_HWCACHE_ALIGN|SLAB_PANIC, 0, NULL);
+ SLAB_HWCACHE_ALIGN, 0, NULL);
+ if (!rds_ib_frag_slab) {
+ /* Unwind the first cache so a later retry can recreate it. */
+ kmem_cache_destroy(rds_ib_incoming_slab);
+ rds_ib_incoming_slab = NULL;
+ return -ENOMEM;
+ }
+ return 0;
}
void rds_ib_recv_exit(void)
int __init rds_threads_init(void)
{
- rds_wq = create_workqueue("krdsd");
+ rds_wq = create_singlethread_workqueue("krdsd");
if (!rds_wq)
return -ENOMEM;