        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
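+       /* Close can block; catch callers in atomic context */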
+       might_sleep();
+
        dprintk("RPC:       %s: closing xprt %p\n", __func__, xprt);
 
+       /* Prevent marshaling and sending of new requests */
+       xprt_clear_connected(xprt);
+
        if (test_and_clear_bit(RPCRDMA_IAF_REMOVING, &ia->ri_flags)) {
-               xprt_clear_connected(xprt);
                rpcrdma_ia_remove(ia);
                return;
        }
                dprintk("RPC:       %s: xprt_unregister returned %i\n",
                        __func__, rc);
 
-       rpcrdma_destroy_wq();
-
        rc = xprt_unregister_transport(&xprt_rdma_bc);
        if (rc)
                dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
 {
        int rc;
 
-       rc = rpcrdma_alloc_wq();
-       if (rc)
-               return rc;
-
        rc = xprt_register_transport(&xprt_rdma);
-       if (rc) {
-               rpcrdma_destroy_wq();
+       if (rc)
                return rc;
-       }
 
        rc = xprt_register_transport(&xprt_rdma_bc);
        if (rc) {
                xprt_unregister_transport(&xprt_rdma);
-               rpcrdma_destroy_wq();
                return rc;
        }
 
 
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
 
-struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
-
-int
-rpcrdma_alloc_wq(void)
+/* Wait for outstanding transport work to finish.
+ */
+static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
 {
-       struct workqueue_struct *recv_wq;
-
-       recv_wq = alloc_workqueue("xprtrdma_receive",
-                                 WQ_MEM_RECLAIM | WQ_HIGHPRI,
-                                 0);
-       if (!recv_wq)
-               return -ENOMEM;
-
-       rpcrdma_receive_wq = recv_wq;
-       return 0;
-}
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-void
-rpcrdma_destroy_wq(void)
-{
-       struct workqueue_struct *wq;
+       /* Flush Receives, then wait for deferred Reply work
+        * to complete.
+        */
+       ib_drain_qp(ia->ri_id->qp);
+       drain_workqueue(buf->rb_completion_wq);
 
-       if (rpcrdma_receive_wq) {
-               wq = rpcrdma_receive_wq;
-               rpcrdma_receive_wq = NULL;
-               destroy_workqueue(wq);
-       }
+       /* Deferred Reply processing might have scheduled
+        * local invalidations, so drain the send queue again
+        * after the workqueue is empty.
+        */
+       ib_drain_sq(ia->ri_id->qp);
 }
 
 /**
         *   connection is already gone.
         */
        if (ia->ri_id->qp) {
-               ib_drain_qp(ia->ri_id->qp);
+               rpcrdma_xprt_drain(r_xprt);
                rdma_destroy_qp(ia->ri_id);
                ia->ri_id->qp = NULL;
        }
        return rc;
 }
 
-/*
- * rpcrdma_ep_disconnect
+/**
+ * rpcrdma_ep_disconnect - Disconnect underlying transport
+ * @ep: endpoint to disconnect
+ * @ia: associated interface adapter
  *
  * This is separate from destroy to facilitate the ability
  * to reconnect without recreating the endpoint.
  */
 void
 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 {
+       struct rpcrdma_xprt *r_xprt = container_of(ep, struct rpcrdma_xprt,
+                                                  rx_ep);
        int rc;
 
+       /* returns without wait if ID is not connected */
        rc = rdma_disconnect(ia->ri_id);
        if (!rc)
-               /* returns without wait if not connected */
                wait_event_interruptible(ep->rep_connect_wait,
                                                        ep->rep_connected != 1);
        else
                ep->rep_connected = rc;
-       trace_xprtrdma_disconnect(container_of(ep, struct rpcrdma_xprt,
-                                              rx_ep), rc);
+       trace_xprtrdma_disconnect(r_xprt, rc);
 
-       ib_drain_qp(ia->ri_id->qp);
+       rpcrdma_xprt_drain(r_xprt);
 }
 
 /* Fixed-size circular FIFO queue. This implementation is wait-free and
        if (rc)
                goto out;
 
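+       /* Per-transport workqueue for deferred Reply processing.
+        * The queue name carries the server's address string so
+        * each transport's workqueue is identifiable.
+        */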
+       buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s",
+                                               WQ_MEM_RECLAIM | WQ_HIGHPRI,
+                                               0,
+                       r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]);
+       if (!buf->rb_completion_wq) {
+               rc = -ENOMEM;
+               goto out;
+       }
+
        return 0;
 out:
        rpcrdma_buffer_destroy(buf);
 {
        cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
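+       /* destroy_workqueue() drains any remaining deferred
+        * Reply work before releasing the workqueue.
+        */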
+       if (buf->rb_completion_wq) {
+               destroy_workqueue(buf->rb_completion_wq);
+               buf->rb_completion_wq = NULL;
+       }
+
        rpcrdma_sendctxs_destroy(buf);
 
        while (!list_empty(&buf->rb_recv_bufs)) {
 
 
        u32                     rb_bc_max_requests;
 
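+       /* deferred Reply processing runs on this per-transport queue */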
+       struct workqueue_struct *rb_completion_wq;
        struct delayed_work     rb_refresh_worker;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 bool frwr_is_supported(struct rpcrdma_ia *);
 bool fmr_is_supported(struct rpcrdma_ia *);
 
-extern struct workqueue_struct *rpcrdma_receive_wq;
-
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
        return __rpcrdma_dma_map_regbuf(ia, rb);
 }
 
-int rpcrdma_alloc_wq(void);
-void rpcrdma_destroy_wq(void);
-
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */