]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
sdp: Fix ZCopy compatability issues
authorAmir Vadai <amirv@mellanox.co.il>
Tue, 11 Aug 2009 10:57:48 +0000 (13:57 +0300)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:37 +0000 (05:04 -0700)
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_cma.c
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_proc.c
drivers/infiniband/ulp/sdp/sdp_rx.c
drivers/infiniband/ulp/sdp/sdp_zcopy.c

index f5a46ea208d095fb889c726278b8cf03a73297e3..94c51304bafb48af110914ffa1cf48333dfc28d5 100644 (file)
@@ -237,6 +237,7 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 
 #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
 #define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ)
+#define SDP_SRCAVAIL_PAYLOAD_LEN 1
 
 #define MAX_ZCOPY_SEND_SIZE (512 * 1024)
 
@@ -305,13 +306,14 @@ enum sdp_mid {
        SDP_MID_HELLO = 0x0,
        SDP_MID_HELLO_ACK = 0x1,
        SDP_MID_DISCONN = 0x2,
+       SDP_MID_ABORT = 0x3,
        SDP_MID_SENDSM = 0x4,
        SDP_MID_RDMARDCOMPL = 0x6,
        SDP_MID_SRCAVAIL_CANCEL = 0x8,
        SDP_MID_CHRCVBUF = 0xB,
        SDP_MID_CHRCVBUF_ACK = 0xC,
-       SDP_MID_SRCAVAIL = 0xFD,
-       SDP_MID_SINKAVAIL = 0xFE,
+       SDP_MID_SINKAVAIL = 0xFD,
+       SDP_MID_SRCAVAIL = 0xFE,
        SDP_MID_DATA = 0xFF,
 };
 
@@ -407,6 +409,7 @@ struct rx_srcavail_state {
        /* Advertised buffer stuff */
        u32 mseq;
        u32 used;
+       u32 reported;
        u32 len;
        u32 rkey;
        u64 vaddr;
@@ -430,8 +433,8 @@ struct tx_srcavail_state {
        u8              busy;
 
        struct ib_pool_fmr *fmr;
-       u32             bytes_completed;
-       u32             bytes_total;
+       u32             bytes_sent;
+       u32             bytes_acked;
 
        u8              abort;
        u32             mseq;
@@ -750,9 +753,6 @@ void sdp_proc_unregister(void);
 /* sdp_cma.c */
 int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
 
-/* sdp_bcopy.c */
-int sdp_post_credits(struct sdp_sock *ssk);
-
 /* sdp_tx.c */
 int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
 void sdp_tx_ring_destroy(struct sdp_sock *ssk);
@@ -785,7 +785,8 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
                int len);
 int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
                unsigned long addr);
-int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied);
+int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
+               struct rx_srcavail_state *rx_sa);
 int sdp_post_sendsm(struct sdp_sock *ssk);
 void srcavail_cancel_timeout(struct work_struct *work);
 
@@ -841,6 +842,7 @@ static inline char *mid2str(int mid)
        static char *mid2str[] = {
                ENUM2STR(SDP_MID_HELLO),
                ENUM2STR(SDP_MID_HELLO_ACK),
+               ENUM2STR(SDP_MID_ABORT),
                ENUM2STR(SDP_MID_DISCONN),
                ENUM2STR(SDP_MID_SENDSM),
                ENUM2STR(SDP_MID_RDMARDCOMPL),
index 34b4b8c62c505fc9e3efc3125adf6f4f32cea265..477b8a2eac389d27b67e5bc98b6ab4f01045a1d4 100644 (file)
@@ -60,7 +60,7 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
        case SDP_MID_HELLO:
                hh = (struct sdp_hh *)h;
                len += snprintf(buf + len, 255-len,
-                               "max_adverts: %d  majv_minv: %d "
+                               "max_adverts: %d  majv_minv: 0x%x "
                                "localrcvsz: %d desremrcvsz: %d |",
                                hh->max_adverts, hh->majv_minv,
                                ntohl(hh->localrcvsz),
@@ -90,12 +90,10 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
        case SDP_MID_SRCAVAIL:
                srcah = (struct sdp_srcah *)(h+1);
 
-               len += snprintf(buf + len, 255-len, " | data_len: %ld |",
+               len += snprintf(buf + len, 255-len, " | payload: %ld, "
+                               "len: %d, rkey: 0x%x, vaddr: 0x%llx |",
                                ntohl(h->len) - sizeof(struct sdp_bsdh) - 
-                               sizeof(struct sdp_srcah));
-
-               len += snprintf(buf + len, 255-len,
-                               " | len: %d, rkey: 0x%x, vaddr: 0x%llx |",
+                               sizeof(struct sdp_srcah),
                                ntohl(srcah->len), ntohl(srcah->rkey),
                                be64_to_cpu(srcah->vaddr));
                break;
index ab5ec58b4332d56a60c9f8c666570c139f4747f6..1f423cc78f88c26abaefb1d8e24092fb5e8dd8bd 100644 (file)
@@ -46,7 +46,7 @@
 #include "sdp_socket.h"
 #include "sdp.h"
 
-#define SDP_MAJV_MINV 0x22
+#define SDP_MAJV_MINV 0x11
 
 enum {
        SDP_HH_SIZE = 76,
@@ -330,11 +330,11 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                                rx_ring_posted(sdp_sk(sk)));
                memset(&hh, 0, sizeof hh);
                hh.bsdh.mid = SDP_MID_HELLO;
-               hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk)));
                hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
                hh.max_adverts = 1;
                hh.majv_minv = SDP_MAJV_MINV;
                sdp_init_buffers(sdp_sk(sk), rcvbuf_initial_size);
