}
 
 /**
- * rpcrdma_unmap_sges - DMA-unmap Send buffers
- * @ia: interface adapter (device)
- * @req: req with possibly some SGEs to be DMA unmapped
+ * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
+ * @sc: sendctx containing SGEs to unmap
  *
  */
 void
-rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
 {
+       struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
        struct ib_sge *sge;
        unsigned int count;
 
+       dprintk("RPC:       %s: unmapping %u sges for sc=%p\n",
+               __func__, sc->sc_unmap_count, sc);
+
        /* The first two SGEs contain the transport header and
         * the inline buffer. These are always left mapped so
         * they can be cheaply re-used.
         */
-       sge = &req->rl_send_sge[2];
-       for (count = req->rl_mapped_sges; count--; sge++)
+       sge = &sc->sc_sges[2];
+       for (count = sc->sc_unmap_count; count; ++sge, --count)
                ib_dma_unmap_page(ia->ri_device,
                                  sge->addr, sge->length, DMA_TO_DEVICE);
 }
 rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
                        u32 len)
 {
+       struct rpcrdma_sendctx *sc = req->rl_sendctx;
        struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
-       struct ib_sge *sge = &req->rl_send_sge[0];
+       struct ib_sge *sge = sc->sc_sges;
 
        if (!rpcrdma_dma_map_regbuf(ia, rb))
                goto out_regbuf;
 
        ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
                                      sge->length, DMA_TO_DEVICE);
-       req->rl_send_wr.num_sge++;
+       sc->sc_wr.num_sge++;
        return true;
 
 out_regbuf:
 rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
                         struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
+       struct rpcrdma_sendctx *sc = req->rl_sendctx;
        unsigned int sge_no, page_base, len, remaining;
        struct rpcrdma_regbuf *rb = req->rl_sendbuf;
        struct ib_device *device = ia->ri_device;
-       struct ib_sge *sge = req->rl_send_sge;
+       struct ib_sge *sge = sc->sc_sges;
        u32 lkey = ia->ri_pd->local_dma_lkey;
        struct page *page, **ppages;
 
                        sge[sge_no].length = len;
                        sge[sge_no].lkey = lkey;
 
-                       req->rl_mapped_sges++;
+                       sc->sc_unmap_count++;
                        ppages++;
                        remaining -= len;
                        page_base = 0;
                        goto out_mapping_err;
                sge[sge_no].length = len;
                sge[sge_no].lkey = lkey;
-               req->rl_mapped_sges++;
+               sc->sc_unmap_count++;
        }
 
 out:
-       req->rl_send_wr.num_sge += sge_no;
+       sc->sc_wr.num_sge += sge_no;
        return true;
 
 out_regbuf:
        return false;
 
 out_mapping_overflow:
-       rpcrdma_unmap_sges(ia, req);
+       rpcrdma_unmap_sendctx(sc);
        pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
        return false;
 
 out_mapping_err:
-       rpcrdma_unmap_sges(ia, req);
+       rpcrdma_unmap_sendctx(sc);
        pr_err("rpcrdma: Send mapping error\n");
        return false;
 }
                          struct rpcrdma_req *req, u32 hdrlen,
                          struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
 {
-       req->rl_send_wr.num_sge = 0;
-       req->rl_mapped_sges = 0;
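+       /* Acquire a sendctx to carry this request's Send SGEs; if
+        * none is free, ask the caller to try again later.
+        */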
+       req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
+       if (!req->rl_sendctx)
+               return -ENOBUFS;
+       req->rl_sendctx->sc_wr.num_sge = 0;
+       req->rl_sendctx->sc_unmap_count = 0;
 
        if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
                return -EIO;
 
 #include <linux/prefetch.h>
 #include <linux/sunrpc/addr.h>
 #include <linux/sunrpc/svc_rdma.h>
+
+#include <asm-generic/barrier.h>
 #include <asm/bitops.h>
 
 #include <rdma/ib_cm.h>
 static void
 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
 {
+       struct ib_cqe *cqe = wc->wr_cqe;
+       struct rpcrdma_sendctx *sc =
+               container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
+
        /* WARNING: Only wr_cqe and status are reliable at this point */
        if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
                       ib_wc_status_msg(wc->status),
                       wc->status, wc->vendor_err);
+
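+       /* Return the sendctx to the circular queue. This also unmaps
+        * the Send SGEs of this and any earlier unsignaled Sends.
+        */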
+       rpcrdma_sendctx_put_locked(sc);
 }
 
 /**
                ep->rep_attr.cap.max_recv_sge);
 
        /* set trigger for requesting send completion */
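+       /* Batch Send completions: at most rep_send_batch unsignaled
+        * Sends are posted between completion requests.
+        */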
+       ep->rep_send_batch = min_t(unsigned int, RPCRDMA_MAX_SEND_BATCH,
+                                  cdata->max_requests >> 2);
+       ep->rep_send_count = ep->rep_send_batch;
        ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
        if (ep->rep_cqinit <= 2)
                ep->rep_cqinit = 0;     /* always signal? */
        ib_drain_qp(ia->ri_id->qp);
 }
 
+/* Fixed-size circular FIFO queue. This implementation is wait-free and
+ * lock-free.
+ *
+ * Consumer is the code path that posts Sends. This path dequeues a
+ * sendctx for use by a Send operation. Multiple consumer threads
+ * are serialized by the RPC transport lock, which allows only one
+ * ->send_request call at a time.
+ *
+ * Producer is the code path that handles Send completions. This path
+ * enqueues a sendctx that has been completed. Multiple producer
+ * threads are serialized by the ib_poll_cq() function.
+ */
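+
+/* One slot in the circular array is kept as a sentinel: when advancing
+ * rb_sc_head would make it equal to rb_sc_tail, no free sendctx is
+ * available and rpcrdma_sendctx_get_locked() asks the caller to back
+ * off and try again later.
+ */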
+
+/* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
+ * queue activity, and ib_drain_qp has flushed all remaining Send
+ * requests.
+ */
+static void rpcrdma_sendctxs_destroy(struct rpcrdma_buffer *buf)
+{
+       unsigned long i;
+
+       for (i = 0; i <= buf->rb_sc_last; i++)
+               kfree(buf->rb_sc_ctxs[i]);
+       kfree(buf->rb_sc_ctxs);
+}
+
+static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ia *ia)
+{
+       struct rpcrdma_sendctx *sc;
+
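+       /* sc_sges[] is a flexible array: reserve one ib_sge for each
+        * Send SGE this transport may use.
+        */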
+       sc = kzalloc(sizeof(*sc) +
+                    ia->ri_max_send_sges * sizeof(struct ib_sge),
+                    GFP_KERNEL);
+       if (!sc)
+               return NULL;
+
+       sc->sc_wr.wr_cqe = &sc->sc_cqe;
+       sc->sc_wr.sg_list = sc->sc_sges;
+       sc->sc_wr.opcode = IB_WR_SEND;
+       sc->sc_cqe.done = rpcrdma_wc_send;
+       return sc;
+}
+
+static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_sendctx *sc;
+       unsigned long i;
+
+       /* Maximum number of concurrent outstanding Send WRs. Capping
+        * the circular queue size stops Send Queue overflow by causing
+        * the ->send_request call to fail temporarily before too many
+        * Sends are posted.
+        */
+       i = buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS;
+       dprintk("RPC:       %s: allocating %lu send_ctxs\n", __func__, i);
+       buf->rb_sc_ctxs = kcalloc(i, sizeof(sc), GFP_KERNEL);
+       if (!buf->rb_sc_ctxs)
+               return -ENOMEM;
+
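+       /* rb_sc_last is the highest valid index in rb_sc_ctxs;
+        * rpcrdma_sendctx_next() wraps back to zero after it.
+        */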
+       buf->rb_sc_last = i - 1;
+       for (i = 0; i <= buf->rb_sc_last; i++) {
+               sc = rpcrdma_sendctx_create(&r_xprt->rx_ia);
+               if (!sc)
+                       goto out_destroy;
+
+               sc->sc_xprt = r_xprt;
+               buf->rb_sc_ctxs[i] = sc;
+       }
+
+       return 0;
+
+out_destroy:
+       rpcrdma_sendctxs_destroy(buf);
+       return -ENOMEM;
+}
+
+/* The sendctx queue is not guaranteed to have a size that is a
+ * power of two, thus the helpers in circ_buf.h cannot be used.
+ * The other option is to use modulus (%), which can be expensive.
+ */
+static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
+                                         unsigned long item)
+{
+       return likely(item < buf->rb_sc_last) ? item + 1 : 0;
+}
+
+/**
+ * rpcrdma_sendctx_get_locked - Acquire a send context
+ * @buf: transport buffers from which to acquire an unused context
+ *
+ * Returns pointer to a free send completion context; or NULL if
+ * the queue is empty.
+ *
+ * Usage: Called to acquire an SGE array before preparing a Send WR.
+ *
+ * The caller serializes calls to this function (per rpcrdma_buffer),
+ * and provides an effective memory barrier that flushes the new value
+ * of rb_sc_head.
+ */
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
+{
+       struct rpcrdma_xprt *r_xprt;
+       struct rpcrdma_sendctx *sc;
+       unsigned long next_head;
+
+       next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
+
+       if (next_head == READ_ONCE(buf->rb_sc_tail))
+               goto out_emptyq;
+
+       /* ORDER: item must be accessed _before_ head is updated */
+       sc = buf->rb_sc_ctxs[next_head];
+
+       /* Releasing the lock in the caller acts as a memory
+        * barrier that flushes rb_sc_head.
+        */
+       buf->rb_sc_head = next_head;
+
+       return sc;
+
+out_emptyq:
+       /* The queue is "empty" if there have not been enough Send
+        * completions recently. This is a sign the Send Queue is
+        * backing up. Cause the caller to pause and try again.
+        */
+       dprintk("RPC:       %s: empty sendctx queue\n", __func__);
+       r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
+       r_xprt->rx_stats.empty_sendctx_q++;
+       return NULL;
+}
+
+/**
+ * rpcrdma_sendctx_put_locked - Release a send context
+ * @sc: send context to release
+ *
+ * Usage: Called from Send completion to return a sendctx
+ * to the queue.
+ *
+ * The caller serializes calls to this function (per rpcrdma_buffer).
+ */
+void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
+{
+       struct rpcrdma_buffer *buf = &sc->sc_xprt->rx_buf;
+       unsigned long next_tail;
+
+       /* Unmap SGEs of previously completed but unsignaled
+        * Sends by walking up the queue until @sc is found.
+        */
+       next_tail = buf->rb_sc_tail;
+       do {
+               next_tail = rpcrdma_sendctx_next(buf, next_tail);
+
+               /* ORDER: item must be accessed _before_ tail is updated */
+               rpcrdma_unmap_sendctx(buf->rb_sc_ctxs[next_tail]);
+
+       } while (buf->rb_sc_ctxs[next_tail] != sc);
+
+       /* Paired with READ_ONCE */
+       smp_store_release(&buf->rb_sc_tail, next_tail);
+}
+
 static void
 rpcrdma_mr_recovery_worker(struct work_struct *work)
 {
        spin_lock(&buffer->rb_reqslock);
        list_add(&req->rl_all, &buffer->rb_allreqs);
        spin_unlock(&buffer->rb_reqslock);
-       req->rl_cqe.done = rpcrdma_wc_send;
        req->rl_buffer = &r_xprt->rx_buf;
        INIT_LIST_HEAD(&req->rl_registered);
-       req->rl_send_wr.next = NULL;
-       req->rl_send_wr.wr_cqe = &req->rl_cqe;
-       req->rl_send_wr.sg_list = req->rl_send_sge;
-       req->rl_send_wr.opcode = IB_WR_SEND;
        return req;
 }
 
                list_add(&rep->rr_list, &buf->rb_recv_bufs);
        }
 
+       rc = rpcrdma_sendctxs_create(r_xprt);
+       if (rc)
+               goto out;
+
        return 0;
 out:
        rpcrdma_buffer_destroy(buf);
        cancel_delayed_work_sync(&buf->rb_recovery_worker);
        cancel_delayed_work_sync(&buf->rb_refresh_worker);
 
+       rpcrdma_sendctxs_destroy(buf);
+
        while (!list_empty(&buf->rb_recv_bufs)) {
                struct rpcrdma_rep *rep;
 
        struct rpcrdma_buffer *buffers = req->rl_buffer;
        struct rpcrdma_rep *rep = req->rl_reply;
 
-       req->rl_send_wr.num_sge = 0;
        req->rl_reply = NULL;
 
        spin_lock(&buffers->rb_lock);
                struct rpcrdma_ep *ep,
                struct rpcrdma_req *req)
 {
-       struct ib_send_wr *send_wr = &req->rl_send_wr;
+       struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
        struct ib_send_wr *send_wr_fail;
        int rc;
 
        dprintk("RPC:       %s: posting %d s/g entries\n",
                __func__, send_wr->num_sge);
 
-       rpcrdma_set_signaled(ep, send_wr);
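+       /* Batch Send completions: post rep_send_count more unsignaled
+        * Sends before requesting another completion. The signaled
+        * completion then reaps the sendctxs of the whole batch.
+        */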
+       if (!ep->rep_send_count) {
+               send_wr->send_flags |= IB_SEND_SIGNALED;
+               ep->rep_send_count = ep->rep_send_batch;
+       } else {
+               send_wr->send_flags &= ~IB_SEND_SIGNALED;
+               --ep->rep_send_count;
+       }
        rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
        if (rc)
                goto out_postsend_err;
 
  */
 
 struct rpcrdma_ep {
+       unsigned int            rep_send_count;
+       unsigned int            rep_send_batch;
        atomic_t                rep_cqcount;
        int                     rep_cqinit;
        int                     rep_connected;
        struct ib_recv_wr       rr_recv_wr;
 };
 
+/* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
+ *
+ * sc_unmap_count is the number of dynamically mapped SGEs, starting at
+ * sc_sges[2]. The first two SGEs (transport header and inline buffer)
+ * stay mapped so they can be cheaply re-used.
+ */
+struct rpcrdma_xprt;
+struct rpcrdma_sendctx {
+       struct ib_send_wr       sc_wr;
+       struct ib_cqe           sc_cqe;
+       struct rpcrdma_xprt     *sc_xprt;
+       unsigned int            sc_unmap_count;
+       struct ib_sge           sc_sges[];
+};
+
+/* Limit the number of SGEs that can be unmapped during one
+ * Send completion. This caps the amount of work a single
+ * completion can do before returning to the provider.
+ *
+ * Setting this to zero disables Send completion batching.
+ */
+enum {
+       RPCRDMA_MAX_SEND_BATCH = 7,
+};
+
 /*
  * struct rpcrdma_mw - external memory region metadata
  *
 struct rpcrdma_buffer;
 struct rpcrdma_req {
        struct list_head        rl_list;
-       unsigned int            rl_mapped_sges;
        unsigned int            rl_connect_cookie;
        struct rpcrdma_buffer   *rl_buffer;
        struct rpcrdma_rep      *rl_reply;
        struct xdr_stream       rl_stream;
        struct xdr_buf          rl_hdrbuf;
-       struct ib_send_wr       rl_send_wr;
-       struct ib_sge           rl_send_sge[RPCRDMA_MAX_SEND_SGES];
+       struct rpcrdma_sendctx  *rl_sendctx;    /* Send WR and SGEs */
        struct rpcrdma_regbuf   *rl_rdmabuf;    /* xprt header */
        struct rpcrdma_regbuf   *rl_sendbuf;    /* rq_snd_buf */
        struct rpcrdma_regbuf   *rl_recvbuf;    /* rq_rcv_buf */
 
-       struct ib_cqe           rl_cqe;
        struct list_head        rl_all;
        bool                    rl_backchannel;
 
        struct list_head        rb_mws;
        struct list_head        rb_all;
 
+       /* Circular queue of unused send contexts: the Send path
+        * (consumer) advances rb_sc_head, the Send completion handler
+        * (producer) advances rb_sc_tail, and rb_sc_last is the highest
+        * valid index in rb_sc_ctxs.
+        */
+       unsigned long           rb_sc_head;
+       unsigned long           rb_sc_tail;
+       unsigned long           rb_sc_last;
+       struct rpcrdma_sendctx  **rb_sc_ctxs;
+
        spinlock_t              rb_lock;        /* protect buf lists */
        int                     rb_send_count, rb_recv_count;
        struct list_head        rb_send_bufs;
        unsigned long           mrs_recovered;
        unsigned long           mrs_orphaned;
        unsigned long           mrs_allocated;
+       unsigned long           empty_sendctx_q;
 
        /* accessed when receiving a reply */
        unsigned long long      total_rdma_reply;
 void rpcrdma_destroy_req(struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
+struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf);
+void rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc);
 
 struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
 void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
                              struct rpcrdma_req *req, u32 hdrlen,
                              struct xdr_buf *xdr,
                              enum rpcrdma_chunktype rtype);
-void rpcrdma_unmap_sges(struct rpcrdma_ia *, struct rpcrdma_req *);
+void rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc);
 int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);