sdp: Cleanup ZCopy page registrations
author     Amir Vadai <amirv@mellanox.co.il>
           Mon, 30 Nov 2009 15:45:13 +0000 (17:45 +0200)
committer  Mukesh Kacker <mukesh.kacker@oracle.com>
           Tue, 6 Oct 2015 12:04:43 +0000 (05:04 -0700)
- Use ib_umem_get instead of get_user_pages (see the sketch below)
- Use FMR for RDMA reads
- ZCopy sender side to use ib_umem
- Send remainder of ZCopy with BCopy
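
The registration path now pins the user buffer with ib_umem_get() and
feeds the resulting DMA address list to the device's FMR pool. In sketch
form (condensed from sdp_alloc_fmr() in the diff below; error handling
elided):

	umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr,
			   len, IB_ACCESS_REMOTE_WRITE, 0);
	pages = (u64 *)__get_free_page(GFP_KERNEL);
	n = 0;
	list_for_each_entry(chunk, &umem->chunk_list, list)
		for (j = 0; j < chunk->nmap; ++j)
			for (k = 0; k < (sg_dma_len(&chunk->page_list[j])
						>> PAGE_SHIFT); ++k)
				pages[n++] = sg_dma_address(&chunk->page_list[j])
					+ umem->page_size * k;
	fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
	free_page((unsigned long)pages);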

Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_cma.c
drivers/infiniband/ulp/sdp/sdp_dbg.h
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_tx.c
drivers/infiniband/ulp/sdp/sdp_zcopy.c

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index d67a973d1b55ec44c26f02d78f480f3f07c7bc55..fe6fa1270cde654ef208269e15b2c097fee67671 100644
@@ -33,6 +33,8 @@
 #define SDP_FMR_POOL_SIZE      1024
 #define SDP_FMR_DIRTY_SIZE     ( SDP_FMR_POOL_SIZE / 4 )
 
+#define SDP_MAX_RDMA_READ_LEN (PAGE_SIZE * (SDP_FMR_SIZE - 2))
+
 #define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
 #define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)
 
@@ -88,6 +90,7 @@ struct sdp_skb_cb {
        sdp_do_posts(ssk); \
 } while (0)
 
+extern int sdp_zcopy_thresh;
 extern struct workqueue_struct *sdp_wq;
 extern struct list_head sock_list;
 extern spinlock_t sock_list_lock;
@@ -225,9 +228,8 @@ struct rx_srcavail_state {
        u64 vaddr;
 
        /* Dest buff info */
-       u32             page_cnt;
-       struct page     **pages;
-       struct ib_sge   *sge;
+       struct ib_umem *umem;
+       struct ib_pool_fmr *fmr;
 
        /* Utility */
        u8  busy;
@@ -235,14 +237,12 @@ struct rx_srcavail_state {
 };
 
 struct tx_srcavail_state {
-       u32             page_cnt;
-       struct page     **pages;
-       u64             *addrs;
-
        /* Data below 'busy' will be reset */
        u8              busy;
 
+       struct ib_umem *umem;
        struct ib_pool_fmr *fmr;
+
        u32             bytes_sent;
        u32             bytes_acked;
 
@@ -323,6 +323,8 @@ struct sdp_sock {
        struct delayed_work srcavail_cancel_work;
        int srcavail_cancel_mseq;
 
+       struct ib_ucontext context;
+
        int max_sge;
 
        struct work_struct rx_comp_work;
@@ -769,12 +771,11 @@ void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
 int sdp_handle_rdma_read_cqe(struct sdp_sock *ssk);
 int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
                int len);
-int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
-               unsigned long addr);
 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
                struct rx_srcavail_state *rx_sa);
 int sdp_post_sendsm(struct sock *sk);
 void srcavail_cancel_timeout(struct work_struct *work);
-void sdp_unmap_dma(struct sock *sk, u64 *addrs, int page_cnt);
+void sdp_abort_srcavail(struct sock *sk);
+void sdp_abort_rdma_read(struct sock *sk);
 
 #endif
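
Note on SDP_MAX_RDMA_READ_LEN: a single FMR maps at most SDP_FMR_SIZE
pages, and an unaligned buffer of len bytes can straddle one page more
than len / PAGE_SIZE, so capping the read at SDP_FMR_SIZE - 2 pages
leaves room for that alignment page plus one page of slack (the reason
for the second spare page is an assumption; the patch does not spell it
out). For reference, the worst-case page count of a user range:

	/* Illustrative helper, not part of the patch: number of pages
	 * touched by the user range [uaddr, uaddr + len). */
	static inline unsigned long sdp_npages(unsigned long uaddr, size_t len)
	{
		return PAGE_ALIGN((uaddr & ~PAGE_MASK) + len) >> PAGE_SHIFT;
	}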
diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c
index b3776218f8875cebe2e91619b08c82d12965b4d9..5ad54963817918127f676bd954291d0c309c7bf0 100644
@@ -120,6 +120,7 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
        sdp_sk(sk)->qp = id->qp;
        sdp_sk(sk)->ib_device = device;
        sdp_sk(sk)->qp_active = 1;
+       sdp_sk(sk)->context.device = device;
 
        init_waitqueue_head(&sdp_sk(sk)->wq);
 
diff --git a/drivers/infiniband/ulp/sdp/sdp_dbg.h b/drivers/infiniband/ulp/sdp/sdp_dbg.h
index 10ef4f0605527b189f54147b8a4e697c185ad12b..c3f5bc7ae9df3a61b88a0d589f970b12d61c3398 100644
@@ -146,6 +146,7 @@ extern int sdp_data_debug_level;
 #define SOCK_REF_SEQ "SEQ" /* during proc read */
 #define SOCK_REF_DREQ_TO "DREQ_TO" /* dreq timeout is pending */
 #define SOCK_REF_ZCOPY "ZCOPY" /* zcopy send in process */
+#define SOCK_REF_RDMA_RD "RDMA_RD" /* RDMA read in process */
 
 #define sock_hold(sk, msg)  sock_ref(sk, msg, sock_hold)
 #define sock_put(sk, msg)  sock_ref(sk, msg, sock_put)
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 4035cc1dc63ba56c8cb1ca979daa6d6316b0dfc0..eab89a69308d1df807e6aae28046d9826782cc2f 100644
@@ -70,6 +70,7 @@ unsigned int csum_partial_copy_from_user_new (const char *src, char *dst,
 #include <rdma/rdma_cm.h>
 #include <rdma/ib_fmr_pool.h>
 #include <rdma/ib_verbs.h>
+#include <linux/pagemap.h>
 /* TODO: remove when sdp_socket.h becomes part of include/linux/socket.h */
 #include "sdp_socket.h"
 #include "sdp.h"
@@ -94,7 +95,7 @@ SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
        "Default idle time in seconds before keepalive probe sent.");
 SDP_MODPARAM_SINT(sdp_bzcopy_thresh, 0,
        "Zero copy send using SEND threshold; 0=0ff.");
-SDP_MODPARAM_SINT(sdp_zcopy_thresh, 64*1024,
+SDP_MODPARAM_INT(sdp_zcopy_thresh, 64*1024,
        "Zero copy using RDMA threshold; 0=0ff.");
 #define SDP_RX_COAL_TIME_HIGH 128
 SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000,
@@ -434,10 +435,9 @@ void sdp_reset_sk(struct sock *sk, int rc)
        if (ssk->tx_ring.cq)
                sdp_xmit_poll(ssk, 1);
 
-       if (ssk->tx_sa) {
-               sdp_unmap_dma(sk, ssk->tx_sa->addrs, ssk->tx_sa->page_cnt);
-               ssk->tx_sa->addrs = NULL;
-       }
+       sdp_abort_srcavail(sk);
+
+       sdp_abort_rdma_read(sk);
 
        if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
                sdp_dbg(sk, "setting state to error\n");
@@ -1413,6 +1413,72 @@ static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
        return NULL;
 }
 
+static int sdp_get_user_pages(struct page **pages, const unsigned int nr_pages,
+                             unsigned long uaddr, int rw)
+{
+       int res, i;
+
+       /* Try to fault in all of the necessary pages */
+       down_read(&current->mm->mmap_sem);
+       /* rw==READ means read from drive, write into memory area */
+       res = get_user_pages(
+               current,
+               current->mm,
+               uaddr,
+               nr_pages,
+               rw == READ,
+               0, /* don't force */
+               pages,
+               NULL);
+       up_read(&current->mm->mmap_sem);
+
+       /* Errors and no page mapped should return here */
+       if (res < nr_pages)
+               return res;
+
+       for (i = 0; i < nr_pages; i++) {
+               /* FIXME: flush is superfluous for rw==READ,
+                * probably the wrong function for rw==WRITE
+                */
+               flush_dcache_page(pages[i]);
+       }
+
+       return nr_pages;
+}
+
+static int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
+               unsigned long addr)
+{
+       int done_pages = 0;
+
+       sdp_dbg_data(sk, "count: 0x%x addr: 0x%lx\n", page_cnt, addr);
+
+       addr &= PAGE_MASK;
+       if (segment_eq(get_fs(), KERNEL_DS)) {
+               for (done_pages = 0; done_pages < page_cnt; done_pages++) {
+                       pages[done_pages] = virt_to_page(addr);
+                       if (!pages[done_pages])
+                               break;
+                       get_page(pages[done_pages]);
+                       addr += PAGE_SIZE;
+               }
+       } else {
+               done_pages = sdp_get_user_pages(pages, page_cnt, addr, WRITE);
+       }
+
+       if (unlikely(done_pages != page_cnt))
+               goto err;
+
+       return 0;
+
+err:
+       sdp_warn(sk, "Error getting pages. done_pages: %d page_cnt: %d\n",
+                       done_pages, page_cnt);
+       for (; done_pages > 0; done_pages--)
+               page_cache_release(pages[done_pages - 1]);
+
+       return -1;
+}
 
 static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
                                         char __user *base,
@@ -1737,16 +1803,26 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        int iovlen, flags;
        int size_goal;
        int err, copied;
+       int zcopied = 0;
        long timeo;
        struct bzcopy_state *bz = NULL;
        SDPSTATS_COUNTER_INC(sendmsg);
 
        if (sdp_zcopy_thresh && size > sdp_zcopy_thresh) {
-               err = sdp_sendmsg_zcopy(iocb, sk, msg, size);
-               if (err != -EAGAIN && err != -ETIME)
-                       return err;
+               zcopied = sdp_sendmsg_zcopy(iocb, sk, msg, size);
+               if (zcopied == -EAGAIN || zcopied == -ETIME) {
+                       sdp_dbg_data(sk, "Got SendSM/timeout - falling back to regular send\n");
+               } else if (zcopied < 0) {
+                       sdp_dbg_data(sk, "ZCopy send err: %d\n", zcopied);
+                       return zcopied;
+               }
+
+               if (size == zcopied) {
+                       return size;
+               }
 
-               /* Got SendSM/Timedout - fallback to regular send */    
+               sdp_dbg_data(sk, "ZCopied: 0x%x\n", zcopied);
+               sdp_dbg_data(sk, "Continuing with BCopy - 0x%lx bytes left\n", size - zcopied);
        }
 
        lock_sock(sk);
@@ -1914,7 +1990,9 @@ out:
        posts_handler_put(ssk);
 
        release_sock(sk);
-       return copied;
+
+       sdp_dbg_data(sk, "BCopied: 0x%x\n", copied);
+       return copied + zcopied;
 
 do_fault:
        sdp_prf(sk, skb, "prepare fault");
@@ -2177,14 +2255,15 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                                        goto skb_cleanup;
                                }
                        } else {
+                               sdp_dbg_data(sk, "memcpy 0x%lx bytes +0x%x -> %p\n",
+                                               used, offset, msg->msg_iov[0].iov_base);
+
                                err = skb_copy_datagram_iovec(skb, offset,
                                                /* TODO: skip header? */
                                                msg->msg_iov, used);
                                if (rx_sa) {
                                        rx_sa->used += used;
                                        rx_sa->reported += used;
-                                       sdp_dbg_data(sk, "copied %ld from "
-                                                       "payload\n", used);
                                }
                        }
                        if (err) {
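
The send path now treats ZCopy as best-effort: a SendSM or a timeout
aborts the RDMA transfer and the remainder of the buffer goes out with
BCopy. In outline (condensed from the sdp_sendmsg() hunk above; locking
and the BCopy loop elided):

	if (sdp_zcopy_thresh && size > sdp_zcopy_thresh) {
		zcopied = sdp_sendmsg_zcopy(iocb, sk, msg, size);
		if (zcopied == -EAGAIN || zcopied == -ETIME)
			; /* SendSM/timeout: fall through to BCopy (note
			   * that zcopied stays negative on this path) */
		else if (zcopied < 0)
			return zcopied;  /* hard error */
		else if (zcopied == size)
			return size;     /* everything went by RDMA */
	}
	/* ... regular BCopy loop, sets `copied' ... */
	return copied + zcopied;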
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
index 573373a4c76815d526583e2f58e4a13edcf0782f..eae49df1a52bc2ec4524ece5d07e1e38a0aeaf0b 100644
@@ -237,16 +237,18 @@ static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
        }
 
        if (wc->wr_id & SDP_OP_RDMA) {
+               /* TODO: handle failed RDMA read cqe */
+
                sdp_dbg_data(sk, "TX comp: RDMA read. status: %d\n", wc->status);
                sdp_prf1(sk, NULL, "TX comp: RDMA read");
 
                if (!ssk->tx_ring.rdma_inflight) {
-                       sdp_dbg(sk, "ERROR: unexpected RDMA read\n");
+                       sdp_warn(sk, "ERROR: unexpected RDMA read\n");
                        return;
                }
 
                if (!ssk->tx_ring.rdma_inflight->busy) {
-                       sdp_dbg(sk, "ERROR: too many RDMA read completions\n");
+                       sdp_warn(sk, "ERROR: too many RDMA read completions\n");
                        return;
                }
 
@@ -334,7 +336,7 @@ static int sdp_tx_handler_select(struct sdp_sock *ssk)
                        return 0;
                } else
                        sdp_prf1(sk, NULL, "Unexpected: sk_sleep=%p, "
-                               "waitqueue_active: %d",
+                               "waitqueue_active: %d\n",
                                sk->sk_sleep, waitqueue_active(sk->sk_sleep));
        }
 
@@ -371,8 +373,10 @@ static void sdp_poll_tx_timeout(unsigned long data)
                goto out;
        }
 
-       if (unlikely(sk->sk_state == TCP_CLOSE))
+       if (unlikely(sk->sk_state == TCP_CLOSE)) {
+               sdp_warn(sk, "Socket is closed\n");
                goto out;
+       }
 
        wc_processed = sdp_process_tx_cq(ssk);
        if (!wc_processed)
diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
index 2831dd609842f70904e6ef43b445f79ffa5f4777..81baa6ed5b8efffb77b8df7266db757cc3c658f1 100644
 #include <rdma/rdma_cm.h>
 #include <rdma/ib_verbs.h>
 #include <rdma/ib_fmr_pool.h>
-#include <linux/pagemap.h>
+#include <rdma/ib_umem.h>
 #include <net/tcp.h> /* for memcpy_toiovec */
 #include <asm/io.h>
 #include <asm/uaccess.h>
 #include <linux/delay.h>
 #include "sdp.h"
 
-static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
-               int page_idx, int off, size_t len)
+static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
        struct sk_buff *skb;
        int payload_len;
+       struct page *payload_pg;
+       int off, len;
+       struct ib_umem_chunk *chunk;
 
        WARN_ON(ssk->tx_sa);
 
        BUG_ON(!tx_sa);
        BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
+       BUG_ON(!tx_sa->umem);
+       BUG_ON(!tx_sa->umem->chunk_list.next);
+
+       chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
+       BUG_ON(!chunk->nmap);
+
+       off = tx_sa->umem->offset;
+       len = tx_sa->umem->length;
 
        tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
 
@@ -64,17 +74,22 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
        if (!skb) {
                return -ENOMEM;
        }
-       sdp_prf1(sk, skb, "sending SrcAvail");
+       sdp_dbg_data(sk, "sending SrcAvail\n");
                
        TX_SRCAVAIL_STATE(skb) = tx_sa; /* tx_sa is hanged on the skb 
                                         * but continue to live after skb is freed */
        ssk->tx_sa = tx_sa;
 
        /* must have payload inlined in SrcAvail packet in combined mode */
-       payload_len = min(len, PAGE_SIZE - off);
-       get_page(tx_sa->pages[page_idx]);
+       payload_len = tx_sa->umem->page_size - off;
+       payload_pg  = sg_page(&chunk->page_list[0]);
+       get_page(payload_pg);
+
+       sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
+               off, payload_pg, payload_len);
+
        skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags,
-                       tx_sa->pages[page_idx], off, payload_len);
+                       payload_pg, off, payload_len);
 
        skb->len             += payload_len;
        skb->data_len         = payload_len;
@@ -87,7 +102,7 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
        ssk->write_seq += payload_len;
        SDP_SKB_CB(skb)->end_seq += payload_len;
 
-       tx_sa->bytes_sent = len;
+       tx_sa->bytes_sent = tx_sa->umem->length;
        tx_sa->bytes_acked = payload_len;
 
        /* TODO: pushing the skb into the tx_queue should be enough */
@@ -129,7 +144,7 @@ void srcavail_cancel_timeout(struct work_struct *work)
        release_sock(sk);
 }
 
-static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
+static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
                int ignore_signals)
 {
        struct sock *sk = &ssk->isk.sk;
@@ -217,62 +232,46 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
        return err;
 }
 
-static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p)
+static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
 {
        struct sock *sk = &ssk->isk.sk;
-       int err = 0;
-       long current_timeo = *timeo_p;
+       long timeo = HZ * 5; /* Timeout for RDMA read */
        DEFINE_WAIT(wait);
 
        sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
        while (1) {
-               prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+               prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);
 
-               if (unlikely(!ssk->qp_active)) {
-                       err = -EPIPE;
-                       sdp_warn(sk, "Socket closed\n");
-                       break;
-               }
-
-               if (unlikely(!*timeo_p)) {
-                       err = -EAGAIN;
-                       sdp_warn(sk, "Timedout\n");
+               if (!ssk->tx_ring.rdma_inflight->busy) {
+                       sdp_dbg_data(sk, "got rdma cqe\n");
                        break;
                }
 
-               if (unlikely(signal_pending(current))) {
-                       err = sock_intr_errno(*timeo_p);
-                       sdp_warn(sk, "Signalled\n");
+               if (!ssk->qp_active) {
+                       sdp_warn(sk, "QP destroyed\n");
                        break;
                }
 
-               if (!ssk->tx_ring.rdma_inflight->busy) {
-                       sdp_dbg_data(sk, "got rdma cqe\n");
+               if (!timeo) {
+                       sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
+                       WARN_ON(1);
                        break;
                }
 
                posts_handler_put(ssk);
 
                sdp_prf1(sk, NULL, "Going to sleep");
-               sk_wait_event(sk, &current_timeo, 
+               sk_wait_event(sk, &timeo, 
                        !ssk->tx_ring.rdma_inflight->busy);
                sdp_prf1(sk, NULL, "Woke up");
                sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
 
                posts_handler_get(ssk);
-
-               *timeo_p = current_timeo;
        }
 
        finish_wait(sk->sk_sleep, &wait);
 
-       if (ssk->tx_ring.rdma_inflight->busy) {
-               sdp_warn(sk, "RDMA reads are in the air: %d\n",
-                       ssk->tx_ring.rdma_inflight->busy);
-       } else 
-               sdp_dbg_data(sk, "Finished waiting - no rdma's inflight\n");
-
-       return err;
+       sdp_dbg_data(sk, "Finished waiting\n");
 }
 
 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
@@ -397,236 +396,115 @@ out:
        return;
 }
 
-static int sdp_get_user_pages(struct page **pages, const unsigned int nr_pages,
-                             unsigned long uaddr, int rw)
-{
-       int res, i;
-
-        /* Try to fault in all of the necessary pages */
-       down_read(&current->mm->mmap_sem);
-        /* rw==READ means read from drive, write into memory area */
-       res = get_user_pages(
-               current,
-               current->mm,
-               uaddr,
-               nr_pages,
-               rw == READ,
-               0, /* don't force */
-               pages,
-               NULL);
-       up_read(&current->mm->mmap_sem);
-
-       /* Errors and no page mapped should return here */
-       if (res < nr_pages)
-               return res;
-
-        for (i=0; i < nr_pages; i++) {
-                /* FIXME: flush superflous for rw==READ,
-                 * probably wrong function for rw==WRITE
-                 */
-               flush_dcache_page(pages[i]);
-        }
-
-       return nr_pages;
-}
-
-int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
-               unsigned long addr)
+static int sdp_alloc_fmr(struct sock *sk, void *uaddr, size_t len,
+       struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
 {
-       int done_pages = 0;
-
-       sdp_dbg_data(sk, "count: 0x%x addr: 0x%lx\n", page_cnt, addr);
+       struct ib_pool_fmr *fmr;
+       struct ib_umem *umem;
+       u64 *pages;
+       struct ib_umem_chunk *chunk;
+       int n, j, k;
+       int rc = 0;
 
-       addr &= PAGE_MASK;
-       if (segment_eq(get_fs(), KERNEL_DS)) {
-               for (done_pages = 0; done_pages < page_cnt; done_pages++) {
-                       pages[done_pages] = virt_to_page(addr);
-                       if (!pages[done_pages])
-                               break;
-                       get_page(pages[done_pages]);
-                       addr += PAGE_SIZE;
-               }
-       } else {
-               done_pages = sdp_get_user_pages(pages, page_cnt, addr, WRITE);
+       if (len > SDP_MAX_RDMA_READ_LEN) {
+               sdp_dbg_data(sk, "len: 0x%lx > SDP_MAX_RDMA_READ_LEN: 0x%lx\n",
+                       len, SDP_MAX_RDMA_READ_LEN);
+               len = SDP_MAX_RDMA_READ_LEN;
        }
 
-       if (unlikely(done_pages != page_cnt))
-               goto err;
+       sdp_dbg_data(sk, "user buf: %p, len:0x%lx\n", uaddr, len);
 
-       return 0;
+       umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
+               IB_ACCESS_REMOTE_WRITE, 0);
 
-err:
-       sdp_warn(sk, "Error getting pages. done_pages: %d page_cnt: %d\n",
-                       done_pages, page_cnt);
-       for (; done_pages > 0; done_pages--)
-               page_cache_release(pages[done_pages - 1]);
+       if (IS_ERR(umem)) {
+               rc = PTR_ERR(umem);
+               sdp_warn(sk, "Error getting umem: %d\n", rc);
+               goto err_umem_get;
+       }
 
-       return -1;
-}
+       sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
+               umem->offset, umem->length);
 
-static void sdp_put_pages(struct sock *sk, struct page **pages, int page_cnt)
-{
-       int i;
-       sdp_dbg_data(sk, "count: %d\n", page_cnt);
+       pages = (u64 *) __get_free_page(GFP_KERNEL);
+       if (!pages)
+               goto err_pages_alloc;
 
-       for (i = 0; i < page_cnt; i++) {
-               set_page_dirty_lock(pages[i]);
-               page_cache_release(pages[i]);
-       }
-}
+       n = 0;
 
-static int sdp_map_dma(struct sock *sk, u64 *addrs, struct page **pages,
-               int nr_pages, size_t offset, size_t count)
-{
-       struct ib_device *dev = sdp_sk(sk)->ib_device;
-       int i = 0;
-       sdp_dbg_data(sk, "map dma offset: 0x%lx count: 0x%lx\n", offset, count);
-
-#define map_page(p, o, l) ({\
-       u64 addr = ib_dma_map_page(dev, p, o, l, DMA_TO_DEVICE); \
-       if (ib_dma_mapping_error(dev, addr)) { \
-               sdp_warn(sk, "Error mapping page %p off: 0x%lx len: 0x%lx\n", \
-                               p, o, l); \
-               goto err; \
-       } \
-       addr; \
-})
-
-       if (nr_pages > 1) {
-               size_t length = PAGE_SIZE - offset;
-               addrs[0] = map_page(pages[0], offset, length);
-               count -= length;
-               for (i=1; i < nr_pages ; i++) {
-                       length = count < PAGE_SIZE ? count : PAGE_SIZE;
-                       addrs[i] = map_page(pages[i], 0UL, length);
-                       count -= PAGE_SIZE;
+       list_for_each_entry(chunk, &umem->chunk_list, list) {
+               for (j = 0; j < chunk->nmap; ++j) {
+                       len = sg_dma_len(&chunk->page_list[j]) >> PAGE_SHIFT;
+                       for (k = 0; k < len; ++k) {
+                               pages[n++] = sg_dma_address(&chunk->page_list[j]) +
+                                       umem->page_size * k;
+                       }
                }
-       } else
-               addrs[0] = map_page(pages[0], offset, count);
+       }
 
-       return 0;
-err:
-       for (; i > 0; i--) {
-               ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE);
+       fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
+       if (IS_ERR(fmr)) {
+               sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr));
+               goto err_fmr_alloc;
        }
-       return -1;
-}
 
-void sdp_unmap_dma(struct sock *sk, u64 *addrs, int page_cnt)
-{
-       int i;
-       struct ib_device *dev = sdp_sk(sk)->ib_device;
+       free_page((unsigned long) pages);
 
-       sdp_dbg_data(sk, "count: %d\n", page_cnt);
+       *_umem = umem;
+       *_fmr = fmr;
 
-       for (i = 0; i < page_cnt; i++)
-               ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE);
-}
+       return 0;
 
-static int sdp_map_dma_sge(struct sock *sk, struct ib_sge *sge, int page_cnt,
-               struct page **pages, int offset, int len)
-{
-       struct ib_device *dev = sdp_sk(sk)->ib_device;
-       int i;
-       int left = len;
-       sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len);
-
-       for (i = 0; i < page_cnt; i++) {
-               int o = i == 0 ? offset : 0;
-               int l = MIN(left, PAGE_SIZE - o);
-
-               sge[i].lkey = sdp_sk(sk)->sdp_dev->mr->lkey;
-               sge[i].length = l;
-
-               sge[i].addr = ib_dma_map_page(dev,
-                               pages[i],
-                               0, /* map page with offset (fbo) not working */
-                               l + o, /* compensate on 0 offset */
-                               DMA_FROM_DEVICE);
-               if (ib_dma_mapping_error(dev, sge[i].addr)) {
-                       sdp_warn(sk, "Error map page 0x%llx off: %d len: %d\n",
-                                       sge[i].addr, o, l);
-                       goto err;
-               }
-               sge[i].addr += o;
+err_fmr_alloc:
+       free_page((unsigned long) pages);
 
-               sdp_dbg_data(sk, "mapping %03d: page: %p o: %d l: %d | "
-                       "addr: 0x%llx length: %d\n",
-                       i, pages[i], o, l, sge[i].addr, sge[i].length);
-               left -= l;
-       }
+err_pages_alloc:
+       ib_umem_release(umem);
 
-       WARN_ON(left != 0);
+err_umem_get:
 
-       return 0;
-err:
-       for (; i > 0; i--) {
-               ib_dma_unmap_page(dev, sge[i].addr, PAGE_SIZE, DMA_FROM_DEVICE);
-       }
-       return -1;
+       return rc;
 }
 
-static void sdp_unmap_dma_sge(struct sock *sk, struct ib_sge *sge,
-               int page_cnt, int offset, int len)
+void sdp_free_fmr(struct sock *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
 {
-       int i;
-       struct ib_device *dev = sdp_sk(sk)->ib_device;
+       if (!sdp_sk(sk)->qp_active)
+               return;
 
-       sdp_dbg_data(sk, "count: %d\n", page_cnt);
+       ib_fmr_pool_unmap(*_fmr);
+       *_fmr = NULL;
 
-       for (i = 0; i < page_cnt; i++) {
-               int l = PAGE_SIZE;
-
-               if (i == page_cnt - 1) {
-                       /* Last page */
-                       l = (len + offset) & (PAGE_SIZE - 1);
-                       if (l == 0)
-                               l = PAGE_SIZE;
-               }
-               ib_dma_unmap_page(dev, sge[i].addr, l, DMA_FROM_DEVICE);
-       }
+       ib_umem_release(*_umem);
+       *_umem = NULL;
 }
 
-static struct ib_pool_fmr *sdp_map_fmr(struct sock *sk, int page_cnt,
-               u64 *addrs)
+static int sdp_post_rdma_read(struct sock *sk, struct rx_srcavail_state *rx_sa)
 {
-       struct ib_pool_fmr *fmr;
-       int ret = 0;
-
-       fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, addrs,
-                       page_cnt, 0);
-       if (IS_ERR(fmr)) {
-               ret = PTR_ERR(fmr);
-               fmr = NULL;
-               sdp_warn(sk, "Error allocating fmr: %d\n", ret);
-               goto err;
-       }
+       struct sdp_sock *ssk = sdp_sk(sk);
+       struct ib_send_wr *bad_wr;
+       struct ib_send_wr wr = { NULL };
+       struct ib_sge sge;
 
-       return fmr;
-err:
-       return NULL;
-}
+       wr.opcode = IB_WR_RDMA_READ;
+       wr.next = NULL;
+       wr.wr_id = SDP_OP_RDMA;
+       wr.wr.rdma.rkey = rx_sa->rkey;
+       wr.send_flags = 0;
 
-enum zcopy_type {
-       SDP_ZCOPY_TYPE_RX,
-       SDP_ZCOPY_TYPE_TX,
-};
+       ssk->tx_ring.rdma_inflight = rx_sa;
 
-static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int page_cnt)
-{
-       struct tx_srcavail_state *tx_sa;
+       sge.addr = rx_sa->umem->offset;
+       sge.length = rx_sa->umem->length;
+       sge.lkey = rx_sa->fmr->fmr->lkey;
 
+       wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
+       wr.num_sge = 1;
+       wr.sg_list = &sge;
+       rx_sa->busy++;
 
-       tx_sa = kzalloc(sizeof(struct tx_srcavail_state) +
-                       sizeof(struct page *) * page_cnt + 
-                       sizeof(u64) * page_cnt, GFP_KERNEL);
-       if (!tx_sa)
-               return ERR_PTR(-ENOMEM);
-       
-       tx_sa->pages = (struct page **)(tx_sa+1);
-       tx_sa->addrs = (u64 *)(&tx_sa->pages[page_cnt]);
+       wr.send_flags = IB_SEND_SIGNALED;
 
-       return tx_sa;
+       return ib_post_send(ssk->qp, &wr, &bad_wr);
 }
 
 int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
@@ -634,139 +512,70 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
 {
        struct sdp_sock *ssk = sdp_sk(sk);
        struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(skb);
+       int got_srcavail_cancel;
        int rc = 0;
-       struct ib_send_wr *bad_wr;
-       struct ib_send_wr wr = { NULL };
-       long timeo;
-       struct ib_sge *sge;
-       int sge_left;
-       int copied;
-       int offset;
 
        sdp_dbg_data(&ssk->isk.sk, "preparing RDMA read."
                " len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);
-       if (len > rx_sa->len)
-               len = rx_sa->len;
-
-       offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
 
-       rx_sa->page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
-       sdp_dbg_data(sk, "page_cnt = 0x%x len:0x%x offset: 0x%x\n",
-                       rx_sa->page_cnt, len, offset);
+       sock_hold(sk, SOCK_REF_RDMA_RD);
 
-       rx_sa->pages = 
-               (struct page **) kzalloc(sizeof(struct page *) * rx_sa->page_cnt +
-               sizeof(struct ib_sge) * rx_sa->page_cnt, GFP_KERNEL);
-       if (!rx_sa->pages) {
-               sdp_warn(sk, "Error allocating zcopy context\n");
-               goto err_alloc_zcopy;
+       if (len > rx_sa->len) {
+               sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
+               WARN_ON(1);
+               len = rx_sa->len;
        }
 
-       rx_sa->sge = (struct ib_sge *)(&rx_sa->pages[rx_sa->page_cnt]);
-
-       rc = sdp_get_pages(sk, rx_sa->pages, rx_sa->page_cnt,
-                       (unsigned long)iov->iov_base);
-       if (rc)
-               goto err_get_pages;
-
-       rc = sdp_map_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, rx_sa->pages,
-                       offset, len);
-       if (rc)
-               goto err_map_dma;
-
-       wr.opcode = IB_WR_RDMA_READ;
-       wr.next = NULL;
-       wr.wr_id = SDP_OP_RDMA;
-       wr.wr.rdma.rkey = rx_sa->rkey;
-       wr.send_flags = 0;
+       rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
+       if (rc) {
+               sdp_warn(sk, "Error allocating fmr: %d\n", rc);
+               goto err_alloc_fmr;
+       }
 
-       timeo = sock_sndtimeo(sk, 0);
+       rc = sdp_post_rdma_read(sk, rx_sa);
+       if (unlikely(rc)) {
+               sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
+               sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+               wake_up(&ssk->wq);
+               goto err_post_send;
+       }
 
-       ssk->tx_ring.rdma_inflight = rx_sa;
-       copied = 0;
-       sge = rx_sa->sge;
-       sge_left = rx_sa->page_cnt;
-       do {
-               /* Len error when using sge_cnt > 30 ?? */
-               int sge_cnt = min(sge_left, ssk->max_sge - 2);
-
-               wr.wr.rdma.remote_addr = rx_sa->vaddr + copied + rx_sa->used;
-               wr.num_sge = sge_cnt;
-               wr.sg_list = sge;
-               rx_sa->busy++;
-
-               sdp_dbg_data(sk, "rdma read: sge_cnt: %d vaddr: 0x%llx "
-                       "copied: 0x%x rkey: 0x%x in_bytes: 0x%x\n",
-                       sge_cnt, wr.wr.rdma.remote_addr, copied, rx_sa->rkey,
-                       sge_bytes(sge, sge_cnt));
-               if (sge_left == sge_cnt) {
-                       wr.send_flags = IB_SEND_SIGNALED;
-                       sdp_dbg_data(sk, "last wr is signaled\n");
-               }
-               sdp_prf1(sk, NULL, "TX: RDMA read 0x%x bytes %s",
-                       sge_bytes(sge, sge_cnt),
-                       wr.send_flags & IB_SEND_SIGNALED ? "Signalled" : "");
-
-               rc = ib_post_send(ssk->qp, &wr, &bad_wr);
-               if (unlikely(rc)) {
-                       sdp_warn(sk, "ib_post_send failed with status %d.\n",
-                                       rc);
-                       sdp_set_error(&ssk->isk.sk, -ECONNRESET);
-                       wake_up(&ssk->wq);
-                       break;
-               }
+       sdp_prf(sk, skb, "Finished posting(rc=%d), now to wait", rc);
 
-               copied += sge_bytes(sge, sge_cnt);
-               sge_left -= sge_cnt;
-               sge += sge_cnt;
+       got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
 
-               if (unlikely(ssk->srcavail_cancel_mseq > rx_sa->mseq)) {
-                       sdp_warn(sk, "got SrcAvailCancel - Aborting RDMA\n");
-                       rc = -EAGAIN;
-               }
-       } while (!rc && sge_left > 0);
+       sdp_arm_tx_cq(sk);
 
-       if (!rc || rc == -EAGAIN) {
-               int got_srcavail_cancel = (rc == -EAGAIN);
+       sdp_wait_rdma_wr_finished(ssk);
 
-               sdp_arm_tx_cq(sk);
+       sdp_prf(sk, skb, "Finished waiting(rc=%d)", rc);
+       if (!ssk->qp_active) {
+               sdp_warn(sk, "QP destroyed during RDMA read\n");
+               rc = -EPIPE;
+               goto err_post_send;
+       }
 
-               rc = sdp_wait_rdma_wr_finished(ssk, &timeo);
+       /* Ignore any data copied after getting SrcAvailCancel */
+       if (!got_srcavail_cancel) {
+               int copied = rx_sa->umem->length;
 
-               /* Ignore any data copied after getting SrcAvailCancel */
-               if (!got_srcavail_cancel && !rc) {
-                       sdp_update_iov_used(sk, iov, copied);
-                       rx_sa->used += copied;
-                       atomic_add(copied, &ssk->rcv_nxt);
-               }
+               sdp_update_iov_used(sk, iov, copied);
+               rx_sa->used += copied;
+               atomic_add(copied, &ssk->rcv_nxt);
        }
 
-       if (ssk->tx_ring.rdma_inflight->busy) {
-               sdp_warn(sk, "Aborted waiting for RDMA completion before finished!"
-                       "Memory might be corrupted.\n");
-               WARN_ON(1);
-       }
+       ssk->tx_ring.rdma_inflight = NULL;
+
+err_post_send:
+       sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
 
+err_alloc_fmr:
        if (rc && ssk->qp_active) {
-               /* post rdma, wait_for_compl or post rdma_rd_comp failed - 
-                * post sendsm */
-               sdp_warn(sk, "post rdma, wait_for_compl "
-                       "or post rdma_rd_comp failed - post sendsm\n");
+               sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
                rx_sa->flags |= RX_SA_ABORTED;
-               ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get 
-                                     the dirty logic from recvmsg */
        }
 
-       ssk->tx_ring.rdma_inflight = NULL;
-
-       sdp_unmap_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, offset, len);
-err_map_dma:   
-       sdp_put_pages(sk, rx_sa->pages, rx_sa->page_cnt);
-err_get_pages:
-       kfree(rx_sa->pages);
-       rx_sa->pages = NULL;
-       rx_sa->sge = NULL;
-err_alloc_zcopy:       
+       sock_put(sk, SOCK_REF_RDMA_RD);
 
        return rc;
 }
@@ -792,86 +601,74 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
        return ret;
 }
 
-static int sdp_rdma_adv_single(struct sock *sk,
-               struct tx_srcavail_state *tx_sa, struct iovec *iov, 
-               int page_cnt, int offset, int len)
+static int do_sdp_sendmsg_zcopy(struct sock *sk, struct tx_srcavail_state *tx_sa,
+               struct iovec *iov, long *timeo)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
-       long timeo = SDP_SRCAVAIL_ADV_TIMEOUT;
-       unsigned long lock_flags;
        int rc = 0;
+       unsigned long lock_flags;
 
-       sdp_dbg_data(sk, "off: 0x%x len: 0x%x page_cnt: 0x%x\n",
-               offset, len, page_cnt);
+       rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
+                       &tx_sa->fmr, &tx_sa->umem);
+       if (rc) {
+               sdp_warn(sk, "Error allocating fmr: %d\n", rc);
+               goto err_alloc_fmr;
+       }
 
        if (tx_slots_free(ssk) == 0) {
-               rc = wait_for_sndbuf(sk, &timeo);
+               rc = wait_for_sndbuf(sk, timeo);
                if (rc) {
                        sdp_warn(sk, "Couldn't get send buffer\n");
-                       return rc;
+                       goto err_no_tx_slots;
                }
        }
 
-       tx_sa->fmr = sdp_map_fmr(sk, page_cnt, &tx_sa->addrs[0]);
-       if (!tx_sa->fmr) {
-               sdp_warn(sk, "Error allocating fmr\n");
-               return -ENOMEM;
-       }
-
-       rc = sdp_post_srcavail(sk, tx_sa, 0, offset, len);
+       rc = sdp_post_srcavail(sk, tx_sa);
        if (rc) {
                sdp_dbg(sk, "Error posting SrcAvail\n");
                goto err_abort_send;
        }
 
-       rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
+       rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
        if (unlikely(rc)) {
                enum tx_sa_flag f = tx_sa->abort_flags;
 
                if (f & TX_SA_SENDSM) {
-                       sdp_dbg_data(sk, "got SendSM. use SEND verb.\n");
+                       sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
                } else if (f & TX_SA_ERROR) {
                        sdp_dbg_data(sk, "SrcAvail error completion\n");
                        sdp_reset(sk);
                } else if (ssk->qp_active) {
-                       if (f & TX_SA_INTRRUPTED)
-                               sdp_dbg_data(sk, "SrcAvail error completion\n");
-                       else 
-                               sdp_warn(sk, "SrcAvail send aborted flag = 0x%x.\n", f);
+                       if (!(f & TX_SA_INTRRUPTED))
+                               sdp_warn(sk, "abort_flag = 0x%x.\n", f);
 
                        sdp_post_srcavail_cancel(sk);
 
                        /* Wait for RdmaRdCompl/SendSM to
                         * finish the transaction */
-                       timeo = 2 * HZ;
+                       *timeo = 2 * HZ;
                        sdp_dbg_data(sk, "Waiting for SendSM\n");
-                       sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
+                       sdp_wait_rdmardcompl(ssk, timeo, 1);
                        sdp_dbg_data(sk, "finished waiting\n");
                } else {
                        sdp_warn(sk, "QP was destroyed while waiting\n");
                }
-
-               goto err_abort_send;
+       } else {
+               sdp_dbg_data(sk, "got RdmaRdCompl\n");
        }
-       sdp_prf1(sk, NULL, "got RdmaRdCompl");
-
-err_abort_send:
-       sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
-
-       ib_fmr_pool_unmap(tx_sa->fmr);
 
        spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
        ssk->tx_sa = NULL;
        spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
 
-       return rc;
-}
+err_abort_send:
+       sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
 
-static inline size_t get_page_count(unsigned long uaddr, size_t count)
-{
-       unsigned long end = (uaddr + count + PAGE_SIZE - 1) >> PAGE_SHIFT;
-       unsigned long start = uaddr >> PAGE_SHIFT;
-       return end - start;
+err_no_tx_slots:
+       sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
+
+err_alloc_fmr:
+       return rc;
 }
 
 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -884,10 +681,9 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        long timeo;
        struct tx_srcavail_state *tx_sa;
        int offset;
+       size_t bytes_to_copy = 0;
        int copied = 0;
 
-       int page_cnt;
-
        sdp_dbg_data(sk, "%s\n", __func__);
        sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
        if (ssk->rx_sa) {
@@ -903,6 +699,10 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        posts_handler_get(ssk);
 
        flags = msg->msg_flags;
+
+       iovlen = msg->msg_iovlen;
+       iov = msg->msg_iov;
+
        timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;
 
        /* Wait for a connection to finish. */
@@ -914,8 +714,6 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
 
        /* Ok commence sending. */
-       iovlen = msg->msg_iovlen;
-       iov = msg->msg_iov;
        offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
        sdp_dbg_data(sk, "Sending iov: %p, iovlen: 0x%lx, size: 0x%lx\n",
                        iov->iov_base, iov->iov_len, size);
@@ -933,59 +731,71 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                goto err;
        }
 
-       page_cnt = min(get_page_count((unsigned long)iov->iov_base,
-                               iov->iov_len), SDP_FMR_SIZE);
-
-       tx_sa = sdp_alloc_tx_sa(sk, page_cnt);
-       if (IS_ERR(tx_sa)) {
+       tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
+       if (!tx_sa) {
                sdp_warn(sk, "Error allocating zcopy context\n");
                rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
                goto err_alloc_tx_sa;
        }
 
+       bytes_to_copy = iov->iov_len;
        do {
-               size_t off = (unsigned long)iov->iov_base & ~PAGE_MASK;
-               size_t len = page_cnt * PAGE_SIZE - off;
-               if (len > iov->iov_len)
-                       len = iov->iov_len;
-
-               tx_sa->page_cnt = page_cnt;
-
-               rc = sdp_get_pages(sk, tx_sa->pages, page_cnt,
-                               (unsigned long)iov->iov_base);
-               if (rc)
-                       goto err_get_pages;
-
-               rc = sdp_map_dma(sk, tx_sa->addrs,
-                               tx_sa->pages, page_cnt,
-                               off, len);
-               if (rc)
-                       goto err_map_dma;
-
-               rc = sdp_rdma_adv_single(sk, tx_sa, iov, page_cnt, off, len);
-               if (rc)
-                       sdp_dbg_data(sk, "Error sending SrcAvail. rc = %d\n", rc);
-
-
-               if (tx_sa->addrs)
-                       sdp_unmap_dma(sk, tx_sa->addrs, page_cnt);
-err_map_dma:   
-               sdp_put_pages(sk, tx_sa->pages, page_cnt);
-err_get_pages:
-               page_cnt = min(get_page_count((unsigned long)iov->iov_base,
-                                       iov->iov_len), SDP_FMR_SIZE);
-               copied += tx_sa->bytes_acked;
                tx_sa_reset(tx_sa);
+
+               rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);
+
+               if (iov->iov_len < sdp_zcopy_thresh) {
+                       sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
+                               iov->iov_len);
+                       break;
+               }
        } while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);
+
        kfree(tx_sa);
 err_alloc_tx_sa:
 err:
+       copied = bytes_to_copy - iov->iov_len;
 
        sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);
        posts_handler_put(ssk);
+
        release_sock(sk);
+
        sock_put(&ssk->isk.sk, SOCK_REF_ZCOPY);
 
        return rc ?: copied;
 }
 
+void sdp_abort_srcavail(struct sock *sk)
+{
+       struct sdp_sock *ssk = sdp_sk(sk);
+       struct tx_srcavail_state *tx_sa = ssk->tx_sa;
+       unsigned long flags;
+
+       if (!tx_sa)
+               return;
+
+       cancel_delayed_work(&ssk->srcavail_cancel_work);
+       flush_scheduled_work();
+
+       spin_lock_irqsave(&ssk->tx_sa_lock, flags);
+
+       sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
+
+       ssk->tx_sa = NULL;
+
+       spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
+}
+
+void sdp_abort_rdma_read(struct sock *sk)
+{
+       struct sdp_sock *ssk = sdp_sk(sk);
+       struct rx_srcavail_state *rx_sa = ssk->rx_sa;
+
+       if (!rx_sa)
+               return;
+
+       sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
+
+       ssk->rx_sa = NULL;
+}