+               hh.bsdh.bufs = htons(rx_ring_posted(sdp_sk(sk)));
                hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags *
                                PAGE_SIZE + sizeof(struct sdp_bsdh));
                hh.max_adverts = 0x1;
@@ -367,7 +367,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                                rx_ring_posted(sdp_sk(child)));
                memset(&hah, 0, sizeof hah);
                hah.bsdh.mid = SDP_MID_HELLO_ACK;
-               hah.bsdh.bufs = htons(remote_credits(sdp_sk(child)));
+               hah.bsdh.bufs = htons(rx_ring_posted(sdp_sk(child)));
                hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE);
                hah.majv_minv = SDP_MAJV_MINV;
                hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
@@ -397,14 +397,8 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                if (rc) {
                        sdp_warn(sk, "Destroy qp !!!!\n");
                        rdma_reject(id, NULL, 0);
-               }
-               else
+               } else
                        rc = rdma_accept(id, NULL);
-
-               if (!rc) {
-//                     sdp_sk(sk)->qp_active = 1;
-                       rc = sdp_post_credits(sdp_sk(sk)) < 0 ?: 0;
-               }
                break;
        case RDMA_CM_EVENT_CONNECT_ERROR:
                sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_ERROR\n");
index 577d5693a753cee197944521865d28b4dd63143c..61bdda7a0274c91643a1a9ae850974afb6aa2434 100644 (file)
@@ -1992,11 +1992,25 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                                rx_sa = RX_SRCAVAIL_STATE(skb);
                                if (rx_sa->mseq < ssk->srcavail_cancel_mseq) {
                                        rx_sa->aborted = 1;
-                                       sdp_warn(sk, "Ignoring src avail - "
+                                       sdp_dbg_data(sk, "Ignoring src avail "
                                                "due to SrcAvailCancel\n");
                                        goto skb_cleanup;
                                }
-                               avail_bytes_count = rx_sa->len;
+
+                               /* if has payload - handle as if MID_DATA */
+                               if (rx_sa->used < skb->len) {
+                                       sdp_dbg_data(sk, "SrcAvail has "
+                                                       "payload: %d/%d\n",
+                                                        rx_sa->used,
+                                               skb->len);
+                                       avail_bytes_count = skb->len;
+                               } else {
+                                       sdp_dbg_data(sk, "Finished payload. "
+                                               "RDMAing: %d/%d\n",
+                                               rx_sa->used, rx_sa->len);
+                                       avail_bytes_count = rx_sa->len;
+                               }
+
                                break;
 
                        case SDP_MID_DATA:
@@ -2015,8 +2029,6 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                        }
 
                        offset = *seq - SDP_SKB_CB(skb)->seq;
-//                     sdp_warn(sk, "offset = 0x%x, avail_byte_count = 0x%x\n",
-//                                     offset, avail_bytes_count);
                        if (offset < avail_bytes_count)
                                goto found_ok_skb;
 
