/*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
  * This software is available to you under a choice of one of two
 
        /* Set up the XDR head */
        rqstp->rq_arg.head[0].iov_base = page_address(page);
-       rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
+       rqstp->rq_arg.head[0].iov_len =
+               min_t(size_t, byte_count, ctxt->sge[0].length);
        rqstp->rq_arg.len = byte_count;
        rqstp->rq_arg.buflen = byte_count;
 
                page = ctxt->pages[sge_no];
                put_page(rqstp->rq_pages[sge_no]);
                rqstp->rq_pages[sge_no] = page;
-               bc -= min(bc, ctxt->sge[sge_no].length);
+               bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
                rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
                sge_no++;
        }
        rqstp->rq_arg.tail[0].iov_len = 0;
 }
 
-/* Encode a read-chunk-list as an array of IB SGE
- *
- * Assumptions:
- * - chunk[0]->position points to pages[0] at an offset of 0
- * - pages[] is not physically or virtually contiguous and consists of
- *   PAGE_SIZE elements.
- *
- * Output:
- * - sge array pointing into pages[] array.
- * - chunk_sge array specifying sge index and count for each
- *   chunk in the read list
- *
- */
-static int map_read_chunks(struct svcxprt_rdma *xprt,
-                          struct svc_rqst *rqstp,
-                          struct svc_rdma_op_ctxt *head,
-                          struct rpcrdma_msg *rmsgp,
-                          struct svc_rdma_req_map *rpl_map,
-                          struct svc_rdma_req_map *chl_map,
-                          int ch_count,
-                          int byte_count)
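+/* Determine the number of sink SGEs to use for a single RDMA_READ.
+ * iWARP transports get a single SGE; everything else may use up to
+ * the device's sc_max_sge.
+ */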
+static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
 {
-       int sge_no;
-       int sge_bytes;
-       int page_off;
-       int page_no;
-       int ch_bytes;
-       int ch_no;
-       struct rpcrdma_read_chunk *ch;
+       if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
+            RDMA_TRANSPORT_IWARP)
+               return 1;
+       else
+               return min_t(int, sge_count, xprt->sc_max_sge);
+}
 
-       sge_no = 0;
-       page_no = 0;
-       page_off = 0;
-       ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
-       ch_no = 0;
-       ch_bytes = ntohl(ch->rc_target.rs_length);
-       head->arg.head[0] = rqstp->rq_arg.head[0];
-       head->arg.tail[0] = rqstp->rq_arg.tail[0];
-       head->arg.pages = &head->pages[head->count];
-       head->hdr_count = head->count; /* save count of hdr pages */
-       head->arg.page_base = 0;
-       head->arg.page_len = ch_bytes;
-       head->arg.len = rqstp->rq_arg.len + ch_bytes;
-       head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
-       head->count++;
-       chl_map->ch[0].start = 0;
-       while (byte_count) {
-               rpl_map->sge[sge_no].iov_base =
-                       page_address(rqstp->rq_arg.pages[page_no]) + page_off;
-               sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
-               rpl_map->sge[sge_no].iov_len = sge_bytes;
-               /*
-                * Don't bump head->count here because the same page
-                * may be used by multiple SGE.
-                */
-               head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
-               rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
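+/* Method used to read one chunk segment from the client.  A reader maps
+ * up to rs_length bytes of the remote region <rs_handle, rs_offset> into
+ * the rq_arg page list starting at *page_no/*page_offset, posts the
+ * RDMA_READ, and returns the number of bytes for which the read was
+ * posted (or a negative errno).  *page_no and *page_offset are advanced
+ * past the mapped bytes; "last" is non-zero for the final chunk in the
+ * read list.
+ */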
+typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
+                             struct svc_rqst *rqstp,
+                             struct svc_rdma_op_ctxt *head,
+                             int *page_no,
+                             u32 *page_offset,
+                             u32 rs_handle,
+                             u32 rs_length,
+                             u64 rs_offset,
+                             int last);
+
+/* Issue an RDMA_READ using the local lkey to map the data sink */
+static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
+                              struct svc_rqst *rqstp,
+                              struct svc_rdma_op_ctxt *head,
+                              int *page_no,
+                              u32 *page_offset,
+                              u32 rs_handle,
+                              u32 rs_length,
+                              u64 rs_offset,
+                              int last)
+{
+       struct ib_send_wr read_wr;
+       int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
+       struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
+       int ret, read, pno;
+       u32 pg_off = *page_offset;
+       u32 pg_no = *page_no;
+
+       ctxt->direction = DMA_FROM_DEVICE;
+       ctxt->read_hdr = head;
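+       /* Trim to the number of pages a single WR can carry and note how
+        * many bytes of this chunk the resulting READ will pull across.
+        */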
+       pages_needed =
+               min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
+       read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
+                    rs_length);
+
+       for (pno = 0; pno < pages_needed; pno++) {
+               int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
+
+               head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
+               head->arg.page_len += len;
+               head->arg.len += len;
+               if (!pg_off)
+                       head->count++;
+               rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
                rqstp->rq_next_page = rqstp->rq_respages + 1;
+               ctxt->sge[pno].addr =
+                       ib_dma_map_page(xprt->sc_cm_id->device,
+                                       head->arg.pages[pg_no], pg_off,
+                                       PAGE_SIZE - pg_off,
+                                       DMA_FROM_DEVICE);
+               ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
+                                          ctxt->sge[pno].addr);
+               if (ret)
+                       goto err;
+               atomic_inc(&xprt->sc_dma_used);
 
-               byte_count -= sge_bytes;
-               ch_bytes -= sge_bytes;
-               sge_no++;
-               /*
-                * If all bytes for this chunk have been mapped to an
-                * SGE, move to the next SGE
-                */
-               if (ch_bytes == 0) {
-                       chl_map->ch[ch_no].count =
-                               sge_no - chl_map->ch[ch_no].start;
-                       ch_no++;
-                       ch++;
-                       chl_map->ch[ch_no].start = sge_no;
-                       ch_bytes = ntohl(ch->rc_target.rs_length);
-                       /* If bytes remaining account for next chunk */
-                       if (byte_count) {
-                               head->arg.page_len += ch_bytes;
-                               head->arg.len += ch_bytes;
-                               head->arg.buflen += ch_bytes;
-                       }
+               /* The lkey here is either a local dma lkey or a dma_mr lkey */
+               ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
+               ctxt->sge[pno].length = len;
+               ctxt->count++;
+
+               /* adjust offset and wrap to next page if needed */
+               pg_off += len;
+               if (pg_off == PAGE_SIZE) {
+                       pg_off = 0;
+                       pg_no++;
                }
-               /*
-                * If this SGE consumed all of the page, move to the
-                * next page
-                */
-               if ((sge_bytes + page_off) == PAGE_SIZE) {
-                       page_no++;
-                       page_off = 0;
-                       /*
-                        * If there are still bytes left to map, bump
-                        * the page count
-                        */
-                       if (byte_count)
-                               head->count++;
-               } else
-                       page_off += sge_bytes;
+               rs_length -= len;
        }
-       BUG_ON(byte_count != 0);
-       return sge_no;
+
+       if (last && rs_length == 0)
+               set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+       else
+               clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+
+       memset(&read_wr, 0, sizeof(read_wr));
+       read_wr.wr_id = (unsigned long)ctxt;
+       read_wr.opcode = IB_WR_RDMA_READ;
+       ctxt->wr_op = read_wr.opcode;
+       read_wr.send_flags = IB_SEND_SIGNALED;
+       read_wr.wr.rdma.rkey = rs_handle;
+       read_wr.wr.rdma.remote_addr = rs_offset;
+       read_wr.sg_list = ctxt->sge;
+       read_wr.num_sge = pages_needed;
+
+       ret = svc_rdma_send(xprt, &read_wr);
+       if (ret) {
+               pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
+               set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+               goto err;
+       }
+
+       /* return current location in page array */
+       *page_no = pg_no;
+       *page_offset = pg_off;
+       ret = read;
+       atomic_inc(&rdma_stat_read);
+       return ret;
+ err:
+       svc_rdma_unmap_dma(ctxt);
+       svc_rdma_put_context(ctxt, 0);
+       return ret;
 }
 
-/* Map a read-chunk-list to an XDR and fast register the page-list.
- *
- * Assumptions:
- * - chunk[0]  position points to pages[0] at an offset of 0
- * - pages[]   will be made physically contiguous by creating a one-off memory
- *             region using the fastreg verb.
- * - byte_count is # of bytes in read-chunk-list
- * - ch_count  is # of chunks in read-chunk-list
- *
- * Output:
- * - sge array pointing into pages[] array.
- * - chunk_sge array specifying sge index and count for each
- *   chunk in the read list
- */
-static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
+/* Issue an RDMA_READ using an FRMR to map the data sink */
+static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
                                struct svc_rqst *rqstp,
                                struct svc_rdma_op_ctxt *head,
-                               struct rpcrdma_msg *rmsgp,
-                               struct svc_rdma_req_map *rpl_map,
-                               struct svc_rdma_req_map *chl_map,
-                               int ch_count,
-                               int byte_count)
+                               int *page_no,
+                               u32 *page_offset,
+                               u32 rs_handle,
+                               u32 rs_length,
+                               u64 rs_offset,
+                               int last)
 {
-       int page_no;
-       int ch_no;
-       u32 offset;
-       struct rpcrdma_read_chunk *ch;
-       struct svc_rdma_fastreg_mr *frmr;
-       int ret = 0;
+       struct ib_send_wr read_wr;
+       struct ib_send_wr inv_wr;
+       struct ib_send_wr fastreg_wr;
+       u8 key;
+       int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
+       struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
+       struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
+       int ret, read, pno;
+       u32 pg_off = *page_offset;
+       u32 pg_no = *page_no;
 
-       frmr = svc_rdma_get_frmr(xprt);
        if (IS_ERR(frmr))
                return -ENOMEM;
 
-       head->frmr = frmr;
-       head->arg.head[0] = rqstp->rq_arg.head[0];
-       head->arg.tail[0] = rqstp->rq_arg.tail[0];
-       head->arg.pages = &head->pages[head->count];
-       head->hdr_count = head->count; /* save count of hdr pages */
-       head->arg.page_base = 0;
-       head->arg.page_len = byte_count;
-       head->arg.len = rqstp->rq_arg.len + byte_count;
-       head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
+       ctxt->direction = DMA_FROM_DEVICE;
+       ctxt->frmr = frmr;
+       pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
+       read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
+                    rs_length);
 
-       /* Fast register the page list */
-       frmr->kva = page_address(rqstp->rq_arg.pages[0]);
+       frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
        frmr->direction = DMA_FROM_DEVICE;
        frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
-       frmr->map_len = byte_count;
-       frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
-       for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
-               frmr->page_list->page_list[page_no] =
+       frmr->map_len = pages_needed << PAGE_SHIFT;
+       frmr->page_list_len = pages_needed;
+
+       for (pno = 0; pno < pages_needed; pno++) {
+               int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
+
+               head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
+               head->arg.page_len += len;
+               head->arg.len += len;
+               if (!pg_off)
+                       head->count++;
+               rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
+               rqstp->rq_next_page = rqstp->rq_respages + 1;
+               frmr->page_list->page_list[pno] =
                        ib_dma_map_page(xprt->sc_cm_id->device,
-                                       rqstp->rq_arg.pages[page_no], 0,
+                                       head->arg.pages[pg_no], 0,
                                        PAGE_SIZE, DMA_FROM_DEVICE);
-               if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-                                        frmr->page_list->page_list[page_no]))
-                       goto fatal_err;
+               ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
+                                          frmr->page_list->page_list[pno]);
+               if (ret)
+                       goto err;
                atomic_inc(&xprt->sc_dma_used);
-               head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
-       }
-       head->count += page_no;
-
-       /* rq_respages points one past arg pages */
-       rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
-       rqstp->rq_next_page = rqstp->rq_respages + 1;
 
-       /* Create the reply and chunk maps */
-       offset = 0;
-       ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
-       for (ch_no = 0; ch_no < ch_count; ch_no++) {
-               int len = ntohl(ch->rc_target.rs_length);
-               rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
-               rpl_map->sge[ch_no].iov_len = len;
-               chl_map->ch[ch_no].count = 1;
-               chl_map->ch[ch_no].start = ch_no;
-               offset += len;
-               ch++;
+               /* adjust offset and wrap to next page if needed */
+               pg_off += len;
+               if (pg_off == PAGE_SIZE) {
+                       pg_off = 0;
+                       pg_no++;
+               }
+               rs_length -= len;
        }
 
-       ret = svc_rdma_fastreg(xprt, frmr);
-       if (ret)
-               goto fatal_err;
-
-       return ch_no;
-
- fatal_err:
-       printk("svcrdma: error fast registering xdr for xprt %p", xprt);
-       svc_rdma_put_frmr(xprt, frmr);
-       return -EIO;
-}
-
-static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
-                            struct svc_rdma_op_ctxt *ctxt,
-                            struct svc_rdma_fastreg_mr *frmr,
-                            struct kvec *vec,
-                            u64 *sgl_offset,
-                            int count)
-{
-       int i;
-       unsigned long off;
+       if (last && rs_length == 0)
+               set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+       else
+               clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
 
-       ctxt->count = count;
-       ctxt->direction = DMA_FROM_DEVICE;
-       for (i = 0; i < count; i++) {
-               ctxt->sge[i].length = 0; /* in case map fails */
-               if (!frmr) {
-                       BUG_ON(!virt_to_page(vec[i].iov_base));
-                       off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
-                       ctxt->sge[i].addr =
-                               ib_dma_map_page(xprt->sc_cm_id->device,
-                                               virt_to_page(vec[i].iov_base),
-                                               off,
-                                               vec[i].iov_len,
-                                               DMA_FROM_DEVICE);
-                       if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-                                                ctxt->sge[i].addr))
-                               return -EINVAL;
-                       ctxt->sge[i].lkey = xprt->sc_dma_lkey;
-                       atomic_inc(&xprt->sc_dma_used);
-               } else {
-                       ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
-                       ctxt->sge[i].lkey = frmr->mr->lkey;
-               }
-               ctxt->sge[i].length = vec[i].iov_len;
-               *sgl_offset = *sgl_offset + vec[i].iov_len;
+       /* Bump the key */
+       key = (u8)(frmr->mr->lkey & 0x000000FF);
+       ib_update_fast_reg_key(frmr->mr, ++key);
+
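+       /* The FRMR presents the scattered sink pages as one virtually
+        * contiguous region, so the READ itself needs only a single SGE.
+        */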
+       ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
+       ctxt->sge[0].lkey = frmr->mr->lkey;
+       ctxt->sge[0].length = read;
+       ctxt->count = 1;
+       ctxt->read_hdr = head;
+
+       /* Prepare FASTREG WR */
+       memset(&fastreg_wr, 0, sizeof(fastreg_wr));
+       fastreg_wr.opcode = IB_WR_FAST_REG_MR;
+       fastreg_wr.send_flags = IB_SEND_SIGNALED;
+       fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
+       fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
+       fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
+       fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
+       fastreg_wr.wr.fast_reg.length = frmr->map_len;
+       fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
+       fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
+       fastreg_wr.next = &read_wr;
+
+       /* Prepare RDMA_READ */
+       memset(&read_wr, 0, sizeof(read_wr));
+       read_wr.send_flags = IB_SEND_SIGNALED;
+       read_wr.wr.rdma.rkey = rs_handle;
+       read_wr.wr.rdma.remote_addr = rs_offset;
+       read_wr.sg_list = ctxt->sge;
+       read_wr.num_sge = 1;
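+       /* If the device supports READ_WITH_INV, the adapter invalidates
+        * the FRMR when the READ completes; otherwise chain a LOCAL_INV
+        * WR behind the READ to invalidate it explicitly.
+        */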
+       if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
+               read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
+               read_wr.wr_id = (unsigned long)ctxt;
+               read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
+       } else {
+               read_wr.opcode = IB_WR_RDMA_READ;
+               read_wr.next = &inv_wr;
+               /* Prepare invalidate */
+               memset(&inv_wr, 0, sizeof(inv_wr));
+               inv_wr.wr_id = (unsigned long)ctxt;
+               inv_wr.opcode = IB_WR_LOCAL_INV;
+               inv_wr.send_flags = IB_SEND_SIGNALED;
+               inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
+       }
+       ctxt->wr_op = read_wr.opcode;
+
+       /* Post the chain */
+       ret = svc_rdma_send(xprt, &fastreg_wr);
+       if (ret) {
+               pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
+               set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+               goto err;
        }
-       return 0;
-}
 
-static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
-{
-       if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
-            RDMA_TRANSPORT_IWARP) &&
-           sge_count > 1)
-               return 1;
-       else
-               return min_t(int, sge_count, xprt->sc_max_sge);
+       /* return current location in page array */
+       *page_no = pg_no;
+       *page_offset = pg_off;
+       ret = read;
+       atomic_inc(&rdma_stat_read);
+       return ret;
+ err:
+       svc_rdma_unmap_dma(ctxt);
+       svc_rdma_put_context(ctxt, 0);
+       svc_rdma_put_frmr(xprt, frmr);
+       return ret;
 }
 
-/*
- * Use RDMA_READ to read data from the advertised client buffer into the
- * XDR stream starting at rq_arg.head[0].iov_base.
- * Each chunk in the array
- * contains the following fields:
- * discrim      - '1', This isn't used for data placement
- * position     - The xdr stream offset (the same for every chunk)
- * handle       - RMR for client memory region
- * length       - data transfer length
- * offset       - 64 bit tagged offset in remote memory region
- *
- * On our side, we need to read into a pagelist. The first page immediately
- * follows the RPC header.
- *
- * This function returns:
- * 0 - No error and no read-list found.
- *
- * 1 - Successful read-list processing. The data is not yet in
- * the pagelist and therefore the RPC request must be deferred. The
- * I/O completion will enqueue the transport again and
- * svc_rdma_recvfrom will complete the request.
- *
- * <0 - Error processing/posting read-list.
- *
- * NOTE: The ctxt must not be touched after the last WR has been posted
- * because the I/O completion processing may occur on another
- * processor and free / modify the context. Ne touche pas!
- */
-static int rdma_read_xdr(struct svcxprt_rdma *xprt,
-                        struct rpcrdma_msg *rmsgp,
-                        struct svc_rqst *rqstp,
-                        struct svc_rdma_op_ctxt *hdr_ctxt)
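+/* Pull the data for each chunk in the read list from the client into
+ * the pagelist of head->arg using RDMA_READ.
+ *
+ * Returns:
+ *     0 - no read list was present; nothing was posted.
+ *     1 - the read list was posted.  The data is not yet in the pagelist;
+ *         the RPC is deferred until the read completions arrive and
+ *         rdma_read_complete() finishes the request.
+ *    <0 - error mapping or posting the read list.
+ */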
+static int rdma_read_chunks(struct svcxprt_rdma *xprt,
+                           struct rpcrdma_msg *rmsgp,
+                           struct svc_rqst *rqstp,
+                           struct svc_rdma_op_ctxt *head)
 {
-       struct ib_send_wr read_wr;
-       struct ib_send_wr inv_wr;
-       int err = 0;
-       int ch_no;
-       int ch_count;
-       int byte_count;
-       int sge_count;
-       u64 sgl_offset;
+       int page_no, ch_count, ret;
        struct rpcrdma_read_chunk *ch;
-       struct svc_rdma_op_ctxt *ctxt = NULL;
-       struct svc_rdma_req_map *rpl_map;
-       struct svc_rdma_req_map *chl_map;
+       u32 page_offset, byte_count;
+       u64 rs_offset;
+       rdma_reader_fn reader;
 
        /* If no read list is present, return 0 */
        ch = svc_rdma_get_read_chunk(rmsgp);
        if (!ch)
                return 0;

        svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
        if (ch_count > RPCSVC_MAXPAGES)
                return -EINVAL;
 
-       /* Allocate temporary reply and chunk maps */
-       rpl_map = svc_rdma_get_req_map();
-       chl_map = svc_rdma_get_req_map();
+       /* The request is completed when the RDMA_READs complete. The
+        * head context keeps all the pages that comprise the
+        * request.
+        */
+       head->arg.head[0] = rqstp->rq_arg.head[0];
+       head->arg.tail[0] = rqstp->rq_arg.tail[0];
+       head->arg.pages = &head->pages[head->count];
+       head->hdr_count = head->count;
+       head->arg.page_base = 0;
+       head->arg.page_len = 0;
+       head->arg.len = rqstp->rq_arg.len;
+       head->arg.buflen = rqstp->rq_arg.buflen;
 
-       if (!xprt->sc_frmr_pg_list_len)
-               sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
-                                           rpl_map, chl_map, ch_count,
-                                           byte_count);
+       /* Use FRMR if supported */
+       if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
+               reader = rdma_read_chunk_frmr;
        else
-               sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
-                                                rpl_map, chl_map, ch_count,
-                                                byte_count);
-       if (sge_count < 0) {
-               err = -EIO;
-               goto out;
-       }
-
-       sgl_offset = 0;
-       ch_no = 0;
+               reader = rdma_read_chunk_lcl;
 
+       page_no = 0;
+       page_offset = 0;
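+       /* Walk the read list, posting one or more RDMA_READs per chunk.
+        * page_no/page_offset track the current sink position in the
+        * rq_arg page list across chunks.
+        */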
        for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
-            ch->rc_discrim != 0; ch++, ch_no++) {
-               u64 rs_offset;
-next_sge:
-               ctxt = svc_rdma_get_context(xprt);
-               ctxt->direction = DMA_FROM_DEVICE;
-               ctxt->frmr = hdr_ctxt->frmr;
-               ctxt->read_hdr = NULL;
-               clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
-               clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
+            ch->rc_discrim != 0; ch++) {
 
-               /* Prepare READ WR */
-               memset(&read_wr, 0, sizeof read_wr);
-               read_wr.wr_id = (unsigned long)ctxt;
-               read_wr.opcode = IB_WR_RDMA_READ;
-               ctxt->wr_op = read_wr.opcode;
-               read_wr.send_flags = IB_SEND_SIGNALED;
-               read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
                xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
                                 &rs_offset);
-               read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
-               read_wr.sg_list = ctxt->sge;
-               read_wr.num_sge =
-                       rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
-               err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
-                                       &rpl_map->sge[chl_map->ch[ch_no].start],
-                                       &sgl_offset,
-                                       read_wr.num_sge);
-               if (err) {
-                       svc_rdma_unmap_dma(ctxt);
-                       svc_rdma_put_context(ctxt, 0);
-                       goto out;
-               }
-               if (((ch+1)->rc_discrim == 0) &&
-                   (read_wr.num_sge == chl_map->ch[ch_no].count)) {
-                       /*
-                        * Mark the last RDMA_READ with a bit to
-                        * indicate all RPC data has been fetched from
-                        * the client and the RPC needs to be enqueued.
-                        */
-                       set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
-                       if (hdr_ctxt->frmr) {
-                               set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
-                               /*
-                                * Invalidate the local MR used to map the data
-                                * sink.
-                                */
-                               if (xprt->sc_dev_caps &
-                                   SVCRDMA_DEVCAP_READ_W_INV) {
-                                       read_wr.opcode =
-                                               IB_WR_RDMA_READ_WITH_INV;
-                                       ctxt->wr_op = read_wr.opcode;
-                                       read_wr.ex.invalidate_rkey =
-                                               ctxt->frmr->mr->lkey;
-                               } else {
-                                       /* Prepare INVALIDATE WR */
-                                       memset(&inv_wr, 0, sizeof inv_wr);
-                                       inv_wr.opcode = IB_WR_LOCAL_INV;
-                                       inv_wr.send_flags = IB_SEND_SIGNALED;
-                                       inv_wr.ex.invalidate_rkey =
-                                               hdr_ctxt->frmr->mr->lkey;
-                                       read_wr.next = &inv_wr;
-                               }
-                       }
-                       ctxt->read_hdr = hdr_ctxt;
-               }
-               /* Post the read */
-               err = svc_rdma_send(xprt, &read_wr);
-               if (err) {
-                       printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
-                              err);
-                       set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-                       svc_rdma_unmap_dma(ctxt);
-                       svc_rdma_put_context(ctxt, 0);
-                       goto out;
+               byte_count = ntohl(ch->rc_target.rs_length);
+
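+               /* A single reader call may cover only part of the chunk,
+                * so keep posting READs until every byte of the chunk has
+                * been accounted for.
+                */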
+               while (byte_count > 0) {
+                       ret = reader(xprt, rqstp, head,
+                                    &page_no, &page_offset,
+                                    ntohl(ch->rc_target.rs_handle),
+                                    byte_count, rs_offset,
+                                    ((ch+1)->rc_discrim == 0) /* last */
+                                    );
+                       if (ret < 0)
+                               goto err;
+                       byte_count -= ret;
+                       rs_offset += ret;
+                       head->arg.buflen += ret;
                }
-               atomic_inc(&rdma_stat_read);
-
-               if (read_wr.num_sge < chl_map->ch[ch_no].count) {
-                       chl_map->ch[ch_no].count -= read_wr.num_sge;
-                       chl_map->ch[ch_no].start += read_wr.num_sge;
-                       goto next_sge;
-               }
-               sgl_offset = 0;
-               err = 1;
        }
-
- out:
-       svc_rdma_put_req_map(rpl_map);
-       svc_rdma_put_req_map(chl_map);
-
+       ret = 1;
+ err:
        /* Detach arg pages. svc_recv will replenish them */
-       for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
-               rqstp->rq_pages[ch_no] = NULL;
+       for (page_no = 0;
+            &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
+               rqstp->rq_pages[page_no] = NULL;
 
-       return err;
+       return ret;
 }
 
 static int rdma_read_complete(struct svc_rqst *rqstp,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                list_del_init(&ctxt->dto_q);
-       }
-       if (ctxt) {
                spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
                return rdma_read_complete(rqstp, ctxt);
-       }
-
-       if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
+       } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
                ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
                                  struct svc_rdma_op_ctxt,
                                  dto_q);
                if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
                        goto close_out;
 
-               BUG_ON(ret);
                goto out;
        }
        dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
        }
 
        /* Read read-list data. */
-       ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
+       ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
        if (ret > 0) {
                /* read-list posted, defer until data received from client. */
                goto defer;
-       }
-       if (ret < 0) {
+       } else if (ret < 0) {
                /* Post of read-list failed, free context. */
                svc_rdma_put_context(ctxt, 1);
                return 0;
 
 /*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
  * This software is available to you under a choice of one of two
 
 #define RPCDBG_FACILITY        RPCDBG_SVCXPRT
 
-/* Encode an XDR as an array of IB SGE
- *
- * Assumptions:
- * - head[0] is physically contiguous.
- * - tail[0] is physically contiguous.
- * - pages[] is not physically or virtually contiguous and consists of
- *   PAGE_SIZE elements.
- *
- * Output:
- * SGE[0]              reserved for RCPRDMA header
- * SGE[1]              data from xdr->head[]
- * SGE[2..sge_count-2] data from xdr->pages[]
- * SGE[sge_count-1]    data from xdr->tail.
- *
- * The max SGE we need is the length of the XDR / pagesize + one for
- * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES
- * reserves a page for both the request and the reply header, and this
- * array is only concerned with the reply we are assured that we have
- * on extra page for the RPCRMDA header.
- */
-static int fast_reg_xdr(struct svcxprt_rdma *xprt,
-                       struct xdr_buf *xdr,
-                       struct svc_rdma_req_map *vec)
-{
-       int sge_no;
-       u32 sge_bytes;
-       u32 page_bytes;
-       u32 page_off;
-       int page_no = 0;
-       u8 *frva;
-       struct svc_rdma_fastreg_mr *frmr;
-
-       frmr = svc_rdma_get_frmr(xprt);
-       if (IS_ERR(frmr))
-               return -ENOMEM;
-       vec->frmr = frmr;
-
-       /* Skip the RPCRDMA header */
-       sge_no = 1;
-
-       /* Map the head. */
-       frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
-       vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
-       vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
-       vec->count = 2;
-       sge_no++;
-
-       /* Map the XDR head */
-       frmr->kva = frva;
-       frmr->direction = DMA_TO_DEVICE;
-       frmr->access_flags = 0;
-       frmr->map_len = PAGE_SIZE;
-       frmr->page_list_len = 1;
-       page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
-       frmr->page_list->page_list[page_no] =
-               ib_dma_map_page(xprt->sc_cm_id->device,
-                               virt_to_page(xdr->head[0].iov_base),
-                               page_off,
-                               PAGE_SIZE - page_off,
-                               DMA_TO_DEVICE);
-       if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-                                frmr->page_list->page_list[page_no]))
-               goto fatal_err;
-       atomic_inc(&xprt->sc_dma_used);
-
-       /* Map the XDR page list */
-       page_off = xdr->page_base;
-       page_bytes = xdr->page_len + page_off;
-       if (!page_bytes)
-               goto encode_tail;
-
-       /* Map the pages */
-       vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
-       vec->sge[sge_no].iov_len = page_bytes;
-       sge_no++;
-       while (page_bytes) {
-               struct page *page;
-
-               page = xdr->pages[page_no++];
-               sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
-               page_bytes -= sge_bytes;
-
-               frmr->page_list->page_list[page_no] =
-                       ib_dma_map_page(xprt->sc_cm_id->device,
-                                       page, page_off,
-                                       sge_bytes, DMA_TO_DEVICE);
-               if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-                                        frmr->page_list->page_list[page_no]))
-                       goto fatal_err;
-
-               atomic_inc(&xprt->sc_dma_used);
-               page_off = 0; /* reset for next time through loop */
-               frmr->map_len += PAGE_SIZE;
-               frmr->page_list_len++;
-       }
-       vec->count++;
-
- encode_tail:
-       /* Map tail */
-       if (0 == xdr->tail[0].iov_len)
-               goto done;
-
-       vec->count++;
-       vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
-
-       if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
-           ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
-               /*
-                * If head and tail use the same page, we don't need
-                * to map it again.
-                */
-               vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
-       } else {
-               void *va;
-
-               /* Map another page for the tail */
-               page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
-               va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
-               vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
-
-               frmr->page_list->page_list[page_no] =
-                   ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va),
-                                   page_off,
-                                   PAGE_SIZE,
-                                   DMA_TO_DEVICE);
-               if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-                                        frmr->page_list->page_list[page_no]))
-                       goto fatal_err;
-               atomic_inc(&xprt->sc_dma_used);
-               frmr->map_len += PAGE_SIZE;
-               frmr->page_list_len++;
-       }
-
- done:
-       if (svc_rdma_fastreg(xprt, frmr))
-               goto fatal_err;
-
-       return 0;
-
- fatal_err:
-       printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
-       vec->frmr = NULL;
-       svc_rdma_put_frmr(xprt, frmr);
-       return -EIO;
-}
-
 static int map_xdr(struct svcxprt_rdma *xprt,
                   struct xdr_buf *xdr,
                   struct svc_rdma_req_map *vec)
        BUG_ON(xdr->len !=
               (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
 
-       if (xprt->sc_frmr_pg_list_len)
-               return fast_reg_xdr(xprt, xdr, vec);
-
        /* Skip the first sge, this is for the RPCRDMA header */
        sge_no = 1;
 
 }
 
 /* Assumptions:
- * - We are using FRMR
- *     - or -
  * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
  */
 static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
                sge_bytes = min_t(size_t,
                          bc, vec->sge[xdr_sge_no].iov_len-sge_off);
                sge[sge_no].length = sge_bytes;
-               if (!vec->frmr) {
-                       sge[sge_no].addr =
-                               dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
-                                           sge_bytes, DMA_TO_DEVICE);
-                       xdr_off += sge_bytes;
-                       if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-                                                sge[sge_no].addr))
-                               goto err;
-                       atomic_inc(&xprt->sc_dma_used);
-                       sge[sge_no].lkey = xprt->sc_dma_lkey;
-               } else {
-                       sge[sge_no].addr = (unsigned long)
-                               vec->sge[xdr_sge_no].iov_base + sge_off;
-                       sge[sge_no].lkey = vec->frmr->mr->lkey;
-               }
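+               /* DMA-map this piece of the reply XDR and reference it
+                * with the local dma lkey.
+                */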
+               sge[sge_no].addr =
+                       dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
+                                   sge_bytes, DMA_TO_DEVICE);
+               xdr_off += sge_bytes;
+               if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+                                        sge[sge_no].addr))
+                       goto err;
+               atomic_inc(&xprt->sc_dma_used);
+               sge[sge_no].lkey = xprt->sc_dma_lkey;
                ctxt->count++;
-               ctxt->frmr = vec->frmr;
                sge_off = 0;
                sge_no++;
                xdr_sge_no++;
        return 0;
  err:
        svc_rdma_unmap_dma(ctxt);
-       svc_rdma_put_frmr(xprt, vec->frmr);
        svc_rdma_put_context(ctxt, 0);
        /* Fatal error, close transport */
        return -EIO;
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[1];
 
-       if (vec->frmr)
-               max_write = vec->frmr->map_len;
-       else
-               max_write = xprt->sc_max_sge * PAGE_SIZE;
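+       /* Each RDMA_WRITE can carry at most sc_max_sge SGEs of payload */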
+       max_write = xprt->sc_max_sge * PAGE_SIZE;
 
        /* Write chunks start at the pagelist */
        for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
        res_ary = (struct rpcrdma_write_array *)
                &rdma_resp->rm_body.rm_chunks[2];
 
-       if (vec->frmr)
-               max_write = vec->frmr->map_len;
-       else
-               max_write = xprt->sc_max_sge * PAGE_SIZE;
+       max_write = xprt->sc_max_sge * PAGE_SIZE;
 
        /* xdr offset starts at RPC message */
        nchunks = ntohl(arg_ary->wc_nchunks);
                      int byte_count)
 {
        struct ib_send_wr send_wr;
-       struct ib_send_wr inv_wr;
        int sge_no;
        int sge_bytes;
        int page_no;
                       "svcrdma: could not post a receive buffer, err=%d."
                       "Closing transport %p.\n", ret, rdma);
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-               svc_rdma_put_frmr(rdma, vec->frmr);
                svc_rdma_put_context(ctxt, 0);
                return -ENOTCONN;
        }
        /* Prepare the context */
        ctxt->pages[0] = page;
        ctxt->count = 1;
-       ctxt->frmr = vec->frmr;
-       if (vec->frmr)
-               set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
-       else
-               clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
 
        /* Prepare the SGE for the RPCRDMA Header */
        ctxt->sge[0].lkey = rdma->sc_dma_lkey;
                int xdr_off = 0;
                sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
                byte_count -= sge_bytes;
-               if (!vec->frmr) {
-                       ctxt->sge[sge_no].addr =
-                               dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
-                                           sge_bytes, DMA_TO_DEVICE);
-                       xdr_off += sge_bytes;
-                       if (ib_dma_mapping_error(rdma->sc_cm_id->device,
-                                                ctxt->sge[sge_no].addr))
-                               goto err;
-                       atomic_inc(&rdma->sc_dma_used);
-                       ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
-               } else {
-                       ctxt->sge[sge_no].addr = (unsigned long)
-                               vec->sge[sge_no].iov_base;
-                       ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
-               }
+               ctxt->sge[sge_no].addr =
+                       dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
+                                   sge_bytes, DMA_TO_DEVICE);
+               xdr_off += sge_bytes;
+               if (ib_dma_mapping_error(rdma->sc_cm_id->device,
+                                        ctxt->sge[sge_no].addr))
+                       goto err;
+               atomic_inc(&rdma->sc_dma_used);
+               ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
                ctxt->sge[sge_no].length = sge_bytes;
        }
        BUG_ON(byte_count != 0);
                        ctxt->sge[page_no+1].length = 0;
        }
        rqstp->rq_next_page = rqstp->rq_respages + 1;
+
        BUG_ON(sge_no > rdma->sc_max_sge);
        memset(&send_wr, 0, sizeof send_wr);
        ctxt->wr_op = IB_WR_SEND;
        send_wr.num_sge = sge_no;
        send_wr.opcode = IB_WR_SEND;
        send_wr.send_flags =  IB_SEND_SIGNALED;
-       if (vec->frmr) {
-               /* Prepare INVALIDATE WR */
-               memset(&inv_wr, 0, sizeof inv_wr);
-               inv_wr.opcode = IB_WR_LOCAL_INV;
-               inv_wr.send_flags = IB_SEND_SIGNALED;
-               inv_wr.ex.invalidate_rkey =
-                       vec->frmr->mr->lkey;
-               send_wr.next = &inv_wr;
-       }
 
        ret = svc_rdma_send(rdma, &send_wr);
        if (ret)
 
  err:
        svc_rdma_unmap_dma(ctxt);
-       svc_rdma_put_frmr(rdma, vec->frmr);
        svc_rdma_put_context(ctxt, 1);
        return -EIO;
 }