rpclen = 0;
        }
 
+       /* Cache the XID and park this req where the reply handler
+        * can match the incoming reply to it.
+        */
+       req->rl_xid = rqst->rq_xid;
+       rpcrdma_insert_req(&r_xprt->rx_buf, req);
+
        /* This implementation supports the following combinations
         * of chunk lists in one RPC-over-RDMA Call message:
         *
 {
        struct rpcrdma_rep *rep =
                        container_of(work, struct rpcrdma_rep, rr_work);
+       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_msg *headerp;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
-       struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
-       struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        __be32 *iptr;
        int rdmalen, status, rmerr;
        unsigned long cwnd;
        /* Match incoming rpcrdma_rep to an rpcrdma_req to
         * get context for handling any incoming chunks.
         */
-       spin_lock_bh(&xprt->transport_lock);
-       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
-       if (!rqst)
+       spin_lock(&buf->rb_lock);
+       req = rpcrdma_lookup_req_locked(buf, headerp->rm_xid);
+       if (!req)
                goto out_nomatch;
-
-       req = rpcr_to_rdmar(rqst);
        if (req->rl_reply)
                goto out_duplicate;
 
-       /* Sanity checking has passed. We are now committed
-        * to complete this transaction.
-        */
        list_replace_init(&req->rl_registered, &mws);
        rpcrdma_mark_remote_invalidation(&mws, rep);
-       list_del_init(&rqst->rq_list);
+
+       /* Avoid races with signals and duplicate replies
+        * by marking this req as matched.
+        */
        req->rl_reply = rep;
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&buf->rb_lock);
+
        dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
                __func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
-       xprt->reestablish_timeout = 0;
+       /* Invalidate and unmap the data payloads before waking the
+        * waiting application. This guarantees the memory regions
+        * are properly fenced from the server before the application
+        * accesses the data. It also ensures proper send flow control:
+        * waking the next RPC waits until this RPC has relinquished
+        * all its Send Queue entries.
+        */
+       if (!list_empty(&mws))
+               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
 
+       /* Perform XID lookup, reconstruction of the RPC reply, and
+        * RPC completion while holding the transport lock to ensure
+        * the rep, rqst, and rq_task pointers remain stable.
+        */
+       spin_lock_bh(&xprt->transport_lock);
+       rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+       if (!rqst)
+               goto out_norqst;
+       xprt->reestablish_timeout = 0;
        if (headerp->rm_vers != rpcrdma_version)
                goto out_badversion;
 
        }
 
 out:
-       /* Invalidate and flush the data payloads before waking the
-        * waiting application. This guarantees the memory region is
-        * properly fenced from the server before the application
-        * accesses the data. It also ensures proper send flow
-        * control: waking the next RPC waits until this RPC has
-        * relinquished all its Send Queue entries.
-        */
-       if (!list_empty(&mws))
-               r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, &mws);
-
-       spin_lock_bh(&xprt->transport_lock);
        cwnd = xprt->cwnd;
        xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
        if (xprt->cwnd > cwnd)
        xprt_complete_rqst(rqst->rq_task, status);
        spin_unlock_bh(&xprt->transport_lock);
        dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
-                       __func__, xprt, rqst, status);
+               __func__, xprt, rqst, status);
        return;
 
 out_badstatus:
        r_xprt->rx_stats.bad_reply_count++;
        goto out;
 
-/* If no pending RPC transaction was matched, post a replacement
- * receive buffer before returning.
+/* The req was still available, but by the time the transport_lock
+ * was acquired, the rqst and task had been released. Thus the RPC
+ * has already been terminated.
  */
+out_norqst:
+       spin_unlock_bh(&xprt->transport_lock);
+       rpcrdma_buffer_put(req);
+       dprintk("RPC:       %s: race, no rqst left for req %p\n",
+               __func__, req);
+       return;
+
 out_shortreply:
        dprintk("RPC:       %s: short/invalid reply\n", __func__);
        goto repost;
 
 out_nomatch:
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&buf->rb_lock);
        dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
                __func__, be32_to_cpu(headerp->rm_xid),
                rep->rr_len);
        goto repost;
 
 out_duplicate:
-       spin_unlock_bh(&xprt->transport_lock);
+       spin_unlock(&buf->rb_lock);
        dprintk("RPC:       %s: "
                "duplicate reply %p to RPC request %p: xid 0x%08x\n",
                __func__, rep, req, be32_to_cpu(headerp->rm_xid));
 
+/* If no pending RPC transaction was matched, post a replacement
+ * receive buffer before returning.
+ */
 repost:
        r_xprt->rx_stats.bad_reply_count++;
        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
 
 struct rpcrdma_buffer;
 struct rpcrdma_req {
        struct list_head        rl_list;
+       __be32                  rl_xid;         /* for matching incoming replies */
        unsigned int            rl_mapped_sges;
        unsigned int            rl_connect_cookie;
        struct rpcrdma_buffer   *rl_buffer;
        int                     rb_send_count, rb_recv_count;
        struct list_head        rb_send_bufs;
        struct list_head        rb_recv_bufs;
+       struct list_head        rb_pending;     /* reqs awaiting replies */
        u32                     rb_max_requests;
        atomic_t                rb_credits;     /* most recent credit grant */
 
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
+/* Track an outstanding request: add @req to the list of reqs
+ * awaiting a reply. Takes rb_lock itself, so the caller must
+ * not already hold it.
+ */
+static inline void
+rpcrdma_insert_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+       spin_lock(&buffers->rb_lock);
+       if (list_empty(&req->rl_list))
+               list_add_tail(&req->rl_list, &buffers->rb_pending);
+       spin_unlock(&buffers->rb_lock);
+}
+
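+/* Find the pending req whose rl_xid matches @xid.
+ * Caller must hold rb_lock.
+ */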
+static inline struct rpcrdma_req *
+rpcrdma_lookup_req_locked(struct rpcrdma_buffer *buffers, __be32 xid)
+{
+       struct rpcrdma_req *pos;
+
+       list_for_each_entry(pos, &buffers->rb_pending, rl_list)
+               if (pos->rl_xid == xid)
+                       return pos;
+       return NULL;
+}
+
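+/* Remove @req from the pending list. Takes rb_lock itself,
+ * so the caller must not already hold it.
+ */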
+static inline void
+rpcrdma_remove_req(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
+{
+       spin_lock(&buffers->rb_lock);
+       list_del(&req->rl_list);
+       spin_unlock(&buffers->rb_lock);
+}
+
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);