@@ -2116,8 +2128,9 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                        }
                }
                if (!(flags & MSG_TRUNC)) {
-                       if (rx_sa) {
-                               sdp_dbg_data(sk, "Got srcavail - using RDMA\n");
+                       if (rx_sa && rx_sa->used >= skb->len) {
+                               /* No more payload - start rdma copy */
+                               sdp_dbg_data(sk, "RDMA copy of %ld bytes\n", used);
                                err = sdp_rdma_to_iovec(sk, msg->msg_iov, skb,
                                                used);
                                if (err == -EAGAIN) {
@@ -2127,8 +2140,14 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                                }
                        } else {
                                err = skb_copy_datagram_iovec(skb, offset,
-                                                     /* TODO: skip header? */
-                                                     msg->msg_iov, used);
+                                               /* TODO: skip header? */
+                                               msg->msg_iov, used);
+                               if (rx_sa) {
+                                       rx_sa->used += used;
+                                       rx_sa->reported += used;
+                                       sdp_dbg_data(sk, "copied %ld from "
+                                                       "payload\n", used);
+                               }
                        }
                        if (err) {
                                sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
@@ -2155,7 +2174,7 @@ skip_copy:
 
                if (rx_sa) {
                        if (ssk->srcavail_cancel_mseq < rx_sa->mseq) {
-                               rc = sdp_post_rdma_rd_compl(ssk, rx_sa->used);
+                               rc = sdp_post_rdma_rd_compl(ssk, rx_sa);
                                BUG_ON(rc);
                        }
                        if (rx_sa->aborted) {
index 791030ffc21e54d6f467ba9c62bd3cfad78c9b64..9b00255871a8b445ae804e86b8ca44a2c9bd69ab 100644 (file)
@@ -378,8 +378,7 @@ static void sdpprf_stop(struct seq_file *p, void *v)
 {
 }
 
-
-const struct seq_operations sdpprf_ops = {
+struct seq_operations sdpprf_ops = {
        .start = sdpprf_start,
        .stop = sdpprf_stop,
        .next = sdpprf_next,
@@ -404,7 +403,7 @@ static ssize_t sdpprf_write(struct file *file, const char __user *buf,
        return count;
 }
 
-static const struct file_operations sdpprf_fops = {
+static struct file_operations sdpprf_fops = {
        .open           = sdpprf_open,
        .read           = seq_read,
        .llseek         = seq_lseek,
index 0476e4a62ea59da60e554728474ff7511d672a94..2453715f3969b6c19e4c939a2293ae01c927d580 100644 (file)
@@ -245,10 +245,8 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
        int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
        unsigned long max_bytes;
 
-       if (!ssk->qp_active) {
-//             sdp_warn(sk, "post rx while qp is not active\n");
+       if (!ssk->qp_active)
                return 0;
-       }
 
        if (top_mem_usage && (top_mem_usage * 0x100000) <
                        atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) {
@@ -321,6 +319,7 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
                sa->rkey = ntohl(srcah->rkey);
                sa->vaddr = be64_to_cpu(srcah->vaddr);
 
+               atomic_add(skb->len, &ssk->rcv_nxt);
                sdp_dbg_data(sk, "queueing SrcAvail. skb_len = %d vaddr = %lld\n",
                        skb_len, sa->vaddr);
        } else {
@@ -483,6 +482,12 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                ssk->srcavail_cancel_mseq = ntohl(h->mseq);
                sdp_post_sendsm(ssk);
                break;
+       case SDP_MID_ABORT:
+               sdp_dbg_data(sk, "Handling ABORT\n");
+               sdp_prf(sk, NULL, "Handling ABORT");
+               sdp_reset(sk);
+               __kfree_skb(skb);
+               break;
        case SDP_MID_DISCONN:
                sdp_dbg_data(sk, "Handling DISCONN\n");
                sdp_prf(sk, NULL, "Handling DISCONN");
@@ -544,6 +549,9 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                sk_send_sigurg(sk);*/
 
        skb_pull(skb, sizeof(struct sdp_bsdh));
+       if (h->mid == SDP_MID_SRCAVAIL) {
+               skb_pull(skb, sizeof(struct sdp_srcah));
+       }
 
        if (unlikely(h->mid == SDP_MID_DATA && skb->len == 0)) {
                /* Credit update is valid even after RCV_SHUTDOWN */
@@ -611,17 +619,24 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk,
                return NULL;
        }
        skb->len = wc->byte_len;
+       skb->data = skb->head;
+
+       h = (struct sdp_bsdh *)skb->data;
+
        if (likely(wc->byte_len > SDP_HEAD_SIZE))
                skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
        else
                skb->data_len = 0;
-       skb->data = skb->head;
+
+
+       if (h->mid == SDP_MID_SRCAVAIL)
+               skb->data_len -= sizeof(struct sdp_srcah);
+
 #ifdef NET_SKBUFF_DATA_USES_OFFSET
        skb->tail = skb_headlen(skb);
 #else
        skb->tail = skb->head + skb_headlen(skb);
 #endif
-       h = (struct sdp_bsdh *)skb->data;
        SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
        skb_reset_transport_header(skb);
 
index 185b85be51d690fcc40ca52a709eb50560eb5a05..80c8c8e8c7c76b30df00f0f4a1976d5e10291ef6 100644 (file)
@@ -50,21 +50,25 @@ static struct bzcopy_state dummy_bz = {
 busy: 1,
 };
 
+static int sdp_update_iov_used(struct sock *sk, struct iovec *iov, int len);
+
 static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
-               int off, size_t len)
+               int page_idx, struct iovec *iov, int off, size_t len)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
        struct sdp_srcah *srcah;
        struct sk_buff *skb;
+       int payload_len;
 
        WARN_ON(ssk->tx_sa);
 
        BUG_ON(!tx_sa);
        BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
 
-       skb = sdp_stream_alloc_skb(&ssk->isk.sk,
+       skb = sk_stream_alloc_skb(&ssk->isk.sk,
                        sizeof(struct sdp_bsdh) +
-                       sizeof(struct sdp_srcah),
+                       sizeof(struct sdp_srcah) + 
+                       SDP_SRCAVAIL_PAYLOAD_LEN,
                        GFP_KERNEL);
        if (!skb) {
                return -ENOMEM;
@@ -80,8 +84,31 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
        srcah->rkey = htonl(tx_sa->fmr->fmr->lkey);
        srcah->vaddr = cpu_to_be64(off);
 
+       if (0) {
+               void *payload;
+               payload = skb_put(skb, SDP_SRCAVAIL_PAYLOAD_LEN);
+               memcpy(payload, iov->iov_base, SDP_SRCAVAIL_PAYLOAD_LEN);
+               payload_len = SDP_SRCAVAIL_PAYLOAD_LEN;
+       } else {
+               /* must have payload inlined in SrcAvail packet in combined mode */
+               payload_len = min(len, PAGE_SIZE - off);
+               get_page(tx_sa->pages[page_idx]);
+               skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags,
+                               tx_sa->pages[page_idx], off, payload_len);
+
+               skb->len             += payload_len;
+               skb->data_len         = payload_len;
+               skb->truesize        += payload_len;
+//             sk->sk_wmem_queued   += payload_len;
+//             sk->sk_forward_alloc -= payload_len;
+
+       }
+
        skb_entail(sk, ssk, skb);
        
+       tx_sa->bytes_sent = len;
+       tx_sa->bytes_acked = payload_len;
+
        /* TODO: pushing the skb into the tx_queue should be enough */
 
        return 0;
@@ -147,8 +174,13 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
                        break;
                }
 
-               if (tx_sa->bytes_completed >= len)
+               if (tx_sa->bytes_acked == tx_sa->bytes_sent)
                        break;
+               else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
+                       err = -EINVAL;
+                       sdp_warn(sk, "acked bytes > sent bytes\n");
+                       break;
+               }
 
                if (tx_sa->abort) {
                        sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
@@ -158,8 +190,10 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
 
                posts_handler_put(ssk);
 
-               sk_wait_event(sk, &current_timeo, tx_sa->abort && 
-                               (tx_sa->bytes_completed >= len) && vm_wait);
+               sk_wait_event(sk, &current_timeo,
+                               tx_sa->abort && 
+                               (tx_sa->bytes_acked < tx_sa->bytes_sent) && 
+                               vm_wait);
                sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
 
                posts_handler_get(ssk);
@@ -181,8 +215,8 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
 
        finish_wait(sk->sk_sleep, &wait);
 
-       sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d bytes, abort: %d\n",
-                       tx_sa->bytes_completed, tx_sa->abort);
+       sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, abort: %d\n",
+                       tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort);
 
        if (!ssk->qp_active) {
                sdp_warn(sk, "QP destroyed while waiting\n");
@@ -240,11 +274,16 @@ static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p)
        return err;
 }
 
-int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied)
+int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
+               struct rx_srcavail_state *rx_sa)
 {
        struct sdp_rrch *rrch;
        struct sk_buff *skb;
        gfp_t gfp_page;
+       int copied = rx_sa->used - rx_sa->reported;
+
+       if (rx_sa->used <= rx_sa->reported)
+               return 0;
 
        if (unlikely(ssk->isk.sk.sk_allocation))
                gfp_page = ssk->isk.sk.sk_allocation;
@@ -260,6 +299,7 @@ int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied)
 
        rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch));
        rrch->len = htonl(copied);
