]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
xprtrdma: Wait before destroying transport's queue pair
authorChuck Lever <chuck.lever@oracle.com>
Tue, 21 Apr 2015 22:27:39 +0000 (18:27 -0400)
committerChuck Lever <chuck.lever@oracle.com>
Mon, 31 Aug 2015 20:46:00 +0000 (14:46 -0600)
[ Proposed for upstream v4.4 ]

Ensure that resources needed to handle completions remain in place
until all pending completions have been processed.

This solution is based on commit 7dad6b2e440d ("IB/srp: Fix a race
condition triggered by destroying a queue pair").

Note that when shutting down a transport, destroying the receive
buffers must be done _after_ the receive buffers are flushed.

Also note that during a transport reconnect, any waiting RPC can
exit (eg soft timeout). With FRWR, that can result in a posted send
WR after the sentinel. That will be addressed eventually. Right now
there are no dependencies on this (mis)behavior.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
net/sunrpc/xprtrdma/transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index ffa19dfc18c4e72de36173c32e318efc1247732e..a67cecd04bc1635665c0a162dbb36e5334347c2c 100644 (file)
@@ -270,9 +270,9 @@ xprt_rdma_destroy(struct rpc_xprt *xprt)
 
        xprt_clear_connected(xprt);
 
-       rpcrdma_buffer_destroy(&r_xprt->rx_buf);
        rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
-       rpcrdma_ia_close(&r_xprt->rx_ia);
+       rpcrdma_buffer_destroy(&r_xprt->rx_buf);
+       rpcrdma_ia_close(&r_xprt->rx_ep, &r_xprt->rx_ia);
 
        xprt_rdma_free_addresses(xprt);
 
@@ -422,7 +422,7 @@ out4:
 out3:
        rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
 out2:
-       rpcrdma_ia_close(&new_xprt->rx_ia);
+       rpcrdma_ia_close(new_ep, &new_xprt->rx_ia);
 out1:
        xprt_free(xprt);
        return ERR_PTR(rc);
index e6eccd4f5fb3096834658c44173f132848376b7e..7f1586aaa98054e5391d41cab16f4ff094560f0a 100644 (file)
@@ -197,9 +197,12 @@ static const char * const wc_status[] = {
                wc_status[(status)] : "unexpected completion error")
 
 static void
-rpcrdma_sendcq_process_wc(struct ib_wc *wc)
+rpcrdma_sendcq_process_wc(struct rpcrdma_ep *ep, struct ib_wc *wc)
 {
        /* WARNING: Only wr_id and status are reliable at this point */
+       if (wc->wr_id == RPCRDMA_LAST_COMPLETION)
+               goto out_last;
+
        if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
                if (wc->status != IB_WC_SUCCESS &&
                    wc->status != IB_WC_WR_FLUSH_ERR)
@@ -211,6 +214,13 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
                r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
                r->mw_sendcompletion(wc);
        }
+       return;
+
+out_last:
+       dprintk("RPC:       %s: completion sentinel for ep=%p\n",
+               __func__, ep);
+       if (atomic_dec_return(&ep->rep_count))
+               complete(&ep->rep_done);
 }
 
 /* The wc array is on stack: automatic memory is always CPU-local.
@@ -235,7 +245,7 @@ rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
 
                count = rc;
                while (count-- > 0)
-                       rpcrdma_sendcq_process_wc(pos++);
+                       rpcrdma_sendcq_process_wc(ep, pos++);
        } while (rc == ARRAY_SIZE(wcs));
        return;
 
@@ -267,12 +277,15 @@ rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 }
 
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_recvcq_process_wc(struct rpcrdma_ep *ep, struct ib_wc *wc,
+                         struct list_head *sched_list)
 {
        struct rpcrdma_rep *rep =
                        (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
 
        /* WARNING: Only wr_id and status are reliable at this point */
+       if (wc->wr_id == RPCRDMA_LAST_COMPLETION)
+               goto out_last;
        if (wc->status != IB_WC_SUCCESS)
                goto out_fail;
 
@@ -298,6 +311,12 @@ out_fail:
                       __func__, rep, COMPLETION_MSG(wc->status));
        rep->rr_len = ~0U;
        goto out_schedule;
