From f941adcd9ce5157b89557ae08e3225c2ad73eb8f Mon Sep 17 00:00:00 2001
From: Chuck Lever
Date: Tue, 21 Apr 2015 18:27:39 -0400
Subject: [PATCH] xprtrdma: Wait before destroying transport's queue pair

[ Proposed for upstream v4.4 ]

Ensure that the resources needed to handle completions remain in
place until all pending completions have been processed. This
solution is based on commit 7dad6b2e440d ("IB/srp: Fix a race
condition triggered by destroying a queue pair").

Note that when shutting down a transport, the receive buffers must
be destroyed only _after_ they have been flushed.

Also note that during a transport reconnect, any waiting RPC can
exit (e.g., on a soft timeout). With FRWR, that can result in a
send WR posted after the sentinel. That will be addressed
eventually; right now nothing depends on this (mis)behavior.

Signed-off-by: Chuck Lever
---
 net/sunrpc/xprtrdma/transport.c |  6 +--
 net/sunrpc/xprtrdma/verbs.c     | 99 +++++++++++++++++++++++++++------
 net/sunrpc/xprtrdma/xprt_rdma.h |  8 ++-
 3 files changed, 94 insertions(+), 19 deletions(-)

diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index ffa19dfc18c4..a67cecd04bc1 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -270,9 +270,9 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
 
 	xprt_clear_connected(xprt);
 
-	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
 	rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
-	rpcrdma_ia_close(&r_xprt->rx_ia);
+	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
+	rpcrdma_ia_close(&r_xprt->rx_ep, &r_xprt->rx_ia);
 
 	xprt_rdma_free_addresses(xprt);
 
@@ -422,7 +422,7 @@ out4:
 out3:
 	rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
 out2:
-	rpcrdma_ia_close(&new_xprt->rx_ia);
+	rpcrdma_ia_close(new_ep, &new_xprt->rx_ia);
 out1:
 	xprt_free(xprt);
 	return ERR_PTR(rc);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e6eccd4f5fb3..7f1586aaa980 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -197,9 +197,12 @@ static const char * const wc_status[] = {
 	 wc_status[(status)] : "unexpected completion error")
 
 static void
-rpcrdma_sendcq_process_wc(struct ib_wc *wc)
+rpcrdma_sendcq_process_wc(struct rpcrdma_ep *ep, struct ib_wc *wc)
 {
 	/* WARNING: Only wr_id and status are reliable at this point */
+	if (wc->wr_id == RPCRDMA_LAST_COMPLETION)
+		goto out_last;
+
 	if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
 		if (wc->status != IB_WC_SUCCESS &&
 		    wc->status != IB_WC_WR_FLUSH_ERR)
@@ -211,6 +214,13 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
 		r->mw_sendcompletion(wc);
 	}
+	return;
+
+out_last:
+	dprintk("RPC: %s: completion sentinel for ep=%p\n",
+		__func__, ep);
+	if (atomic_dec_and_test(&ep->rep_count))
+		complete(&ep->rep_done);
 }
 
 /* The wc array is on stack: automatic memory is always CPU-local.
@@ -235,7 +245,7 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
 
 		count = rc;
 		while (count-- > 0)
-			rpcrdma_sendcq_process_wc(pos++);
+			rpcrdma_sendcq_process_wc(ep, pos++);
 	} while (rc == ARRAY_SIZE(wcs));
 	return;
 
@@ -267,12 +277,15 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 }
 
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_recvcq_process_wc(struct rpcrdma_ep *ep, struct ib_wc *wc,
+			  struct list_head *sched_list)
 {
 	struct rpcrdma_rep *rep =
 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
 
 	/* WARNING: Only wr_id and status are reliable at this point */
+	if (wc->wr_id == RPCRDMA_LAST_COMPLETION)
+		goto out_last;
 	if (wc->status != IB_WC_SUCCESS)
 		goto out_fail;
 
@@ -298,6 +311,12 @@ out_fail:
 		__func__, rep, COMPLETION_MSG(wc->status));
 	rep->rr_len = ~0U;
 	goto out_schedule;
+
+out_last:
+	dprintk("RPC: %s: completion sentinel for ep=%p\n",
+		__func__, ep);
+	if (atomic_dec_and_test(&ep->rep_count))
+		complete(&ep->rep_done);
 }
 
 /* The wc array is on stack: automatic memory is always CPU-local.
@@ -322,7 +341,7 @@ rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
 
 		count = rc;
 		while (count-- > 0)
-			rpcrdma_recvcq_process_wc(pos++, &sched_list);
+			rpcrdma_recvcq_process_wc(ep, pos++, &sched_list);
 	} while (rc == ARRAY_SIZE(wcs));
 
 out_schedule:
@@ -364,11 +383,11 @@ rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 	LIST_HEAD(sched_list);
 
 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-		rpcrdma_recvcq_process_wc(&wc, &sched_list);
+		rpcrdma_recvcq_process_wc(ep, &wc, &sched_list);
 	if (!list_empty(&sched_list))
 		rpcrdma_schedule_tasklet(&sched_list);
 	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
-		rpcrdma_sendcq_process_wc(&wc);
+		rpcrdma_sendcq_process_wc(ep, &wc);
 }
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -663,18 +682,66 @@ out1:
 	return rc;
 }
 
+/* Post sentinel Work Requests.
+ *
+ * The caller has already invoked rdma_disconnect(), so the QP has
+ * already transitioned to the error state. All pending WRs,
+ * including posted receive buffers, send WRs, and the sentinels,
+ * are flushed in order.
+ *
+ * Once the sentinel requests come back through the upcall handlers,
+ * all previously posted WRs are guaranteed to have completed, and
+ * it is safe to tear down the QP.
+ */
+static void
+rpcrdma_destroy_qp(struct rpcrdma_ep *ep, struct rdma_cm_id *id)
+{
+	static struct ib_send_wr send_wr = {
+		.wr_id = RPCRDMA_LAST_COMPLETION
+	};
+	struct ib_send_wr *send_wr_fail;
+	static struct ib_recv_wr recv_wr = {
+		.wr_id = RPCRDMA_LAST_COMPLETION
+	};
+	struct ib_recv_wr *recv_wr_fail;
+	int rc;
+
+	atomic_set(&ep->rep_count, 2);
+	init_completion(&ep->rep_done);
+
+	rc = ib_post_send(id->qp, &send_wr, &send_wr_fail);
+	if (rc) {
+		pr_err("RPC: %s: ib_post_send returned %i\n",
+		       __func__, rc);
+		goto out;
+	}
+
+	rc = ib_post_recv(id->qp, &recv_wr, &recv_wr_fail);
+	if (rc) {
+		pr_err("RPC: %s: ib_post_recv returned %i\n",
+		       __func__, rc);
+		goto out;
+	}
+
+	wait_for_completion(&ep->rep_done);
+
+out:
+	dprintk("RPC: %s: destroying QP for ep=%p\n",
+		__func__, ep);
+	rdma_destroy_qp(id);
+	id->qp = NULL;
+}
+
 /*
  * Clean up/close an IA.
  *   o if event handles and PD have been initialized, free them.
  *   o close the IA
  */
 void
-rpcrdma_ia_close(struct rpcrdma_ia *ia)
+rpcrdma_ia_close(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
 	dprintk("RPC: %s: entering\n", __func__);
 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
-		if (ia->ri_id->qp)
-			rdma_destroy_qp(ia->ri_id);
 		rpcrdma_destroy_id(ia->ri_id);
 		ia->ri_id = NULL;
 	}
@@ -693,6 +760,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 {
 	struct ib_device_attr *devattr = &ia->ri_devattr;
 	struct ib_cq *sendcq, *recvcq;
+	unsigned int max_qp_wr;
 	int rc, err;
 
 	if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -701,6 +769,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 		return -ENOMEM;
 	}
 
+	max_qp_wr = devattr->max_qp_wr - 1;
 	/* check provider's send/recv wr limits */
-	if (cdata->max_requests > devattr->max_qp_wr)
-		cdata->max_requests = devattr->max_qp_wr;
+	if (cdata->max_requests > max_qp_wr)
+		cdata->max_requests = max_qp_wr;
@@ -708,11 +777,11 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 	ep->rep_attr.qp_context = ep;
 	ep->rep_attr.srq = NULL;
-	ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+	ep->rep_attr.cap.max_send_wr = cdata->max_requests + 1;
 	rc = ia->ri_ops->ro_open(ia, ep, cdata);
 	if (rc)
 		return rc;
-	ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+	ep->rep_attr.cap.max_recv_wr = cdata->max_requests + 1;
 	ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
 	ep->rep_attr.cap.max_recv_sge = 1;
 	ep->rep_attr.cap.max_inline_data = 0;
@@ -826,7 +895,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 
 	if (ia->ri_id->qp) {
 		rpcrdma_ep_disconnect(ep, ia);
-		rdma_destroy_qp(ia->ri_id);
+		rpcrdma_destroy_qp(ep, ia->ri_id);
 		ia->ri_id->qp = NULL;
 	}
 
@@ -903,7 +972,7 @@ retry:
 			ia->ri_id = id;
 			write_unlock(&ia->ri_qplock);
 
-			rdma_destroy_qp(old);
+			rpcrdma_destroy_qp(ep, old);
 			rpcrdma_destroy_id(old);
 		} else {
 			dprintk("RPC: %s: connecting...\n", __func__);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c63218252bdb..70aebcbc2c56 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -86,6 +86,8 @@ struct rpcrdma_ep {
 	wait_queue_head_t	rep_connect_wait;
 	struct rdma_conn_param	rep_remote_cma;
 	struct sockaddr_storage	rep_remote_addr;
+	atomic_t		rep_count;
+	struct completion	rep_done;
 	struct delayed_work	rep_connect_worker;
 };
 
@@ -102,6 +104,10 @@ struct rpcrdma_ep {
  */
#define RPCRDMA_IGNORE_COMPLETION	(0ULL)
 
+/* Sentinel marking the last completion on a send or receive queue
+ */
+#define RPCRDMA_LAST_COMPLETION	(1ULL)
+
 /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
  *
  * The below structure appears at the front of a large region of kmalloc'd
@@ -391,7 +397,7 @@ extern int xprt_rdma_pad_optimize;
  * Interface Adapter calls - xprtrdma/verbs.c
  */
 int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int);
-void rpcrdma_ia_close(struct rpcrdma_ia *);
+void rpcrdma_ia_close(struct rpcrdma_ep *, struct rpcrdma_ia *);
 
 /*
  * Endpoint calls - xprtrdma/verbs.c
-- 
2.50.1
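
For reference, here is the drain idiom the patch applies, condensed into a
standalone sketch. This is an illustration only, not part of the patch: the
names qp_drain, drain_sentinel_done, struct drain_ctx, and DRAIN_WRID are
invented for this example, and error handling is abbreviated. As in the
patch, it assumes the QP has already been moved to the error state (for
example by rdma_disconnect()), so the device flushes all outstanding WRs in
order and the sentinels, posted last, complete last.

#include <linux/atomic.h>
#include <linux/completion.h>
#include <rdma/ib_verbs.h>

/* WR ID marking a drain sentinel; must not collide with any wr_id
 * that the normal completion handlers interpret. */
#define DRAIN_WRID		(1ULL)

struct drain_ctx {
	atomic_t		remaining;	/* sentinels still in flight */
	struct completion	done;		/* fires on the last flush */
};

/* Called from the send and receive completion handlers whenever a
 * work completion arrives with wc->wr_id == DRAIN_WRID. */
static void drain_sentinel_done(struct drain_ctx *ctx)
{
	/* Signal the waiter only when the second (last) sentinel
	 * has been flushed. */
	if (atomic_dec_and_test(&ctx->remaining))
		complete(&ctx->done);
}

/* Post one sentinel on each work queue of an error-state QP, then
 * wait until both have been flushed back through the completion
 * handlers. Afterward no earlier WR can still be in flight, so the
 * QP can be torn down safely. */
static int qp_drain(struct ib_qp *qp, struct drain_ctx *ctx)
{
	struct ib_send_wr send_wr = { .wr_id = DRAIN_WRID };
	struct ib_recv_wr recv_wr = { .wr_id = DRAIN_WRID };
	struct ib_send_wr *send_bad;
	struct ib_recv_wr *recv_bad;
	int rc;

	atomic_set(&ctx->remaining, 2);
	init_completion(&ctx->done);

	/* Posting on an error-state QP succeeds; the WRs are
	 * immediately flushed to the completion queues. */
	rc = ib_post_send(qp, &send_wr, &send_bad);
	if (rc)
		return rc;
	rc = ib_post_recv(qp, &recv_wr, &recv_bad);
	if (rc)
		return rc;	/* the send sentinel may still be in flight */

	wait_for_completion(&ctx->done);
	return 0;
}

Note that each work queue needs one slot reserved for its sentinel, which is
why the patch raises max_send_wr and max_recv_wr by one. Later kernels
generalize this pattern into the ib_drain_qp() helper.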