From: Amir Vadai Date: Tue, 11 Aug 2009 10:57:48 +0000 (+0300) Subject: sdp: Fix ZCopy compatability issues X-Git-Tag: v4.1.12-92~264^2~5^2~254 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=c78c74ddcfed45fe50d4498ca2fc587d90b5ca5e;p=users%2Fjedix%2Flinux-maple.git sdp: Fix ZCopy compatability issues Signed-off-by: Amir Vadai --- diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h index f5a46ea208d09..94c51304bafb4 100644 --- a/drivers/infiniband/ulp/sdp/sdp.h +++ b/drivers/infiniband/ulp/sdp/sdp.h @@ -237,6 +237,7 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log) #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5) #define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ) +#define SDP_SRCAVAIL_PAYLOAD_LEN 1 #define MAX_ZCOPY_SEND_SIZE (512 * 1024) @@ -305,13 +306,14 @@ enum sdp_mid { SDP_MID_HELLO = 0x0, SDP_MID_HELLO_ACK = 0x1, SDP_MID_DISCONN = 0x2, + SDP_MID_ABORT = 0x3, SDP_MID_SENDSM = 0x4, SDP_MID_RDMARDCOMPL = 0x6, SDP_MID_SRCAVAIL_CANCEL = 0x8, SDP_MID_CHRCVBUF = 0xB, SDP_MID_CHRCVBUF_ACK = 0xC, - SDP_MID_SRCAVAIL = 0xFD, - SDP_MID_SINKAVAIL = 0xFE, + SDP_MID_SINKAVAIL = 0xFD, + SDP_MID_SRCAVAIL = 0xFE, SDP_MID_DATA = 0xFF, }; @@ -407,6 +409,7 @@ struct rx_srcavail_state { /* Advertised buffer stuff */ u32 mseq; u32 used; + u32 reported; u32 len; u32 rkey; u64 vaddr; @@ -430,8 +433,8 @@ struct tx_srcavail_state { u8 busy; struct ib_pool_fmr *fmr; - u32 bytes_completed; - u32 bytes_total; + u32 bytes_sent; + u32 bytes_acked; u8 abort; u32 mseq; @@ -750,9 +753,6 @@ void sdp_proc_unregister(void); /* sdp_cma.c */ int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *); -/* sdp_bcopy.c */ -int sdp_post_credits(struct sdp_sock *ssk); - /* sdp_tx.c */ int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device); void sdp_tx_ring_destroy(struct sdp_sock *ssk); @@ -785,7 +785,8 @@ int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb, int len); int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt, unsigned long addr); -int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied); +int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, + struct rx_srcavail_state *rx_sa); int sdp_post_sendsm(struct sdp_sock *ssk); void srcavail_cancel_timeout(struct work_struct *work); @@ -841,6 +842,7 @@ static inline char *mid2str(int mid) static char *mid2str[] = { ENUM2STR(SDP_MID_HELLO), ENUM2STR(SDP_MID_HELLO_ACK), + ENUM2STR(SDP_MID_ABORT), ENUM2STR(SDP_MID_DISCONN), ENUM2STR(SDP_MID_SENDSM), ENUM2STR(SDP_MID_RDMARDCOMPL), diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c index 34b4b8c62c505..477b8a2eac389 100644 --- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c @@ -60,7 +60,7 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, case SDP_MID_HELLO: hh = (struct sdp_hh *)h; len += snprintf(buf + len, 255-len, - "max_adverts: %d majv_minv: %d " + "max_adverts: %d majv_minv: 0x%x " "localrcvsz: %d desremrcvsz: %d |", hh->max_adverts, hh->majv_minv, ntohl(hh->localrcvsz), @@ -90,12 +90,10 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, case SDP_MID_SRCAVAIL: srcah = (struct sdp_srcah *)(h+1); - len += snprintf(buf + len, 255-len, " | data_len: %ld |", + len += snprintf(buf + len, 255-len, " | payload: %ld, " + "len: %d, rkey: 0x%x, vaddr: 0x%llx |", ntohl(h->len) - sizeof(struct sdp_bsdh) - - sizeof(struct sdp_srcah)); - - len += snprintf(buf + len, 255-len, - " | len: %d, rkey: 0x%x, vaddr: 0x%llx |", + sizeof(struct sdp_srcah), ntohl(srcah->len), ntohl(srcah->rkey), be64_to_cpu(srcah->vaddr)); break; diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c index ab5ec58b4332d..1f423cc78f88c 100644 --- a/drivers/infiniband/ulp/sdp/sdp_cma.c +++ b/drivers/infiniband/ulp/sdp/sdp_cma.c @@ -46,7 +46,7 @@ #include "sdp_socket.h" #include "sdp.h" -#define SDP_MAJV_MINV 0x22 +#define SDP_MAJV_MINV 0x11 enum { SDP_HH_SIZE = 76, @@ -330,11 +330,11 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) rx_ring_posted(sdp_sk(sk))); memset(&hh, 0, sizeof hh); hh.bsdh.mid = SDP_MID_HELLO; - hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk))); hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE); hh.max_adverts = 1; hh.majv_minv = SDP_MAJV_MINV; sdp_init_buffers(sdp_sk(sk), rcvbuf_initial_size); + hh.bsdh.bufs = htons(rx_ring_posted(sdp_sk(sk))); hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags * PAGE_SIZE + sizeof(struct sdp_bsdh)); hh.max_adverts = 0x1; @@ -367,7 +367,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) rx_ring_posted(sdp_sk(child))); memset(&hah, 0, sizeof hah); hah.bsdh.mid = SDP_MID_HELLO_ACK; - hah.bsdh.bufs = htons(remote_credits(sdp_sk(child))); + hah.bsdh.bufs = htons(rx_ring_posted(sdp_sk(child))); hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE); hah.majv_minv = SDP_MAJV_MINV; hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec, @@ -397,14 +397,8 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event) if (rc) { sdp_warn(sk, "Destroy qp !!!!\n"); rdma_reject(id, NULL, 0); - } - else + } else rc = rdma_accept(id, NULL); - - if (!rc) { -// sdp_sk(sk)->qp_active = 1; - rc = sdp_post_credits(sdp_sk(sk)) < 0 ?: 0; - } break; case RDMA_CM_EVENT_CONNECT_ERROR: sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_ERROR\n"); diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c index 577d5693a753c..61bdda7a0274c 100644 --- a/drivers/infiniband/ulp/sdp/sdp_main.c +++ b/drivers/infiniband/ulp/sdp/sdp_main.c @@ -1992,11 +1992,25 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, rx_sa = RX_SRCAVAIL_STATE(skb); if (rx_sa->mseq < ssk->srcavail_cancel_mseq) { rx_sa->aborted = 1; - sdp_warn(sk, "Ignoring src avail - " + sdp_dbg_data(sk, "Ignoring src avail " "due to SrcAvailCancel\n"); goto skb_cleanup; } - avail_bytes_count = rx_sa->len; + + /* if has payload - handle as if MID_DATA */ + if (rx_sa->used < skb->len) { + sdp_dbg_data(sk, "SrcAvail has " + "payload: %d/%d\n", + rx_sa->used, + skb->len); + avail_bytes_count = skb->len; + } else { + sdp_dbg_data(sk, "Finished payload. " + "RDMAing: %d/%d\n", + rx_sa->used, rx_sa->len); + avail_bytes_count = rx_sa->len; + } + break; case SDP_MID_DATA: @@ -2015,8 +2029,6 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, } offset = *seq - SDP_SKB_CB(skb)->seq; -// sdp_warn(sk, "offset = 0x%x, avail_byte_count = 0x%x\n", -// offset, avail_bytes_count); if (offset < avail_bytes_count) goto found_ok_skb; @@ -2116,8 +2128,9 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, } } if (!(flags & MSG_TRUNC)) { - if (rx_sa) { - sdp_dbg_data(sk, "Got srcavail - using RDMA\n"); + if (rx_sa && rx_sa->used >= skb->len) { + /* No more payload - start rdma copy */ + sdp_dbg_data(sk, "RDMA copy of %ld bytes\n", used); err = sdp_rdma_to_iovec(sk, msg->msg_iov, skb, used); if (err == -EAGAIN) { @@ -2127,8 +2140,14 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, } } else { err = skb_copy_datagram_iovec(skb, offset, - /* TODO: skip header? */ - msg->msg_iov, used); + /* TODO: skip header? */ + msg->msg_iov, used); + if (rx_sa) { + rx_sa->used += used; + rx_sa->reported += used; + sdp_dbg_data(sk, "copied %ld from " + "payload\n", used); + } } if (err) { sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed" @@ -2155,7 +2174,7 @@ skip_copy: if (rx_sa) { if (ssk->srcavail_cancel_mseq < rx_sa->mseq) { - rc = sdp_post_rdma_rd_compl(ssk, rx_sa->used); + rc = sdp_post_rdma_rd_compl(ssk, rx_sa); BUG_ON(rc); } if (rx_sa->aborted) { diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c index 791030ffc21e5..9b00255871a8b 100644 --- a/drivers/infiniband/ulp/sdp/sdp_proc.c +++ b/drivers/infiniband/ulp/sdp/sdp_proc.c @@ -378,8 +378,7 @@ static void sdpprf_stop(struct seq_file *p, void *v) { } - -const struct seq_operations sdpprf_ops = { +struct seq_operations sdpprf_ops = { .start = sdpprf_start, .stop = sdpprf_stop, .next = sdpprf_next, @@ -404,7 +403,7 @@ static ssize_t sdpprf_write(struct file *file, const char __user *buf, return count; } -static const struct file_operations sdpprf_fops = { +static struct file_operations sdpprf_fops = { .open = sdpprf_open, .read = seq_read, .llseek = seq_lseek, diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c index 0476e4a62ea59..2453715f3969b 100644 --- a/drivers/infiniband/ulp/sdp/sdp_rx.c +++ b/drivers/infiniband/ulp/sdp/sdp_rx.c @@ -245,10 +245,8 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk) int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE; unsigned long max_bytes; - if (!ssk->qp_active) { -// sdp_warn(sk, "post rx while qp is not active\n"); + if (!ssk->qp_active) return 0; - } if (top_mem_usage && (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) { @@ -321,6 +319,7 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk, sa->rkey = ntohl(srcah->rkey); sa->vaddr = be64_to_cpu(srcah->vaddr); + atomic_add(skb->len, &ssk->rcv_nxt); sdp_dbg_data(sk, "queueing SrcAvail. skb_len = %d vaddr = %lld\n", skb_len, sa->vaddr); } else { @@ -483,6 +482,12 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb) ssk->srcavail_cancel_mseq = ntohl(h->mseq); sdp_post_sendsm(ssk); break; + case SDP_MID_ABORT: + sdp_dbg_data(sk, "Handling ABORT\n"); + sdp_prf(sk, NULL, "Handling ABORT"); + sdp_reset(sk); + __kfree_skb(skb); + break; case SDP_MID_DISCONN: sdp_dbg_data(sk, "Handling DISCONN\n"); sdp_prf(sk, NULL, "Handling DISCONN"); @@ -544,6 +549,9 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb) sk_send_sigurg(sk);*/ skb_pull(skb, sizeof(struct sdp_bsdh)); + if (h->mid == SDP_MID_SRCAVAIL) { + skb_pull(skb, sizeof(struct sdp_srcah)); + } if (unlikely(h->mid == SDP_MID_DATA && skb->len == 0)) { /* Credit update is valid even after RCV_SHUTDOWN */ @@ -611,17 +619,24 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, return NULL; } skb->len = wc->byte_len; + skb->data = skb->head; + + h = (struct sdp_bsdh *)skb->data; + if (likely(wc->byte_len > SDP_HEAD_SIZE)) skb->data_len = wc->byte_len - SDP_HEAD_SIZE; else skb->data_len = 0; - skb->data = skb->head; + + + if (h->mid == SDP_MID_SRCAVAIL) + skb->data_len -= sizeof(struct sdp_srcah); + #ifdef NET_SKBUFF_DATA_USES_OFFSET skb->tail = skb_headlen(skb); #else skb->tail = skb->head + skb_headlen(skb); #endif - h = (struct sdp_bsdh *)skb->data; SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h); skb_reset_transport_header(skb); diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c index 185b85be51d69..80c8c8e8c7c76 100644 --- a/drivers/infiniband/ulp/sdp/sdp_zcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c @@ -50,21 +50,25 @@ static struct bzcopy_state dummy_bz = { busy: 1, }; +static int sdp_update_iov_used(struct sock *sk, struct iovec *iov, int len); + static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa, - int off, size_t len) + int page_idx, struct iovec *iov, int off, size_t len) { struct sdp_sock *ssk = sdp_sk(sk); struct sdp_srcah *srcah; struct sk_buff *skb; + int payload_len; WARN_ON(ssk->tx_sa); BUG_ON(!tx_sa); BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey); - skb = sdp_stream_alloc_skb(&ssk->isk.sk, + skb = sk_stream_alloc_skb(&ssk->isk.sk, sizeof(struct sdp_bsdh) + - sizeof(struct sdp_srcah), + sizeof(struct sdp_srcah) + + SDP_SRCAVAIL_PAYLOAD_LEN, GFP_KERNEL); if (!skb) { return -ENOMEM; @@ -80,8 +84,31 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa, srcah->rkey = htonl(tx_sa->fmr->fmr->lkey); srcah->vaddr = cpu_to_be64(off); + if (0) { + void *payload; + payload = skb_put(skb, SDP_SRCAVAIL_PAYLOAD_LEN); + memcpy(payload, iov->iov_base, SDP_SRCAVAIL_PAYLOAD_LEN); + payload_len = SDP_SRCAVAIL_PAYLOAD_LEN; + } else { + /* must have payload inlined in SrcAvail packet in combined mode */ + payload_len = min(len, PAGE_SIZE - off); + get_page(tx_sa->pages[page_idx]); + skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags, + tx_sa->pages[page_idx], off, payload_len); + + skb->len += payload_len; + skb->data_len = payload_len; + skb->truesize += payload_len; +// sk->sk_wmem_queued += payload_len; +// sk->sk_forward_alloc -= payload_len; + + } + skb_entail(sk, ssk, skb); + tx_sa->bytes_sent = len; + tx_sa->bytes_acked = payload_len; + /* TODO: pushing the skb into the tx_queue should be enough */ return 0; @@ -147,8 +174,13 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len, break; } - if (tx_sa->bytes_completed >= len) + if (tx_sa->bytes_acked == tx_sa->bytes_sent) break; + else if (tx_sa->bytes_acked > tx_sa->bytes_sent) { + err = -EINVAL; + sdp_warn(sk, "acked bytes > sent bytes\n"); + break; + } if (tx_sa->abort) { sdp_prf1(sk, NULL, "Aborting SrcAvail sending"); @@ -158,8 +190,10 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len, posts_handler_put(ssk); - sk_wait_event(sk, ¤t_timeo, tx_sa->abort && - (tx_sa->bytes_completed >= len) && vm_wait); + sk_wait_event(sk, ¤t_timeo, + tx_sa->abort && + (tx_sa->bytes_acked < tx_sa->bytes_sent) && + vm_wait); sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n"); posts_handler_get(ssk); @@ -181,8 +215,8 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len, finish_wait(sk->sk_sleep, &wait); - sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d bytes, abort: %d\n", - tx_sa->bytes_completed, tx_sa->abort); + sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, abort: %d\n", + tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort); if (!ssk->qp_active) { sdp_warn(sk, "QP destroyed while waiting\n"); @@ -240,11 +274,16 @@ static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p) return err; } -int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied) +int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, + struct rx_srcavail_state *rx_sa) { struct sdp_rrch *rrch; struct sk_buff *skb; gfp_t gfp_page; + int copied = rx_sa->used - rx_sa->reported; + + if (rx_sa->used <= rx_sa->reported) + return 0; if (unlikely(ssk->isk.sk.sk_allocation)) gfp_page = ssk->isk.sk.sk_allocation; @@ -260,6 +299,7 @@ int sdp_post_rdma_rd_compl(struct sdp_sock *ssk, int copied) rrch = (struct sdp_rrch *)skb_put(skb, sizeof(*rrch)); rrch->len = htonl(copied); + rx_sa->reported += copied; /* TODO: What if no tx_credits available? */ sdp_post_send(ssk, skb, SDP_MID_RDMARDCOMPL); @@ -371,18 +411,7 @@ void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack, goto out; } - if (ssk->tx_sa->bytes_completed > bytes_completed) { - sdp_warn(sk, "tx_sa->bytes_total(%d), " - "tx_sa->bytes_completed(%d) > bytes_completed(%d)\n", - ssk->tx_sa->bytes_total, - ssk->tx_sa->bytes_completed, bytes_completed); - } - if (ssk->tx_sa->bytes_total < bytes_completed) { - sdp_warn(sk, "ssk->tx_sa->bytes_total(%d) < bytes_completed(%d)\n", - ssk->tx_sa->bytes_total, bytes_completed); - } - - ssk->tx_sa->bytes_completed = bytes_completed; + ssk->tx_sa->bytes_acked += bytes_completed; wake_up(sk->sk_sleep); sdp_dbg_data(sk, "woke up sleepers\n"); @@ -753,7 +782,7 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p) static int sdp_rdma_adv_single(struct sock *sk, struct tx_srcavail_state *tx_sa, struct iovec *iov, - int offset, int page_cnt, int len, u64 *addrs) + int p_idx, int page_cnt, int offset, int len) { struct sdp_sock *ssk = sdp_sk(sk); long timeo = SDP_SRCAVAIL_ADV_TIMEOUT; @@ -768,17 +797,16 @@ static int sdp_rdma_adv_single(struct sock *sk, } } - tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs); + tx_sa->fmr = sdp_map_fmr(sk, page_cnt, &tx_sa->addrs[p_idx]); if (!tx_sa->fmr) { return -ENOMEM; } - tx_sa->bytes_completed = 0; - tx_sa->bytes_total = len; - rc = sdp_post_srcavail(sk, tx_sa, offset, len); + rc = sdp_post_srcavail(sk, tx_sa, p_idx, iov, offset, len); if (rc) goto err_abort_send; + rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0); if (unlikely(rc)) { switch (rc) { @@ -811,7 +839,7 @@ static int sdp_rdma_adv_single(struct sock *sk, } sdp_prf1(sk, NULL, "got RdmaRdCompl"); - sdp_update_iov_used(sk, iov, tx_sa->bytes_completed); + sdp_update_iov_used(sk, iov, tx_sa->bytes_sent); err_abort_send: ib_fmr_pool_unmap(tx_sa->fmr); @@ -838,7 +866,7 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, int page_cnt; int bytes_left; int pages_left; - u64 *addrs; + int p_idx; sdp_dbg_data(sk, "%s\n", __func__); @@ -903,19 +931,20 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, bytes_left = iov->iov_len; pages_left = tx_sa->page_cnt; - addrs = tx_sa->addrs; + p_idx = 0; do { int p_cnt = min(256, pages_left); int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left); sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left); rc = sdp_rdma_adv_single(sk, tx_sa, iov, - offset, p_cnt, len, addrs); + p_idx, p_cnt, offset, len); +// offset, p_cnt, len, addrs); copied += len; bytes_left -= len; pages_left -= p_cnt; - addrs += p_cnt; + p_idx += p_cnt; offset = 0; } while (!rc && !tx_sa->abort && pages_left > 0);