};
 
 static const char transfertypes[][12] = {
-       "pure inline",  /* no chunks */
-       " read chunk",  /* some argument via rdma read */
-       "*read chunk",  /* entire request via rdma read */
-       "write chunk",  /* some result via rdma write */
+       "inline",       /* no chunks */
+       "read list",    /* some argument via rdma read */
+       "*read list",   /* entire request via rdma read */
+       "write list",   /* some result via rdma write */
        "reply chunk"   /* entire reply via rdma write */
 };
 
 /* Returns size of largest RPC-over-RDMA header in a Call message
  *
- * The client marshals only one chunk list per Call message.
- * The largest list is the Read list.
+ * The largest Call header contains a full-size Read list and a
+ * minimal Reply chunk.
  */
 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
 {
+       unsigned int size;
+
        maxsegs += 2;   /* segment for head and tail buffers */
        size = maxsegs * sizeof(struct rpcrdma_read_chunk);
 
+       /* Minimal Reply chunk size */
+       size += sizeof(__be32); /* segment count */
+       size += sizeof(struct rpcrdma_segment);
+       size += sizeof(__be32); /* list discriminator */
+
        dprintk("RPC:       %s: max call header size = %u\n",
                __func__, size);
        return size;
 }
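+
+/* A worked example of the arithmetic above, assuming the XDR wire
+ * sizes of these structures: struct rpcrdma_read_chunk is 24 bytes
+ * (discriminator, position, and one HLOO segment), and
+ * struct rpcrdma_segment is 16 bytes. For a hypothetical maxsegs
+ * of 8, the full-size Read list takes (8 + 2) * 24 = 240 bytes,
+ * and the minimal Reply chunk adds 4 + 16 + 4 = 24 bytes, so the
+ * largest Call header would be 264 bytes.
+ */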
 
+static inline __be32 *
+xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg)
+{
+       *iptr++ = cpu_to_be32(seg->mr_rkey);
+       *iptr++ = cpu_to_be32(seg->mr_len);
+       return xdr_encode_hyper(iptr, seg->mr_base);
+}
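+
+/* For illustration, a segment with hypothetical handle 0xdeadbeef,
+ * length 4096, and offset 0x1000 encodes as four XDR words:
+ *
+ *   0xdeadbeef | 0x00001000 | 0x00000000 0x00001000
+ *
+ * This quad is the "HLOO" unit in the encoding keys below.
+ */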
+
+/* XDR-encode the Read list. Supports encoding a list of read
+ * segments that belong to a single read chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Read chunklist (a linked list):
+ *   N elements, position P (same P for all chunks of same arg!):
+ *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Read list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
+                        struct rpcrdma_req *req, struct rpc_rqst *rqst,
+                        __be32 *iptr, enum rpcrdma_chunktype rtype)
+{
+       struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+       unsigned int pos;
+       int n, nsegs;
+
+       if (rtype == rpcrdma_noch) {
+               *iptr++ = xdr_zero;     /* item not present */
+               return iptr;
+       }
+
+       pos = rqst->rq_snd_buf.head[0].iov_len;
+       if (rtype == rpcrdma_areadch)
+               pos = 0;
+       nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg,
+                                    RPCRDMA_MAX_SEGS - req->rl_nchunks);
+       if (nsegs < 0)
+               return ERR_PTR(nsegs);
+
+       do {
+               n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false);
+               if (n <= 0)
+                       return ERR_PTR(n);
+
+               *iptr++ = xdr_one;      /* item present */
+
+               /* All read segments in this chunk
+                * have the same "position".
+                */
+               *iptr++ = cpu_to_be32(pos);
+               iptr = xdr_encode_rdma_segment(iptr, seg);
+
+               dprintk("RPC: %5u %s: read segment pos %u "
+                       "%d@0x%016llx:0x%08x (%s)\n",
+                       rqst->rq_task->tk_pid, __func__, pos,
+                       seg->mr_len, (unsigned long long)seg->mr_base,
+                       seg->mr_rkey, n < nsegs ? "more" : "last");
+
+               r_xprt->rx_stats.read_chunk_count++;
+               req->rl_nchunks++;
+               seg += n;
+               nsegs -= n;
+       } while (nsegs);
+       req->rl_nextseg = seg;
+
+       /* Finish Read list */
+       *iptr++ = xdr_zero;     /* Next item not present */
+       return iptr;
+}
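+
+/* For illustration, a Read chunk at hypothetical XDR position 148
+ * that ro_map registers as two segments is emitted as thirteen XDR
+ * words:
+ *
+ *   1 - 148 HLOO - 1 - 148 HLOO - 0
+ *
+ * a discriminator, position, and HLOO for each segment, then the
+ * final discriminator that terminates the Read list.
+ */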
+
+/* XDR-encode the Write list. Supports encoding a list containing
+ * one array of plain segments that belong to a single write chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Write chunklist (a list of (one) counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Write list, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+                         struct rpc_rqst *rqst, __be32 *iptr,
+                         enum rpcrdma_chunktype wtype)
+{
+       struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+       int n, nsegs, nchunks;
+       __be32 *segcount;
+
+       if (wtype != rpcrdma_writech) {
+               *iptr++ = xdr_zero;     /* no Write list present */
+               return iptr;
+       }
+
+       nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
+                                    rqst->rq_rcv_buf.head[0].iov_len,
+                                    wtype, seg,
+                                    RPCRDMA_MAX_SEGS - req->rl_nchunks);
+       if (nsegs < 0)
+               return ERR_PTR(nsegs);
+
+       *iptr++ = xdr_one;      /* Write list present */
+       segcount = iptr++;      /* save location of segment count */
+
+       nchunks = 0;
+       do {
+               n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+               if (n <= 0)
+                       return ERR_PTR(n);
+
+               iptr = xdr_encode_rdma_segment(iptr, seg);
+
+               dprintk("RPC: %5u %s: write segment "
+                       "%d@0x016%llx:0x%08x (%s)\n",
+                       rqst->rq_task->tk_pid, __func__,
+                       seg->mr_len, (unsigned long long)seg->mr_base,
+                       seg->mr_rkey, n < nsegs ? "more" : "last");
+
+               r_xprt->rx_stats.write_chunk_count++;
+               r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+               req->rl_nchunks++;
+               nchunks++;
+               seg   += n;
+               nsegs -= n;
+       } while (nsegs);
+       req->rl_nextseg = seg;
+
+       /* Update count of segments in this Write chunk */
+       *segcount = cpu_to_be32(nchunks);
+
+       /* Finish Write list */
+       *iptr++ = xdr_zero;     /* Next item not present */
+       return iptr;
+}
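+
+/* For illustration, a Write chunk that ro_map registers as three
+ * segments is emitted as fifteen XDR words:
+ *
+ *   1 - 3 - HLOO - HLOO - HLOO - 0
+ *
+ * a list discriminator, the segment count, three HLOO quads, and
+ * the discriminator that terminates the Write list.
+ */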
+
+/* XDR-encode the Reply chunk. Supports encoding an array of plain
+ * segments that belong to a single write (reply) chunk.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ *  Reply chunk (a counted array):
+ *   N elements:
+ *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns a pointer to the XDR word in the RDMA header following
+ * the end of the Reply chunk, or an error pointer.
+ */
+static __be32 *
+rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
+                          struct rpcrdma_req *req, struct rpc_rqst *rqst,
+                          __be32 *iptr, enum rpcrdma_chunktype wtype)
+{
+       struct rpcrdma_mr_seg *seg = req->rl_nextseg;
+       int n, nsegs, nchunks;
+       __be32 *segcount;
+
+       if (wtype != rpcrdma_replych) {
+               *iptr++ = xdr_zero;     /* no Reply chunk present */
+               return iptr;
+       }
+
+       nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg,
+                                    RPCRDMA_MAX_SEGS - req->rl_nchunks);
+       if (nsegs < 0)
+               return ERR_PTR(nsegs);
+
+       *iptr++ = xdr_one;      /* Reply chunk present */
+       segcount = iptr++;      /* save location of segment count */
+
+       nchunks = 0;
+       do {
+               n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true);
+               if (n <= 0)
+                       return ERR_PTR(n);
+
+               iptr = xdr_encode_rdma_segment(iptr, seg);
+
+               dprintk("RPC: %5u %s: reply segment "
+                       "%d@0x%016llx:0x%08x (%s)\n",
+                       rqst->rq_task->tk_pid, __func__,
+                       seg->mr_len, (unsigned long long)seg->mr_base,
+                       seg->mr_rkey, n < nsegs ? "more" : "last");
+
+               r_xprt->rx_stats.reply_chunk_count++;
+               r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+               req->rl_nchunks++;
+               nchunks++;
+               seg   += n;
+               nsegs -= n;
+       } while (nsegs);
+       req->rl_nextseg = seg;
+
+       /* Update count of segments in the Reply chunk */
+       *segcount = cpu_to_be32(nchunks);
+
+       return iptr;
+}
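+
+/* For illustration, a Reply chunk that ro_map registers as two
+ * segments is emitted as ten XDR words:
+ *
+ *   1 - 2 - HLOO - HLOO
+ *
+ * Unlike the Read and Write lists, no discriminator follows the
+ * final segment: the Reply chunk is a counted array rather than
+ * a linked list.
+ */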
+
 /*
  * Copy write data inline.
  * This function is used for "small" requests. Data which is passed
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-       char *base;
-       size_t rpclen;
-       ssize_t hdrlen;
        enum rpcrdma_chunktype rtype, wtype;
        struct rpcrdma_msg *headerp;
+       unsigned int pos;
+       ssize_t hdrlen;
+       size_t rpclen;
+       __be32 *iptr;
 
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
        if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
                return rpcrdma_bc_marshal_reply(rqst);
 #endif
 
-       /*
-        * rpclen gets amount of data in first buffer, which is the
-        * pre-registered buffer.
-        */
-       base = rqst->rq_svec[0].iov_base;
-       rpclen = rqst->rq_svec[0].iov_len;
-
        headerp = rdmab_to_msg(req->rl_rdmabuf);
        /* don't byte-swap XID, it's already done in request */
        headerp->rm_xid = rqst->rq_xid;
         */
        if (rpcrdma_args_inline(r_xprt, rqst)) {
                rtype = rpcrdma_noch;
+               rpcrdma_inline_pullup(rqst);
+               rpclen = rqst->rq_svec[0].iov_len;
        } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
                rtype = rpcrdma_readch;
+               rpclen = rqst->rq_svec[0].iov_len;
+               rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
        } else {
                r_xprt->rx_stats.nomsg_call_count++;
                headerp->rm_type = htonl(RDMA_NOMSG);
                rtype = rpcrdma_areadch;
                rpclen = 0;
        }
 
-       /* The following simplification is not true forever */
-       if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
-               wtype = rpcrdma_noch;
-       if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
-               dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
-                       __func__);
-               return -EIO;
-       }
-
-       hdrlen = RPCRDMA_HDRLEN_MIN;
-
-       /*
-        * Pull up any extra send data into the preregistered buffer.
-        * When padding is in use and applies to the transfer, insert
-        * it and change the message type.
+       /* This implementation supports the following combinations
+        * of chunk lists in one RPC-over-RDMA Call message:
+        *
+        *   - Read list
+        *   - Write list
+        *   - Reply chunk
+        *   - Read list + Reply chunk
+        *
+        * It might not yet support the following combinations:
+        *
+        *   - Read list + Write list
+        *
+        * It does not support the following combinations:
+        *
+        *   - Write list + Reply chunk
+        *   - Read list + Write list + Reply chunk
+        *
+        * This implementation supports only a single chunk in each
+        * Read or Write list. Thus for example the client cannot
+        * send a Call message with a Position Zero Read chunk and a
+        * regular Read chunk at the same time.
         */
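+
+       /* For example, an RDMA_NOMSG Call that moves its arguments
+        * with a Position Zero Read chunk (rtype == rpcrdma_areadch)
+        * and accepts its results via a Reply chunk
+        * (wtype == rpcrdma_replych) would encode, assuming each
+        * chunk maps as a single segment:
+        *
+        *   Read list:    1 - 0 HLOO - 0
+        *   Write list:   0
+        *   Reply chunk:  1 - 1 - HLOO
+        */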
-       if (rtype == rpcrdma_noch) {
-
-               rpcrdma_inline_pullup(rqst);
-
-               headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
-               headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
-               headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
-               /* new length after pullup */
-               rpclen = rqst->rq_svec[0].iov_len;
-       } else if (rtype == rpcrdma_readch)
-               rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
-       if (rtype != rpcrdma_noch) {
-               hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
-                                              headerp, rtype);
-               wtype = rtype;  /* simplify dprintk */
-
-       } else if (wtype != rpcrdma_noch) {
-               hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
-                                              headerp, wtype);
-       }
-       if (hdrlen < 0)
-               return hdrlen;
+       req->rl_nchunks = 0;
+       req->rl_nextseg = req->rl_segments;
+       iptr = headerp->rm_body.rm_chunks;
+       iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
+       if (IS_ERR(iptr))
+               goto out_unmap;
+       iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
+       if (IS_ERR(iptr))
+               goto out_unmap;
+       iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
+       if (IS_ERR(iptr))
+               goto out_unmap;
+       hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
 
        if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
                goto out_overflow;
 
-       dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd"
-               " headerp 0x%p base 0x%p lkey 0x%x\n",
-               __func__, transfertypes[wtype], hdrlen, rpclen,
-               headerp, base, rdmab_lkey(req->rl_rdmabuf));
+       dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
+               rqst->rq_task->tk_pid, __func__,
+               transfertypes[rtype], transfertypes[wtype],
+               hdrlen, rpclen);
 
        req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
        req->rl_send_iov[0].length = hdrlen;
        return 0;
 
 out_overflow:
-       pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s\n",
-               hdrlen, rpclen, transfertypes[wtype]);
+       pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
+               hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
        /* Terminate this RPC. Chunks registered above will be
         * released by xprt_release -> xprt_rdma_free.
         */
        return -EIO;
+
+out_unmap:
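+       /* Unwind the registrations that succeeded before the error:
+        * each ro_unmap call releases one chunk and returns the
+        * number of segments it consumed, which advances pos to the
+        * next chunk's first segment.
+        */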
+       for (pos = 0; req->rl_nchunks--;)
+               pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
+                                                     &req->rl_segments[pos]);
+       return PTR_ERR(iptr);
 }
 
 /*