sdp: More code cleanup and ZCopy bug fixes
author Amir Vadai <amirv@mellanox.co.il>
Thu, 20 Aug 2009 11:20:57 +0000 (14:20 +0300)
committer Mukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:38 +0000 (05:04 -0700)
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_dbg.h
drivers/infiniband/ulp/sdp/sdp_rx.c
drivers/infiniband/ulp/sdp/sdp_zcopy.c

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 595fb7c42272a4866cff5b4d6f3063b703c93cf2..72c7620186b5d905b4696a23b0053fde4905ab80 100644
@@ -20,8 +20,6 @@
 #define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ)
 #define SDP_SRCAVAIL_PAYLOAD_LEN 1
 
-#define MAX_ZCOPY_SEND_SIZE (512 * 1024)
-
 #define SDP_RESOLVE_TIMEOUT 1000
 #define SDP_ROUTE_TIMEOUT 1000
 #define SDP_RETRY_COUNT 5
@@ -31,7 +29,7 @@
 #define SDP_TX_SIZE 0x40
 #define SDP_RX_SIZE 0x40
 
-#define SDP_FMR_SIZE           256
+#define SDP_FMR_SIZE (PAGE_SIZE / sizeof(u64 *))
 #define SDP_FMR_POOL_SIZE      1024
 #define SDP_FMR_DIRTY_SIZE     ( SDP_FMR_POOL_SIZE / 4 )
 
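Note on the SDP_FMR_SIZE change above: the FMR is now sized so that its
page-address list fills exactly one page, instead of a hard-coded 256
entries. A worked check of the arithmetic (illustration only, assuming a
4 KiB PAGE_SIZE and 8-byte pointers as on x86_64):

    /* PAGE_SIZE / sizeof(u64 *) = 4096 / 8 = 512 entries per FMR, */
    /* so a single FMR can map 512 * 4 KiB = 2 MiB of user memory. */

The same constant is reused later in this patch as the per-chunk page cap
in sdp_sendmsg_zcopy().
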
@@ -589,7 +587,7 @@ static inline struct sk_buff *sdp_alloc_skb(struct sock *sk, u8 mid, int size)
        else
                gfp = GFP_KERNEL;
 
-       skb = sk_stream_alloc_skb(sk, sizeof(struct sdp_bsdh) + size, gfp);
+       skb = sdp_stream_alloc_skb(sk, sizeof(struct sdp_bsdh) + size, gfp);
        BUG_ON(!skb);
 
         skb_header_release(skb);
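The hunk above swaps the generic sk_stream_alloc_skb() for an SDP-private
allocator. Its body is not part of this diff; a minimal sketch of what such
a wrapper plausibly looks like, assuming it mirrors the 2.6-era
sk_stream_alloc_skb() minus the TCP-specific send-buffer accounting:

    /* Hypothetical sketch -- the real sdp_stream_alloc_skb() is defined
     * elsewhere in the tree, not in this diff. */
    static struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size,
                                                gfp_t gfp)
    {
            struct sk_buff *skb;

            skb = alloc_skb_fclone(size + sk->sk_prot->max_header, gfp);
            if (skb) {
                    /* leave headroom for lower-layer headers */
                    skb_reserve(skb, sk->sk_prot->max_header);
                    return skb;
            }
            sk_stream_moderate_sndbuf(sk);
            return NULL;
    }
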
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 432ec6caff62055ca14ccb953c748b8815c79716..6c1ac11452a93577670215eab4fbeee52e29b1bd 100644
@@ -115,9 +115,10 @@ static inline void update_send_head(struct sock *sk, struct sk_buff *skb)
 
 static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
 {
+       struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
        int send_now =
                BZCOPY_STATE(skb) ||
-               TX_SRCAVAIL_STATE(skb) ||
+               unlikely(h->mid != SDP_MID_DATA) ||
                (ssk->nonagle & TCP_NAGLE_OFF) ||
                !ssk->nagle_last_unacked ||
                skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
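The rewritten condition generalizes the old TX_SRCAVAIL_STATE() test:
instead of special-casing SrcAvail packets, any message whose BSDH mid is
not plain data now bypasses Nagle, so every SDP control message (SrcAvail,
SendSM, RdmaRdCompl, ...) is sent immediately. The test boils down to a
one-line predicate (illustrative helper, not in the patch):

    static inline int sdp_is_control(const struct sk_buff *skb)
    {
            const struct sdp_bsdh *h =
                    (const struct sdp_bsdh *)skb_transport_header(skb);

            /* anything that is not a plain data message skips Nagle */
            return h->mid != SDP_MID_DATA;
    }
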
diff --git a/drivers/infiniband/ulp/sdp/sdp_dbg.h b/drivers/infiniband/ulp/sdp/sdp_dbg.h
index d38ad29bafb3d4c51ccea2e3e082fcc53cf4e6d6..3229436ad1ed668db482a3d0ca6e4e892c97311c 100644
@@ -3,6 +3,7 @@
 
 #define SDPSTATS_ON
 #define SDP_PROFILING
+//#define GETNSTIMEODAY_SUPPORTED
 
 #define _sdp_printk(func, line, level, sk, format, arg...) do {               \
        preempt_disable(); \
@@ -50,6 +51,17 @@ struct sdpprf_log {
 extern struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
 extern int sdpprf_log_count;
 
+#ifdef GETNSTIMEODAY_SUPPORTED
+static inline unsigned long long current_nsec(void)
+{
+       struct timespec tv;
+       getnstimeofday(&tv);
+       return tv.tv_sec * NSEC_PER_SEC + tv.tv_nsec;
+}
+#else
+#define current_nsec() jiffies_to_usecs(jiffies)
+#endif
+
 #define sdp_prf1(sk, s, format, arg...) ({ \
        struct sdpprf_log *l = \
                &sdpprf_log[sdpprf_log_count++ & (SDPPRF_LOG_SIZE - 1)]; \
@@ -61,7 +73,7 @@ extern int sdpprf_log_count;
        l->cpu = smp_processor_id(); \
        l->skb = s; \
        snprintf(l->msg, sizeof(l->msg) - 1, format, ## arg); \
-       l->time = jiffies_to_usecs(jiffies); \
+       l->time = current_nsec(); \
        l->func = __func__; \
        l->line = __LINE__; \
        preempt_enable(); \
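Two notes on the new current_nsec(). First, the #else fallback returns
jiffies_to_usecs(jiffies), so on kernels without getnstimeofday() the
profiling timestamps are in microseconds, not nanoseconds. Second, on
32-bit builds tv.tv_sec * NSEC_PER_SEC multiplies two longs before the
result is widened; an overflow-safe variant (a suggestion, not part of
the patch) casts first:

    static inline unsigned long long current_nsec(void)
    {
            struct timespec tv;

            getnstimeofday(&tv);
            /* widen before multiplying to avoid 32-bit overflow */
            return (unsigned long long)tv.tv_sec * NSEC_PER_SEC
                    + tv.tv_nsec;
    }
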
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
index 4292303cc0bb407915421ea9076de342c53d2d4a..b14e2c7c145b4565c66563fdf2e85431e66aa6a5 100644
@@ -758,6 +758,11 @@ void sdp_do_posts(struct sdp_sock *ssk)
        int xmit_poll_force;
        struct sk_buff *skb;
 
+       if (!ssk->qp_active) {
+               sdp_dbg(sk, "QP is deactivated\n");
+               return;
+       }
+
        while ((skb = skb_dequeue(&ssk->rx_ctl_q)))
                sdp_process_rx_ctl_skb(ssk, skb);
 
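The qp_active guard assumes a teardown ordering in which the flag is
cleared before the QP itself is destroyed, so any path that might post
work requests can bail out safely. A sketch of that ordering
(hypothetical; the actual teardown code is not in this diff):

    static void sdp_example_teardown(struct sdp_sock *ssk)
    {
            ssk->qp_active = 0;      /* publish: no more posting */
            /* ... drain outstanding work requests ... */
            ib_destroy_qp(ssk->qp);  /* only then destroy the QP */
    }
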
diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
index f2dd17c43c786b594b52c0416fcd0f1a3649f316..3ce1d79581ef85bf2b2cad149c35f38217b3dae5 100644
@@ -50,10 +50,8 @@ static struct bzcopy_state dummy_bz = {
 busy: 1,
 };
 
-static int sdp_update_iov_used(struct sock *sk, struct iovec *iov, int len);
-
 static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
-               int page_idx, struct iovec *iov, int off, size_t len)
+               int page_idx, int off, size_t len)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
        struct sk_buff *skb;
@@ -76,25 +74,17 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
                                         * but continue to live after skb is freed */
        ssk->tx_sa = tx_sa;
 
-       if (0) {
-               void *payload;
-               payload = skb_put(skb, SDP_SRCAVAIL_PAYLOAD_LEN);
-               memcpy(payload, iov->iov_base, SDP_SRCAVAIL_PAYLOAD_LEN);
-               payload_len = SDP_SRCAVAIL_PAYLOAD_LEN;
-       } else {
-               /* must have payload inlined in SrcAvail packet in combined mode */
-               payload_len = min(len, PAGE_SIZE - off);
-               get_page(tx_sa->pages[page_idx]);
-               skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags,
-                               tx_sa->pages[page_idx], off, payload_len);
-
-               skb->len             += payload_len;
-               skb->data_len         = payload_len;
-               skb->truesize        += payload_len;
-//             sk->sk_wmem_queued   += payload_len;
-//             sk->sk_forward_alloc -= payload_len;
+       /* must have payload inlined in SrcAvail packet in combined mode */
+       payload_len = min(len, PAGE_SIZE - off);
+       get_page(tx_sa->pages[page_idx]);
+       skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags,
+                       tx_sa->pages[page_idx], off, payload_len);
 
-       }
+       skb->len             += payload_len;
+       skb->data_len         = payload_len;
+       skb->truesize        += payload_len;
+//     sk->sk_wmem_queued   += payload_len;
+//     sk->sk_forward_alloc -= payload_len;
 
        skb_entail(sk, ssk, skb);
        
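The surviving branch implements the SDP "combined mode" rule already
noted in the comment: a SrcAvail message must carry the first chunk of
the payload inline, here as a page fragment referencing the pinned user
page. The fragment is capped at the first page boundary; e.g. with
off = 0xf00 and len = 0x4000 (4 KiB pages), payload_len =
min(0x4000, PAGE_SIZE - 0xf00) = 0x100. get_page() takes an extra
reference because freeing the skb drops one reference per fragment.
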
@@ -165,9 +155,6 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
                        break;
                }
 
-               if (tx_sa->bytes_acked == tx_sa->bytes_sent)
-                       break;
-
                else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
                        err = -EINVAL;
                        sdp_warn(sk, "acked bytes > sent bytes\n");
@@ -208,6 +195,9 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
 
                posts_handler_get(ssk);
 
+               if (tx_sa->bytes_acked == tx_sa->bytes_sent)
+                       break;
+
                if (vm_wait) {
                        vm_wait -= current_timeo;
                        current_timeo = *timeo_p;
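Relocating the bytes_acked == bytes_sent test after posts_handler_get()
means the exit condition is re-evaluated once the posts handler is held
again, after the sleep rather than before it, so a RdmaRdCompl racing
with the sleeper is picked up on the next pass. For reference, the
generic kernel sleep/wakeup pattern this loop follows (generic
identifiers, assuming a wait_queue_head_t wq and a done condition; not
from this file):

    DEFINE_WAIT(wait);

    for (;;) {
            prepare_to_wait(&wq, &wait, TASK_INTERRUPTIBLE);
            if (done)       /* re-check after queueing ourselves */
                    break;
            schedule();
    }
    finish_wait(&wq, &wait);
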
@@ -246,6 +236,12 @@ static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p)
        while (1) {
                prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
 
+               if (unlikely(!ssk->qp_active)) {
+                       err = -EPIPE;
+                       sdp_warn(sk, "socket closed\n");
+                       break;
+               }
+
                if (unlikely(!*timeo_p)) {
                        err = -EAGAIN;
                        sdp_warn(sk, "timedout\n");
@@ -406,12 +402,45 @@ out:
        return;
 }
 
+static int sdp_get_user_pages(struct page **pages, const unsigned int nr_pages,
+                             unsigned long uaddr, int rw)
+{
+       int res, i;
+
+        /* Try to fault in all of the necessary pages */
+       down_read(&current->mm->mmap_sem);
+        /* rw==READ means read from drive, write into memory area */
+       res = get_user_pages(
+               current,
+               current->mm,
+               uaddr,
+               nr_pages,
+               rw == READ,
+               0, /* don't force */
+               pages,
+               NULL);
+       up_read(&current->mm->mmap_sem);
+
+       /* Errors and no page mapped should return here */
+       if (res < nr_pages)
+               return res;
+
+        for (i=0; i < nr_pages; i++) {
+                /* FIXME: flush superflous for rw==READ,
+                 * probably wrong function for rw==WRITE
+                 */
+               flush_dcache_page(pages[i]);
+        }
+
+       return nr_pages;
+}
+
 int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
                unsigned long addr)
 {
        int done_pages = 0;
 
-       sdp_dbg_data(sk, "count: %d addr: 0x%lx\n", page_cnt, addr);
+       sdp_dbg_data(sk, "count: 0x%x addr: 0x%lx\n", page_cnt, addr);
 
        addr &= PAGE_MASK;
        if (segment_eq(get_fs(), KERNEL_DS)) {
@@ -423,12 +452,7 @@ int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
                        addr += PAGE_SIZE;
                }
        } else {
-               if (current->mm) {
-                       down_write(&current->mm->mmap_sem);
-                       done_pages = get_user_pages(current, current->mm, addr,
-                                       page_cnt, 0, 0, pages, NULL);
-                       up_write(&current->mm->mmap_sem);
-               }
+               done_pages = sdp_get_user_pages(pages, page_cnt, addr, WRITE);
        }
 
        if (unlikely(done_pages != page_cnt))
@@ -440,7 +464,7 @@ err:
        sdp_warn(sk, "Error getting pages. done_pages: %d page_cnt: %d\n",
                        done_pages, page_cnt);
        for (; done_pages > 0; done_pages--)
-               put_page(pages[done_pages - 1]);
+               page_cache_release(pages[done_pages - 1]);
 
        return -1;
 }
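Two notes on this hunk: mmap_sem is now taken only for read, which is
all get_user_pages() requires, and page_cache_release() in the error
path is, in kernels of this vintage, an alias for put_page(), so that
part of the change is stylistic. A usage sketch of the refactored path
(hypothetical caller; the unwind mirrors the err: path above):

    static int pin_user_buffer(struct page **pages, unsigned long uaddr,
                               unsigned int nr_pages)
    {
            int got = sdp_get_user_pages(pages, nr_pages,
                                         uaddr & PAGE_MASK, WRITE);

            if (got < (int)nr_pages) {
                    while (got-- > 0)       /* release partial pins */
                            page_cache_release(pages[got]);
                    return -EFAULT;
            }
            return 0;
    }
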
@@ -456,38 +480,34 @@ void sdp_put_pages(struct sock *sk, struct page **pages, int page_cnt)
        }
 }
 
-static int sdp_map_dma(struct sock *sk, u64 *addrs, int page_cnt,
-               struct page **pages, int offset, int len)
+static int sdp_map_dma(struct sock *sk, u64 *addrs, struct page **pages,
+               int nr_pages, size_t offset, size_t count)
 {
        struct ib_device *dev = sdp_sk(sk)->ib_device;
-       int i;
-       sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len);
-
-       for (i = 0; i < page_cnt; i++) {
-               int o = 0;
-               int l = PAGE_SIZE;
-
-               if (i == page_cnt - 1) {
-                       /* Last page */
-                       l = (len + offset) & (PAGE_SIZE - 1);
-                       if (l == 0)
-                               l = PAGE_SIZE;
+       int i = 0;
+       sdp_dbg_data(sk, "map dma offset: 0x%lx count: 0x%lx\n", offset, count);
+
+#define map_page(p, o, l) ({\
+       u64 addr = ib_dma_map_page(dev, p, o, l, DMA_TO_DEVICE); \
+       if (ib_dma_mapping_error(dev, addr)) { \
+               sdp_warn(sk, "Error mapping page %p off: 0x%lx len: 0x%lx\n", \
+                               p, o, l); \
+               goto err; \
+       } \
+       addr; \
+})
+
+       if (nr_pages > 1) {
+               size_t length = PAGE_SIZE - offset;
+               addrs[0] = map_page(pages[0], offset, length);
+               count -= length;
+               for (i=1; i < nr_pages ; i++) {
+                       length = count < PAGE_SIZE ? count : PAGE_SIZE;
+                       addrs[i] = map_page(pages[i], 0UL, length);
+                       count -= PAGE_SIZE;
                }
-
-//             sdp_dbg_data(sk, "mapping %03d: page: %p o: %d l: %d\n",
-//                             i, pages[i], o, l);
-
-               addrs[i] = ib_dma_map_page(dev,
-                               pages[i],
-                               o,
-                               l,
-                               DMA_TO_DEVICE);
-               if (ib_dma_mapping_error(dev, addrs[i])) {
-                       sdp_warn(sk, "Error mapping page %p off: %d len: %d\n",
-                                       pages[i], o, l);
-                       goto err;
-               }
-       }
+       } else
+               addrs[0] = map_page(pages[0], offset, count);
 
        return 0;
 err:
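The rewritten loop derives each page's DMA length from the running byte
count instead of back-calculating only the last page. A worked example
with 4 KiB pages, offset = 0x100, count = 0x2100, nr_pages = 3:

    page 0: offset 0x100, length PAGE_SIZE - 0x100      = 0x0f00
    page 1: offset 0,     length min(0x1200, PAGE_SIZE) = 0x1000
    page 2: offset 0,     length min(0x0200, PAGE_SIZE) = 0x0200
    total = 0x0f00 + 0x1000 + 0x0200 = 0x2100 = count

The map_page() statement-expression macro exists so each mapping can
jump to the shared err: unwind path when ib_dma_mapping_error() fires.
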
@@ -504,9 +524,8 @@ static void sdp_unmap_dma(struct sock *sk, u64 *addrs, int page_cnt)
 
        sdp_dbg_data(sk, "count: %d\n", page_cnt);
 
-       for (i = 0; i < page_cnt; i++) {
-               ib_dma_unmap_page(dev, addrs[i], 4096, DMA_TO_DEVICE);
-       }
+       for (i = 0; i < page_cnt; i++)
+               ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE);
 }
 
 static int sdp_map_dma_sge(struct sock *sk, struct ib_sge *sge, int page_cnt,
@@ -629,14 +648,15 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
        int offset;
 
        sdp_dbg_data(&ssk->isk.sk, "preparing RDMA read."
-               " len: %d. buffer len: %ld\n", len, iov->iov_len);
+               " len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);
        if (len > rx_sa->len)
                len = rx_sa->len;
 
        offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
 
        rx_sa->page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
-       sdp_dbg_data(sk, "page_cnt = %d len:%d offset: %d\n", rx_sa->page_cnt, len, offset);
+       sdp_dbg_data(sk, "page_cnt = 0x%x len:0x%x offset: 0x%x\n",
+                       rx_sa->page_cnt, len, offset);
 
        rx_sa->pages = 
                (struct page **) kzalloc(sizeof(struct page *) * rx_sa->page_cnt +
@@ -680,15 +700,17 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
                rx_sa->busy++;
 
                sdp_dbg_data(sk, "rdma read: sge_cnt: %d vaddr: 0x%llx "
-                       "copied: 0x%x rkey: 0x%x in_bytes: %d\n",
+                       "copied: 0x%x rkey: 0x%x in_bytes: 0x%x\n",
                        sge_cnt, wr.wr.rdma.remote_addr, copied, rx_sa->rkey,
                        sge_bytes(sge, sge_cnt));
-               sdp_prf1(sk, NULL, "TX: RDMA read");
-
                if (sge_left == sge_cnt) {
                        wr.send_flags = IB_SEND_SIGNALED;
                        sdp_dbg_data(sk, "last wr is signaled\n");
                }
+               sdp_prf1(sk, NULL, "TX: RDMA read 0x%x bytes %s",
+                       sge_bytes(sge, sge_cnt),
+                       wr.send_flags & IB_SEND_SIGNALED ? "Signalled" : "");
+
                rc = ib_post_send(ssk->qp, &wr, &bad_wr);
                if (unlikely(rc)) {
                        sdp_warn(sk, "ib_post_send failed with status %d.\n",
@@ -723,7 +745,7 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
                }
        }
 
-       if (rc) {
+       if (rc && ssk->qp_active) {
                /* post rdma, wait_for_compl or post rdma_rd_comp failed - 
                 * post sendsm */
                sdp_warn(sk, "post rdma, wait_for_compl "
@@ -769,13 +791,16 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
 
 static int sdp_rdma_adv_single(struct sock *sk,
                struct tx_srcavail_state *tx_sa, struct iovec *iov, 
-               int p_idx, int page_cnt, int offset, int len)
+               int page_cnt, int offset, int len)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
        long timeo = SDP_SRCAVAIL_ADV_TIMEOUT;
        unsigned long lock_flags;
        int rc = 0;
 
+       sdp_dbg_data(sk, "off: 0x%x len: 0x%x page_cnt: 0x%x\n",
+               offset, len, page_cnt);
+
        if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
                rc = wait_for_sndbuf(sk, &timeo);
                if (rc) {
@@ -784,14 +809,17 @@ static int sdp_rdma_adv_single(struct sock *sk,
                }
        }
 
-       tx_sa->fmr = sdp_map_fmr(sk, page_cnt, &tx_sa->addrs[p_idx]);
+       tx_sa->fmr = sdp_map_fmr(sk, page_cnt, &tx_sa->addrs[0]);
        if (!tx_sa->fmr) {
+               sdp_warn(sk, "Error allocating fmr\n");
                return -ENOMEM;
        }
 
-       rc = sdp_post_srcavail(sk, tx_sa, p_idx, iov, offset, len);
-       if (rc)
+       rc = sdp_post_srcavail(sk, tx_sa, 0, offset, len);
+       if (rc) {
+               sdp_warn(sk, "Error posting SrcAvail\n");
                goto err_abort_send;
+       }
 
        rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
        if (unlikely(rc)) {
@@ -803,16 +831,19 @@ static int sdp_rdma_adv_single(struct sock *sk,
                        sdp_warn(sk, "SrcAvail error completion\n");
                        sdp_reset(sk);
                } else if (ssk->qp_active) {
-                       sdp_warn(sk, "Aborting send. abort_flag = 0x%x.\n", f);
+                       if (f & TX_SA_INTRRUPTED)
+                               sdp_dbg_data(sk, "SrcAvail error completion\n");
+                       else 
+                               sdp_warn(sk, "abort_flag = 0x%x.\n", f);
 
                        sdp_post_srcavail_cancel(sk);
 
                        /* Wait for RdmaRdCompl/SendSM to
                         * finish the transaction */
                        timeo = 2 * HZ;
-                       sdp_warn(sk, "Waiting for SendSM\n");
+                       sdp_dbg_data(sk, "Waiting for SendSM\n");
                        sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
-                       sdp_warn(sk, "finished waiting\n");
+                       sdp_dbg_data(sk, "finished waiting\n");
                } else {
                        sdp_warn(sk, "QP was destroyed while waiting\n");
                }
@@ -833,6 +864,13 @@ err_abort_send:
        return rc;
 }
 
+static inline size_t get_page_count(unsigned long uaddr, size_t count)
+{
+       unsigned long end = (uaddr + count + PAGE_SIZE - 1) >> PAGE_SHIFT;
+       unsigned long start = uaddr >> PAGE_SHIFT;
+       return end - start;
+}
+
 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                size_t size)
 {
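get_page_count(), added above, counts the pages a user buffer actually
spans, including partial first and last pages. Worked example with 4 KiB
pages: uaddr = 0x1fff, count = 2 gives start = 0x1fff >> 12 = 1 and
end = (0x1fff + 2 + 0xfff) >> 12 = 3, so two pages are spanned, one byte
in each. The sdp_sendmsg_zcopy() hunks that follow clamp this count to
SDP_FMR_SIZE so each SrcAvail chunk always fits in a single FMR.
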
@@ -846,11 +884,9 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        int copied = 0;
 
        int page_cnt;
-       int bytes_left;
-       int pages_left;
-       int p_idx;
 
        sdp_dbg_data(sk, "%s\n", __func__);
+       sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
        if (ssk->rx_sa) {
                sdp_warn(sk, "Deadlock prevent: crossing SrcAvail\n");
                return -EAGAIN;
@@ -877,7 +913,7 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        iovlen = msg->msg_iovlen;
        iov = msg->msg_iov;
        offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
-       sdp_dbg_data(sk, "Sending iov: %p, iovlen: %ld, size: %ld\n",
+       sdp_dbg_data(sk, "Sending iov: %p, iovlen: 0x%lx, size: 0x%lx\n",
                        iov->iov_base, iov->iov_len, size);
 
        SDPSTATS_HIST(sendmsg_seglen, iov->iov_len);
@@ -893,8 +929,9 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                goto err;
        }
 
-       page_cnt = PAGE_ALIGN(min_t(int, iov->iov_len, MAX_ZCOPY_SEND_SIZE) + offset) 
-                       >> PAGE_SHIFT;
+       page_cnt = min(get_page_count((unsigned long)iov->iov_base,
+                               iov->iov_len), SDP_FMR_SIZE);
+
        tx_sa = sdp_alloc_tx_sa(sk, page_cnt);
        if (IS_ERR(tx_sa)) {
                sdp_warn(sk, "Error allocating zcopy context\n");
@@ -903,50 +940,43 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        }
 
        do {
+               size_t off = (unsigned long)iov->iov_base & ~PAGE_MASK;
+               size_t len = page_cnt * PAGE_SIZE - off;
+               if (len > iov->iov_len)
+                       len = iov->iov_len;
+
                tx_sa->page_cnt = page_cnt;
 
-               rc = sdp_get_pages(sk, tx_sa->pages, tx_sa->page_cnt,
+               rc = sdp_get_pages(sk, tx_sa->pages, page_cnt,
                                (unsigned long)iov->iov_base);
                if (rc)
                        goto err_get_pages;
 
-               rc = sdp_map_dma(sk, tx_sa->addrs, tx_sa->page_cnt, tx_sa->pages,
-                               offset, iov->iov_len);
+               rc = sdp_map_dma(sk, tx_sa->addrs,
+                               tx_sa->pages, page_cnt,
+                               off, len);
                if (rc)
                        goto err_map_dma;
 
-               bytes_left = iov->iov_len;
-               pages_left = tx_sa->page_cnt;
-               p_idx = 0;
-               do {
-                       int p_cnt = min(256, pages_left);
-                       int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
-
-                       sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
-                       rc = sdp_rdma_adv_single(sk, tx_sa, iov,
-                                       p_idx, p_cnt, offset, len);
-
-                       copied += len;
-                       bytes_left -= len;
-                       pages_left -= p_cnt;
-                       p_idx += p_cnt;
-                       offset = 0;
-               } while (!rc && !tx_sa->abort_flags && pages_left > 0);
-
-               sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
+               rc = sdp_rdma_adv_single(sk, tx_sa, iov, page_cnt, off, len);
+               if (rc)
+                       sdp_warn(sk, "Error sending SrcAvail. rc = %d\n", rc);
+
+
+               sdp_unmap_dma(sk, tx_sa->addrs, page_cnt);
 err_map_dma:   
-               sdp_put_pages(sk, tx_sa->pages, tx_sa->page_cnt);
+               sdp_put_pages(sk, tx_sa->pages, page_cnt);
 err_get_pages:
-
-               page_cnt = PAGE_ALIGN(min_t(int, iov->iov_len, MAX_ZCOPY_SEND_SIZE)) 
-                       >> PAGE_SHIFT;
+               page_cnt = min(get_page_count((unsigned long)iov->iov_base,
+                                       iov->iov_len), SDP_FMR_SIZE);
+               copied += tx_sa->bytes_acked;
                tx_sa_reset(tx_sa);
-       } while (!rc && iov->iov_len > 0);
+       } while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);
        kfree(tx_sa);
 err_alloc_tx_sa:
 err:
 
-       sdp_prf1(sk, NULL, "Finshed RDMA rc: %d copied: %d", rc, copied);
+       sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);
        posts_handler_put(ssk);
        release_sock(sk);