+       rx_sa->reported += copied;
        /* TODO: What if no tx_credits available? */
        sdp_post_send(ssk, skb, SDP_MID_RDMARDCOMPL);
 
@@ -371,18 +411,7 @@ void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
                goto out;
        }
 
-       if (ssk->tx_sa->bytes_completed > bytes_completed) {
-               sdp_warn(sk, "tx_sa->bytes_total(%d), "
-                       "tx_sa->bytes_completed(%d) > bytes_completed(%d)\n",
-                               ssk->tx_sa->bytes_total, 
-                               ssk->tx_sa->bytes_completed, bytes_completed);
-       }
-       if (ssk->tx_sa->bytes_total < bytes_completed) {
-               sdp_warn(sk, "ssk->tx_sa->bytes_total(%d) < bytes_completed(%d)\n",
-                               ssk->tx_sa->bytes_total, bytes_completed);
-       }
-
-       ssk->tx_sa->bytes_completed = bytes_completed;
+       ssk->tx_sa->bytes_acked += bytes_completed;
 
        wake_up(sk->sk_sleep);
        sdp_dbg_data(sk, "woke up sleepers\n");
@@ -753,7 +782,7 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
 
 static int sdp_rdma_adv_single(struct sock *sk,
                struct tx_srcavail_state *tx_sa, struct iovec *iov, 
-               int offset, int page_cnt, int len, u64 *addrs)
+               int p_idx, int page_cnt, int offset, int len)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
        long timeo = SDP_SRCAVAIL_ADV_TIMEOUT;
