goto out;
 }
 
+/* Reply handling runs in the poll worker thread. Anything that
+ * might wait is deferred to a separate workqueue.
+ */
+void rpcrdma_deferred_completion(struct work_struct *work)
+{
+       struct rpcrdma_rep *rep =
+                       container_of(work, struct rpcrdma_rep, rr_work);
+       struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
+       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+
+       /* Invalidate and unmap the data payloads before waking
+        * the waiting application. This guarantees the memory
+        * regions are properly fenced from the server before the
+        * application accesses the data. It also ensures proper
+        * send flow control: waking the next RPC waits until this
+        * RPC has relinquished all its Send Queue entries.
+        */
+       rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
+       r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &req->rl_registered);
+
+       rpcrdma_complete_rqst(rep);
+}
+
 /* Process received RPC/RDMA messages.
  *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
  */
-void
-rpcrdma_reply_handler(struct work_struct *work)
+void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 {
-       struct rpcrdma_rep *rep =
-                       container_of(work, struct rpcrdma_rep, rr_work);
        struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_req *req;
        dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
                __func__, rep, req, be32_to_cpu(rep->rr_xid));
 
-       /* Invalidate and unmap the data payloads before waking the
-        * waiting application. This guarantees the memory regions
-        * are properly fenced from the server before the application
-        * accesses the data. It also ensures proper send flow control:
-        * waking the next RPC waits until this RPC has relinquished
-        * all its Send Queue entries.
-        */
-       if (!list_empty(&req->rl_registered)) {
-               rpcrdma_mark_remote_invalidation(&req->rl_registered, rep);
-               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
-                                                   &req->rl_registered);
-       }
-
-       rpcrdma_complete_rqst(rep);
+       if (list_empty(&req->rl_registered))
+               rpcrdma_complete_rqst(rep);
+       else
+               queue_work(rpcrdma_receive_wq, &rep->rr_work);
        return;
 
 out_badstatus:
 
 static void rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf);
 static void rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb);
 
-static struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
+struct workqueue_struct *rpcrdma_receive_wq __read_mostly;
 
 int
 rpcrdma_alloc_wq(void)
                rpcrdma_update_granted_credits(rep);
 
 out_schedule:
-       queue_work(rpcrdma_receive_wq, &rep->rr_work);
+       rpcrdma_reply_handler(rep);
        return;
 
 out_fail:
 
        recvcq = ib_alloc_cq(ia->ri_device, NULL,
                             ep->rep_attr.cap.max_recv_wr + 1,
-                            0, IB_POLL_SOFTIRQ);
+                            0, IB_POLL_WORKQUEUE);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
 
        rep->rr_cqe.done = rpcrdma_wc_receive;
        rep->rr_rxprt = r_xprt;
-       INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
+       INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion);
        rep->rr_recv_wr.next = NULL;
        rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
        rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 
 bool frwr_is_supported(struct rpcrdma_ia *);
 bool fmr_is_supported(struct rpcrdma_ia *);
 
+extern struct workqueue_struct *rpcrdma_receive_wq;
+
 /*
  * Endpoint calls - xprtrdma/verbs.c
  */
 int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
-void rpcrdma_reply_handler(struct work_struct *work);
+void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
+void rpcrdma_deferred_completion(struct work_struct *work);
 
 static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
 {