+
+out_last:
+       dprintk("RPC:       %s: completion sentinel for ep=%p\n",
+               __func__, ep);
+       if (atomic_dec_return(&ep->rep_count))
+               complete(&ep->rep_done);
 }
 
 /* The wc array is on stack: automatic memory is always CPU-local.
@@ -322,7 +341,7 @@ rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
 
                count = rc;
                while (count-- > 0)
-                       rpcrdma_recvcq_process_wc(pos++, &sched_list);
+                       rpcrdma_recvcq_process_wc(ep, pos++, &sched_list);
        } while (rc == ARRAY_SIZE(wcs));
 
 out_schedule:
@@ -364,11 +383,11 @@ rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
        LIST_HEAD(sched_list);
 
        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-               rpcrdma_recvcq_process_wc(&wc, &sched_list);
+               rpcrdma_recvcq_process_wc(ep, &wc, &sched_list);
        if (!list_empty(&sched_list))
                rpcrdma_schedule_tasklet(&sched_list);
        while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
-               rpcrdma_sendcq_process_wc(&wc);
+               rpcrdma_sendcq_process_wc(ep, &wc);
 }
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -663,18 +682,66 @@ out1:
        return rc;
 }
 
+/* Post sentinel Work Requests
+ *
+ * Caller has already invoked rdma_disconnect(), thus the QP
+ * has already transitioned to the error state. All pending WRs,
+ * including posted receive buffers, send WRs, and the sentinels,
+ * are flushed in order.
+ *
+ * Once the sentinel requests come through the upcall handlers,
+ * all posted WRs are guaranteed to have completed, and it is
+ * safe to tear down the QP.
+ */
+static void
+rpcrdma_destroy_qp(struct rpcrdma_ep *ep, struct rdma_cm_id *id)
+{
+       static struct ib_send_wr send_wr = {
+               .wr_id          = RPCRDMA_LAST_COMPLETION
+       };
+       struct ib_send_wr *send_wr_fail;
+       static struct ib_recv_wr recv_wr = {
+               .wr_id          = RPCRDMA_LAST_COMPLETION
+       };
+       struct ib_recv_wr *recv_wr_fail;
+       int rc;
+
+       atomic_set(&ep->rep_count, 2);
+       init_completion(&ep->rep_done);
+
+       rc = ib_post_send(id->qp, &send_wr, &send_wr_fail);
+       if (rc) {
+               pr_err("RPC:       %s: ib_post_send returned %i\n",
+                      __func__, rc);
+               goto out;
+       }
+
+       rc = ib_post_recv(id->qp, &recv_wr, &recv_wr_fail);
+       if (rc) {
+               pr_err("RPC:       %s: ib_post_recv returned %i\n",
+                      __func__, rc);
+               goto out;
+       }
+
+       wait_for_completion(&ep->rep_done);
+
+out:
+       dprintk("RPC:       %s: destroying QP for ep=%p\n",
+               __func__, ep);
+       rdma_destroy_qp(id);
+       id->qp = NULL;
+}
+
 /*
  * Clean up/close an IA.
  *   o if event handles and PD have been initialized, free them.
  *   o close the IA
  */
 void
-rpcrdma_ia_close(struct rpcrdma_ia *ia)
+rpcrdma_ia_close(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
        dprintk("RPC:       %s: entering\n", __func__);
        if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
-               if (ia->ri_id->qp)
-                       rdma_destroy_qp(ia->ri_id);
                rpcrdma_destroy_id(ia->ri_id);
                ia->ri_id = NULL;
        }
@@ -693,6 +760,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 {
        struct ib_device_attr *devattr = &ia->ri_devattr;
        struct ib_cq *sendcq, *recvcq;
+       unsigned int max_qp_wr;
        int rc, err;
 
        if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -701,6 +769,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
                return -ENOMEM;
        }
 
+       max_qp_wr = devattr->max_qp_wr - 1;
        /* check provider's send/recv wr limits */
        if (cdata->max_requests > devattr->max_qp_wr)
                cdata->max_requests = devattr->max_qp_wr;
@@ -708,11 +777,11 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
        ep->rep_attr.qp_context = ep;
        ep->rep_attr.srq = NULL;
-       ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+       ep->rep_attr.cap.max_send_wr = cdata->max_requests + 1;
        rc = ia->ri_ops->ro_open(ia, ep, cdata);
        if (rc)
                return rc;
-       ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+       ep->rep_attr.cap.max_recv_wr = cdata->max_requests + 1;
        ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
        ep->rep_attr.cap.max_recv_sge = 1;
        ep->rep_attr.cap.max_inline_data = 0;
@@ -826,7 +895,7 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 
        if (ia->ri_id->qp) {
                rpcrdma_ep_disconnect(ep, ia);
-               rdma_destroy_qp(ia->ri_id);
+               rpcrdma_destroy_qp(ep, ia->ri_id);
                ia->ri_id->qp = NULL;
        }
 
@@ -903,7 +972,7 @@ retry:
                ia->ri_id = id;
                write_unlock(&ia->ri_qplock);
 
-               rdma_destroy_qp(old);
+               rpcrdma_destroy_qp(ep, old);
                rpcrdma_destroy_id(old);
        } else {
                dprintk("RPC:       %s: connecting...\n", __func__);
index c63218252bdb3146b741fe3b58d894414bd34053..70aebcbc2c565dc0ef8b5ae27a2ced054f15ded7 100644 (file)
@@ -86,6 +86,8 @@ struct rpcrdma_ep {
        wait_queue_head_t       rep_connect_wait;
        struct rdma_conn_param  rep_remote_cma;
        struct sockaddr_storage rep_remote_addr;
+       atomic_t                rep_count;
+       struct completion       rep_done;
        struct delayed_work     rep_connect_worker;
 };
 
@@ -102,6 +104,10 @@ struct rpcrdma_ep {
  */
 #define RPCRDMA_IGNORE_COMPLETION      (0ULL)
 
+/* Receive completion sentinel
+ */
+#define RPCRDMA_LAST_COMPLETION                (1ULL)
+
 /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
  *
  * The below structure appears at the front of a large region of kmalloc'd
@@ -391,7 +397,7 @@ extern int xprt_rdma_pad_optimize;
  * Interface Adapter calls - xprtrdma/verbs.c
  */
 int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int);
-void rpcrdma_ia_close(struct rpcrdma_ia *);
+void rpcrdma_ia_close(struct rpcrdma_ep *, struct rpcrdma_ia *);
 
 /*
  * Endpoint calls - xprtrdma/verbs.c