@@ -768,17 +797,16 @@ static int sdp_rdma_adv_single(struct sock *sk,
                }
        }
 
-       tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs);
+       tx_sa->fmr = sdp_map_fmr(sk, page_cnt, &tx_sa->addrs[p_idx]);
        if (!tx_sa->fmr) {
                return -ENOMEM;
        }
 
-       tx_sa->bytes_completed = 0;
-       tx_sa->bytes_total = len;
-       rc = sdp_post_srcavail(sk, tx_sa, offset, len);
+       rc = sdp_post_srcavail(sk, tx_sa, p_idx, iov, offset, len);
        if (rc)
                goto err_abort_send;
 
+
        rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
        if (unlikely(rc)) {
                switch (rc) {
@@ -811,7 +839,7 @@ static int sdp_rdma_adv_single(struct sock *sk,
        }
        sdp_prf1(sk, NULL, "got RdmaRdCompl");
 
-       sdp_update_iov_used(sk, iov, tx_sa->bytes_completed);
+       sdp_update_iov_used(sk, iov, tx_sa->bytes_sent);
 
 err_abort_send:
        ib_fmr_pool_unmap(tx_sa->fmr);
@@ -838,7 +866,7 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        int page_cnt;
        int bytes_left;
        int pages_left;
-       u64 *addrs;
+       int p_idx;
 
        sdp_dbg_data(sk, "%s\n", __func__);
 
@@ -903,19 +931,20 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
                bytes_left = iov->iov_len;
                pages_left = tx_sa->page_cnt;
-               addrs = tx_sa->addrs;
+               p_idx = 0;
                do {
                        int p_cnt = min(256, pages_left);
                        int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
 
                        sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
                        rc = sdp_rdma_adv_single(sk, tx_sa, iov,
-                                       offset, p_cnt, len, addrs);
+                                       p_idx, p_cnt, offset, len);
+//                                     offset, p_cnt, len, addrs);
 
                        copied += len;
                        bytes_left -= len;
                        pages_left -= p_cnt;
-                       addrs += p_cnt;
+                       p_idx += p_cnt;
                        offset = 0;
                } while (!rc && !tx_sa->abort && pages_left > 0);