From: Amir Vadai
Date: Mon, 30 Nov 2009 15:45:13 +0000 (+0200)
Subject: sdp: Cleanup ZCopy page registrations
X-Git-Tag: v4.1.12-92~264^2~5^2~234
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=aa79b5b54d5291cf52aaae5ce97214ac52a90b71;p=users%2Fjedix%2Flinux-maple.git

sdp: Cleanup ZCopy page registrations

- use ib_umem_get instead of get_user_pages
- Use FMR for RDMA reads
- ZCopy sender side to use ib_umem
- send remainder of ZCopy with BCopy

Signed-off-by: Amir Vadai
---

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index d67a973d1b55e..fe6fa1270cde6 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -33,6 +33,8 @@
 #define SDP_FMR_POOL_SIZE	1024
 #define SDP_FMR_DIRTY_SIZE	( SDP_FMR_POOL_SIZE / 4 )

+#define SDP_MAX_RDMA_READ_LEN (PAGE_SIZE * (SDP_FMR_SIZE - 2))
+
 #define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
 #define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)

@@ -88,6 +90,7 @@ struct sdp_skb_cb {
 		sdp_do_posts(ssk); \
 } while (0)

+extern int sdp_zcopy_thresh;
 extern struct workqueue_struct *sdp_wq;
 extern struct list_head sock_list;
 extern spinlock_t sock_list_lock;
@@ -225,9 +228,8 @@ struct rx_srcavail_state {
 	u64 vaddr;

 	/* Dest buff info */
-	u32 page_cnt;
-	struct page **pages;
-	struct ib_sge *sge;
+	struct ib_umem *umem;
+	struct ib_pool_fmr *fmr;

 	/* Utility */
 	u8 busy;
@@ -235,14 +237,12 @@ struct rx_srcavail_state {
 };

 struct tx_srcavail_state {
-	u32 page_cnt;
-	struct page **pages;
-	u64 *addrs;
-
 	/* Data below 'busy' will be reset */
 	u8 busy;

+	struct ib_umem *umem;
 	struct ib_pool_fmr *fmr;
+
 	u32 bytes_sent;
 	u32 bytes_acked;

@@ -323,6 +323,8 @@ struct sdp_sock {
 	struct delayed_work srcavail_cancel_work;
 	int srcavail_cancel_mseq;

+	struct ib_ucontext context;
+
 	int max_sge;

 	struct work_struct rx_comp_work;
@@ -769,12 +771,11 @@ void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
 int sdp_handle_rdma_read_cqe(struct sdp_sock *ssk);
 int sdp_rdma_to_iovec(struct sock *sk, struct iovec *iov, struct sk_buff *skb,
 		int len);
-int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
-		unsigned long addr);
 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
 		struct rx_srcavail_state *rx_sa);
 int sdp_post_sendsm(struct sock *sk);
 void srcavail_cancel_timeout(struct work_struct *work);
-void sdp_unmap_dma(struct sock *sk, u64 *addrs, int page_cnt);
+void sdp_abort_srcavail(struct sock *sk);
+void sdp_abort_rdma_read(struct sock *sk);

 #endif
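[Editor's note] The new SDP_MAX_RDMA_READ_LEN cap ties the largest single RDMA read to what one FMR mapping can cover. A back-of-envelope sketch, not from the patch: SDP_FMR_SIZE is defined elsewhere in sdp.h, so the value 256 below is only an assumption for illustration, as are the HYP_* names.

	/* Hypothetical numbers: with 4 KB pages and an assumed SDP_FMR_SIZE
	 * of 256, two pages of slack cover an unaligned buffer head/tail,
	 * so one RDMA read moves at most (256 - 2) * 4096 = 1040384 bytes
	 * (~1 MB) per FMR mapping.
	 */
	#define HYP_PAGE_SIZE		4096UL
	#define HYP_SDP_FMR_SIZE	256UL
	#define HYP_MAX_RDMA_READ_LEN	(HYP_PAGE_SIZE * (HYP_SDP_FMR_SIZE - 2))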
diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c
index b3776218f8875..5ad5496381791 100644
--- a/drivers/infiniband/ulp/sdp/sdp_cma.c
+++ b/drivers/infiniband/ulp/sdp/sdp_cma.c
@@ -120,6 +120,7 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
 	sdp_sk(sk)->qp = id->qp;
 	sdp_sk(sk)->ib_device = device;
 	sdp_sk(sk)->qp_active = 1;
+	sdp_sk(sk)->context.device = device;

 	init_waitqueue_head(&sdp_sk(sk)->wq);

diff --git a/drivers/infiniband/ulp/sdp/sdp_dbg.h b/drivers/infiniband/ulp/sdp/sdp_dbg.h
index 10ef4f0605527..c3f5bc7ae9df3 100644
--- a/drivers/infiniband/ulp/sdp/sdp_dbg.h
+++ b/drivers/infiniband/ulp/sdp/sdp_dbg.h
@@ -146,6 +146,7 @@ extern int sdp_data_debug_level;
 #define SOCK_REF_SEQ	"SEQ"		/* during proc read */
 #define SOCK_REF_DREQ_TO	"DREQ_TO"	/* dreq timeout is pending */
 #define SOCK_REF_ZCOPY	"ZCOPY"		/* zcopy send in process */
+#define SOCK_REF_RDMA_RD	"RDMA_RD"	/* RDMA read in process */

 #define sock_hold(sk, msg)	sock_ref(sk, msg, sock_hold)
 #define sock_put(sk, msg)	sock_ref(sk, msg, sock_put)

diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 4035cc1dc63ba..eab89a69308d1 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -70,6 +70,7 @@ unsigned int csum_partial_copy_from_user_new (const char *src, char *dst,
 #include
 #include
 #include
+#include
 /* TODO: remove when sdp_socket.h becomes part of include/linux/socket.h */
 #include "sdp_socket.h"
 #include "sdp.h"
@@ -94,7 +95,7 @@ SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
 	"Default idle time in seconds before keepalive probe sent.");
 SDP_MODPARAM_SINT(sdp_bzcopy_thresh, 0,
 	"Zero copy send using SEND threshold; 0=0ff.");
-SDP_MODPARAM_SINT(sdp_zcopy_thresh, 64*1024,
+SDP_MODPARAM_INT(sdp_zcopy_thresh, 64*1024,
 	"Zero copy using RDMA threshold; 0=0ff.");
 #define SDP_RX_COAL_TIME_HIGH 128
 SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000,
@@ -434,10 +435,9 @@ void sdp_reset_sk(struct sock *sk, int rc)
 	if (ssk->tx_ring.cq)
 		sdp_xmit_poll(ssk, 1);

-	if (ssk->tx_sa) {
-		sdp_unmap_dma(sk, ssk->tx_sa->addrs, ssk->tx_sa->page_cnt);
-		ssk->tx_sa->addrs = NULL;
-	}
+	sdp_abort_srcavail(sk);
+
+	sdp_abort_rdma_read(sk);

 	if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
 		sdp_dbg(sk, "setting state to error\n");
@@ -1413,6 +1413,72 @@ static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
 	return NULL;
 }

+static int sdp_get_user_pages(struct page **pages, const unsigned int nr_pages,
+		unsigned long uaddr, int rw)
+{
+	int res, i;
+
+	/* Try to fault in all of the necessary pages */
+	down_read(&current->mm->mmap_sem);
+	/* rw==READ means read from drive, write into memory area */
+	res = get_user_pages(
+		current,
+		current->mm,
+		uaddr,
+		nr_pages,
+		rw == READ,
+		0, /* don't force */
+		pages,
+		NULL);
+	up_read(&current->mm->mmap_sem);
+
+	/* Errors and no page mapped should return here */
+	if (res < nr_pages)
+		return res;
+
+	for (i=0; i < nr_pages; i++) {
+		/* FIXME: flush superfluous for rw==READ,
+		 * probably wrong function for rw==WRITE
+		 */
+		flush_dcache_page(pages[i]);
+	}
+
+	return nr_pages;
+}
+
+static int sdp_get_pages(struct sock *sk, struct page **pages, int page_cnt,
+		unsigned long addr)
+{
+	int done_pages = 0;
+
+	sdp_dbg_data(sk, "count: 0x%x addr: 0x%lx\n", page_cnt, addr);
+
+	addr &= PAGE_MASK;
+	if (segment_eq(get_fs(), KERNEL_DS)) {
+		for (done_pages = 0; done_pages < page_cnt; done_pages++) {
+			pages[done_pages] = virt_to_page(addr);
+			if (!pages[done_pages])
+				break;
+			get_page(pages[done_pages]);
+			addr += PAGE_SIZE;
+		}
+	} else {
+		done_pages = sdp_get_user_pages(pages, page_cnt, addr, WRITE);
+	}
+
+	if (unlikely(done_pages != page_cnt))
+		goto err;
+
+	return 0;
+
+err:
+	sdp_warn(sk, "Error getting pages. done_pages: %d page_cnt: %d\n",
+		done_pages, page_cnt);
+	for (; done_pages > 0; done_pages--)
+		page_cache_release(pages[done_pages - 1]);
+
+	return -1;
+}

 static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
 					 char __user *base,
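[Editor's note] sdp_get_pages() moves into sdp_main.c because the BCopy path still pins pages directly. For reference, a minimal sketch of the matching release side, built from the same calls the removed sdp_put_pages() used in sdp_zcopy.c; hyp_put_pages is a hypothetical name, not part of the patch.

	static void hyp_put_pages(struct page **pages, int page_cnt)
	{
		int i;

		for (i = 0; i < page_cnt; i++) {
			set_page_dirty_lock(pages[i]);	/* page may have been written via DMA */
			page_cache_release(pages[i]);	/* drop the ref taken when pinning */
		}
	}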
@@ -1737,16 +1803,26 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	int iovlen, flags;
 	int size_goal;
 	int err, copied;
+	int zcopied = 0;
 	long timeo;
 	struct bzcopy_state *bz = NULL;

 	SDPSTATS_COUNTER_INC(sendmsg);

 	if (sdp_zcopy_thresh && size > sdp_zcopy_thresh) {
-		err = sdp_sendmsg_zcopy(iocb, sk, msg, size);
-		if (err != -EAGAIN && err != -ETIME)
-			return err;
+		zcopied = sdp_sendmsg_zcopy(iocb, sk, msg, size);
+		if (zcopied == -EAGAIN || zcopied == -ETIME) {
+			sdp_dbg_data(sk, "Got SendSM/Timedout - fallback to regular send\n");
+		} else if (zcopied < 0) {
+			sdp_dbg_data(sk, "ZCopy send err: %d\n", zcopied);
+			return zcopied;
+		}
+
+		if (size == zcopied) {
+			return size;
+		}

-		/* Got SendSM/Timedout - fallback to regular send */
+		sdp_dbg_data(sk, "ZCopied: 0x%x\n", zcopied);
+		sdp_dbg_data(sk, "Continue to BCopy 0x%lx bytes left\n", size - zcopied);
 	}

 	lock_sock(sk);
@@ -1914,7 +1990,9 @@ out:

 	posts_handler_put(ssk);
 	release_sock(sk);
-	return copied;
+
+	sdp_dbg_data(sk, "BCopied: 0x%x\n", copied);
+	return copied + zcopied;

 do_fault:
 	sdp_prf(sk, skb, "prepare fault");
@@ -2177,14 +2255,15 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 				goto skb_cleanup;
 			}
 		} else {
+			sdp_dbg_data(sk, "memcpy 0x%lx bytes +0x%x -> %p\n",
+				used, offset, msg->msg_iov[0].iov_base);
+
 			err = skb_copy_datagram_iovec(skb, offset,
 					/* TODO: skip header? */
 					msg->msg_iov, used);
 			if (rx_sa) {
 				rx_sa->used += used;
 				rx_sa->reported += used;
-				sdp_dbg_data(sk, "copied %ld from "
-					"payload\n", used);
 			}
 		}
 		if (err) {
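[Editor's note] From userspace the new fallback is invisible: one large send() is now completed partly by ZCopy and partly by BCopy inside the same call. A sketch of the application side; AF_INET_SDP = 27 follows the OFED convention and is an assumption here (SDP is also commonly enabled transparently via libsdp and LD_PRELOAD).

	#include <sys/socket.h>
	#include <unistd.h>

	#define AF_INET_SDP 27	/* assumption: OFED's SDP address family */

	/* A send larger than sdp_zcopy_thresh (default 64*1024) takes the
	 * RDMA ZCopy path; any remainder is finished with BCopy before the
	 * call returns, so the usual partial-write loop still applies.
	 */
	static ssize_t send_all(int fd, const char *buf, size_t len)
	{
		size_t off = 0;

		while (off < len) {
			ssize_t n = send(fd, buf + off, len - off, 0);
			if (n < 0)
				return -1;
			off += n;
		}
		return off;
	}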
status: %d\n", wc->status); sdp_prf1(sk, NULL, "TX comp: RDMA read"); if (!ssk->tx_ring.rdma_inflight) { - sdp_dbg(sk, "ERROR: unexpected RDMA read\n"); + sdp_warn(sk, "ERROR: unexpected RDMA read\n"); return; } if (!ssk->tx_ring.rdma_inflight->busy) { - sdp_dbg(sk, "ERROR: too many RDMA read completions\n"); + sdp_warn(sk, "ERROR: too many RDMA read completions\n"); return; } @@ -334,7 +336,7 @@ static int sdp_tx_handler_select(struct sdp_sock *ssk) return 0; } else sdp_prf1(sk, NULL, "Unexpected: sk_sleep=%p, " - "waitqueue_active: %d", + "waitqueue_active: %d\n", sk->sk_sleep, waitqueue_active(sk->sk_sleep)); } @@ -371,8 +373,10 @@ static void sdp_poll_tx_timeout(unsigned long data) goto out; } - if (unlikely(sk->sk_state == TCP_CLOSE)) + if (unlikely(sk->sk_state == TCP_CLOSE)) { + sdp_warn(sk, "Socket is closed\n"); goto out; + } wc_processed = sdp_process_tx_cq(ssk); if (!wc_processed) diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c index 2831dd609842f..81baa6ed5b8ef 100644 --- a/drivers/infiniband/ulp/sdp/sdp_zcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c @@ -39,24 +39,34 @@ #include #include #include -#include +#include #include /* for memcpy_toiovec */ #include #include #include #include "sdp.h" -static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa, - int page_idx, int off, size_t len) +static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa) { struct sdp_sock *ssk = sdp_sk(sk); struct sk_buff *skb; int payload_len; + struct page *payload_pg; + int off, len; + struct ib_umem_chunk *chunk; WARN_ON(ssk->tx_sa); BUG_ON(!tx_sa); BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey); + BUG_ON(!tx_sa->umem); + BUG_ON(!tx_sa->umem->chunk_list.next); + + chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list); + BUG_ON(!chunk->nmap); + + off = tx_sa->umem->offset; + len = tx_sa->umem->length; tx_sa->bytes_sent = tx_sa->bytes_acked = 0; @@ -64,17 +74,22 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa, if (!skb) { return -ENOMEM; } - sdp_prf1(sk, skb, "sending SrcAvail"); + sdp_dbg_data(sk, "sending SrcAvail\n"); TX_SRCAVAIL_STATE(skb) = tx_sa; /* tx_sa is hanged on the skb * but continue to live after skb is freed */ ssk->tx_sa = tx_sa; /* must have payload inlined in SrcAvail packet in combined mode */ - payload_len = min(len, PAGE_SIZE - off); - get_page(tx_sa->pages[page_idx]); + payload_len = tx_sa->umem->page_size - off; + payload_pg = sg_page(&chunk->page_list[0]); + get_page(payload_pg); + + sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n", + off, payload_pg, payload_len); + skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags, - tx_sa->pages[page_idx], off, payload_len); + payload_pg, off, payload_len); skb->len += payload_len; skb->data_len = payload_len; @@ -87,7 +102,7 @@ static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa, ssk->write_seq += payload_len; SDP_SKB_CB(skb)->end_seq += payload_len; - tx_sa->bytes_sent = len; + tx_sa->bytes_sent = tx_sa->umem->length; tx_sa->bytes_acked = payload_len; /* TODO: pushing the skb into the tx_queue should be enough */ @@ -129,7 +144,7 @@ void srcavail_cancel_timeout(struct work_struct *work) release_sock(sk); } -static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len, +static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int ignore_signals) { struct sock *sk = &ssk->isk.sk; @@ -217,62 +232,46 @@ static int 
diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
index 2831dd609842f..81baa6ed5b8ef 100644
--- a/drivers/infiniband/ulp/sdp/sdp_zcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
@@ -39,24 +39,34 @@
 #include
 #include
 #include
-#include
+#include
 #include /* for memcpy_toiovec */
 #include
 #include
 #include
 #include "sdp.h"

-static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
-		int page_idx, int off, size_t len)
+static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa)
 {
 	struct sdp_sock *ssk = sdp_sk(sk);
 	struct sk_buff *skb;
 	int payload_len;
+	struct page *payload_pg;
+	int off, len;
+	struct ib_umem_chunk *chunk;

 	WARN_ON(ssk->tx_sa);

 	BUG_ON(!tx_sa);
 	BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
+	BUG_ON(!tx_sa->umem);
+	BUG_ON(!tx_sa->umem->chunk_list.next);
+
+	chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
+	BUG_ON(!chunk->nmap);
+
+	off = tx_sa->umem->offset;
+	len = tx_sa->umem->length;

 	tx_sa->bytes_sent = tx_sa->bytes_acked = 0;

@@ -64,17 +74,22 @@
 	if (!skb) {
 		return -ENOMEM;
 	}
-	sdp_prf1(sk, skb, "sending SrcAvail");
+	sdp_dbg_data(sk, "sending SrcAvail\n");

 	TX_SRCAVAIL_STATE(skb) = tx_sa; /* tx_sa is hanged on the skb
 					 * but continue to live after skb is freed */
 	ssk->tx_sa = tx_sa;

 	/* must have payload inlined in SrcAvail packet in combined mode */
-	payload_len = min(len, PAGE_SIZE - off);
-	get_page(tx_sa->pages[page_idx]);
+	payload_len = tx_sa->umem->page_size - off;
+	payload_pg = sg_page(&chunk->page_list[0]);
+	get_page(payload_pg);
+
+	sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
+		off, payload_pg, payload_len);
+
 	skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags,
-			tx_sa->pages[page_idx], off, payload_len);
+			payload_pg, off, payload_len);

 	skb->len += payload_len;
 	skb->data_len = payload_len;
@@ -87,7 +102,7 @@
 	ssk->write_seq += payload_len;
 	SDP_SKB_CB(skb)->end_seq += payload_len;

-	tx_sa->bytes_sent = len;
+	tx_sa->bytes_sent = tx_sa->umem->length;
 	tx_sa->bytes_acked = payload_len;

 	/* TODO: pushing the skb into the tx_queue should be enough */
@@ -129,7 +144,7 @@ void srcavail_cancel_timeout(struct work_struct *work)
 	release_sock(sk);
 }

-static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
+static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
 		int ignore_signals)
 {
 	struct sock *sk = &ssk->isk.sk;
@@ -217,62 +232,46 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
 	return err;
 }

-static int sdp_wait_rdma_wr_finished(struct sdp_sock *ssk, long *timeo_p)
+static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
 {
 	struct sock *sk = &ssk->isk.sk;
-	int err = 0;
-	long current_timeo = *timeo_p;
+	long timeo = HZ * 5; /* Timeout for RDMA read */
 	DEFINE_WAIT(wait);

 	sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
 	while (1) {
-		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+		prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);

-		if (unlikely(!ssk->qp_active)) {
-			err = -EPIPE;
-			sdp_warn(sk, "Socket closed\n");
-			break;
-		}
-
-		if (unlikely(!*timeo_p)) {
-			err = -EAGAIN;
-			sdp_warn(sk, "Timedout\n");
+		if (!ssk->tx_ring.rdma_inflight->busy) {
+			sdp_dbg_data(sk, "got rdma cqe\n");
 			break;
 		}

-		if (unlikely(signal_pending(current))) {
-			err = sock_intr_errno(*timeo_p);
-			sdp_warn(sk, "Signalled\n");
+		if (!ssk->qp_active) {
+			sdp_warn(sk, "QP destroyed\n");
 			break;
 		}

-		if (!ssk->tx_ring.rdma_inflight->busy) {
-			sdp_dbg_data(sk, "got rdma cqe\n");
+		if (!timeo) {
+			sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
+			WARN_ON(1);
 			break;
 		}

 		posts_handler_put(ssk);

 		sdp_prf1(sk, NULL, "Going to sleep");
-		sk_wait_event(sk, &current_timeo,
+		sk_wait_event(sk, &timeo,
 			!ssk->tx_ring.rdma_inflight->busy);
 		sdp_prf1(sk, NULL, "Woke up");
 		sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");

 		posts_handler_get(ssk);
-
-		*timeo_p = current_timeo;
 	}

 	finish_wait(sk->sk_sleep, &wait);

-	if (ssk->tx_ring.rdma_inflight->busy) {
-		sdp_warn(sk, "RDMA reads are in the air: %d\n",
-			ssk->tx_ring.rdma_inflight->busy);
-	} else
-		sdp_dbg_data(sk, "Finished waiting - no rdma's inflight\n");
-
-	return err;
+	sdp_dbg_data(sk, "Finished waiting\n");
 }
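[Editor's note] Concrete numbers for the combined-mode SrcAvail in the hunk above, as a sketch: the inlined payload always runs from the buffer start to the end of the first umem page, and the peer RDMA-reads the rest. The values below are examples only.

	/* Example: a 4 KB umem page size and a user buffer that starts
	 * 0x345 bytes into its first page.
	 */
	unsigned int page_size = 4096;			/* tx_sa->umem->page_size */
	unsigned int off = 0x345;			/* tx_sa->umem->offset */
	unsigned int payload_len = page_size - off;	/* 0xcbb bytes sent inline */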
"user buf: %p, len:0x%lx\n", uaddr, len); - return 0; + umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len, + IB_ACCESS_REMOTE_WRITE, 0); -err: - sdp_warn(sk, "Error getting pages. done_pages: %d page_cnt: %d\n", - done_pages, page_cnt); - for (; done_pages > 0; done_pages--) - page_cache_release(pages[done_pages - 1]); + if (IS_ERR(umem)) { + rc = PTR_ERR(umem); + sdp_warn(sk, "Error getting umem: %d\n", rc); + goto err_umem_get; + } - return -1; -} + sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n", + umem->offset, umem->length); -static void sdp_put_pages(struct sock *sk, struct page **pages, int page_cnt) -{ - int i; - sdp_dbg_data(sk, "count: %d\n", page_cnt); + pages = (u64 *) __get_free_page(GFP_KERNEL); + if (!pages) + goto err_pages_alloc; - for (i = 0; i < page_cnt; i++) { - set_page_dirty_lock(pages[i]); - page_cache_release(pages[i]); - } -} + n = 0; -static int sdp_map_dma(struct sock *sk, u64 *addrs, struct page **pages, - int nr_pages, size_t offset, size_t count) -{ - struct ib_device *dev = sdp_sk(sk)->ib_device; - int i = 0; - sdp_dbg_data(sk, "map dma offset: 0x%lx count: 0x%lx\n", offset, count); - -#define map_page(p, o, l) ({\ - u64 addr = ib_dma_map_page(dev, p, o, l, DMA_TO_DEVICE); \ - if (ib_dma_mapping_error(dev, addr)) { \ - sdp_warn(sk, "Error mapping page %p off: 0x%lx len: 0x%lx\n", \ - p, o, l); \ - goto err; \ - } \ - addr; \ -}) - - if (nr_pages > 1) { - size_t length = PAGE_SIZE - offset; - addrs[0] = map_page(pages[0], offset, length); - count -= length; - for (i=1; i < nr_pages ; i++) { - length = count < PAGE_SIZE ? count : PAGE_SIZE; - addrs[i] = map_page(pages[i], 0UL, length); - count -= PAGE_SIZE; + list_for_each_entry(chunk, &umem->chunk_list, list) { + for (j = 0; j < chunk->nmap; ++j) { + len = sg_dma_len(&chunk->page_list[j]) >> PAGE_SHIFT; + for (k = 0; k < len; ++k) { + pages[n++] = sg_dma_address(&chunk->page_list[j]) + + umem->page_size * k; + } } - } else - addrs[0] = map_page(pages[0], offset, count); + } - return 0; -err: - for (; i > 0; i--) { - ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE); + fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0); + if (IS_ERR(fmr)) { + sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr)); + goto err_fmr_alloc; } - return -1; -} -void sdp_unmap_dma(struct sock *sk, u64 *addrs, int page_cnt) -{ - int i; - struct ib_device *dev = sdp_sk(sk)->ib_device; + free_page((unsigned long) pages); - sdp_dbg_data(sk, "count: %d\n", page_cnt); + *_umem = umem; + *_fmr = fmr; - for (i = 0; i < page_cnt; i++) - ib_dma_unmap_page(dev, addrs[i], PAGE_SIZE, DMA_TO_DEVICE); -} + return 0; -static int sdp_map_dma_sge(struct sock *sk, struct ib_sge *sge, int page_cnt, - struct page **pages, int offset, int len) -{ - struct ib_device *dev = sdp_sk(sk)->ib_device; - int i; - int left = len; - sdp_dbg_data(sk, "offset: %d len: %d\n", offset, len); - - for (i = 0; i < page_cnt; i++) { - int o = i == 0 ? 
-static void sdp_unmap_dma_sge(struct sock *sk, struct ib_sge *sge,
-		int page_cnt, int offset, int len)
+void sdp_free_fmr(struct sock *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
 {
-	int i;
-	struct ib_device *dev = sdp_sk(sk)->ib_device;
+	if (!sdp_sk(sk)->qp_active)
+		return;

-	sdp_dbg_data(sk, "count: %d\n", page_cnt);
+	ib_fmr_pool_unmap(*_fmr);
+	*_fmr = NULL;

-	for (i = 0; i < page_cnt; i++) {
-		int l = PAGE_SIZE;
-
-		if (i == page_cnt - 1) {
-			/* Last page */
-			l = (len + offset) & (PAGE_SIZE - 1);
-			if (l == 0)
-				l = PAGE_SIZE;
-		}
-		ib_dma_unmap_page(dev, sge[i].addr, l, DMA_FROM_DEVICE);
-	}
+	ib_umem_release(*_umem);
+	*_umem = NULL;
 }

-static struct ib_pool_fmr *sdp_map_fmr(struct sock *sk, int page_cnt,
-		u64 *addrs)
+static int sdp_post_rdma_read(struct sock *sk, struct rx_srcavail_state *rx_sa)
 {
-	struct ib_pool_fmr *fmr;
-	int ret = 0;
-
-	fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, addrs,
-			page_cnt, 0);
-	if (IS_ERR(fmr)) {
-		ret = PTR_ERR(fmr);
-		fmr = NULL;
-		sdp_warn(sk, "Error allocating fmr: %d\n", ret);
-		goto err;
-	}
+	struct sdp_sock *ssk = sdp_sk(sk);
+	struct ib_send_wr *bad_wr;
+	struct ib_send_wr wr = { NULL };
+	struct ib_sge sge;

-	return fmr;
-err:
-	return NULL;
-}
+	wr.opcode = IB_WR_RDMA_READ;
+	wr.next = NULL;
+	wr.wr_id = SDP_OP_RDMA;
+	wr.wr.rdma.rkey = rx_sa->rkey;
+	wr.send_flags = 0;

-enum zcopy_type {
-	SDP_ZCOPY_TYPE_RX,
-	SDP_ZCOPY_TYPE_TX,
-};
+	ssk->tx_ring.rdma_inflight = rx_sa;

-static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int page_cnt)
-{
-	struct tx_srcavail_state *tx_sa;
+	sge.addr = rx_sa->umem->offset;
+	sge.length = rx_sa->umem->length;
+	sge.lkey = rx_sa->fmr->fmr->lkey;
+	wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
+	wr.num_sge = 1;
+	wr.sg_list = &sge;
+	rx_sa->busy++;

-	tx_sa = kzalloc(sizeof(struct tx_srcavail_state) +
-			sizeof(struct page *) * page_cnt +
-			sizeof(u64) * page_cnt, GFP_KERNEL);
-	if (!tx_sa)
-		return ERR_PTR(-ENOMEM);
-
-	tx_sa->pages = (struct page **)(tx_sa+1);
-	tx_sa->addrs = (u64 *)(&tx_sa->pages[page_cnt]);
+	wr.send_flags = IB_SEND_SIGNALED;

-	return tx_sa;
+	return ib_post_send(ssk->qp, &wr, &bad_wr);
 }
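[Editor's note] The single SGE in sdp_post_rdma_read() replaces the old multi-SGE chunking loop because the FMR presents the scattered pages as one virtually contiguous region. It works with sge.addr = umem->offset because sdp_alloc_fmr() passes 0 as the io_virtual_address (last argument) of ib_fmr_pool_map_phys(), so addresses under the FMR's lkey start at the first pinned page. Restated as a sketch:

	/* Assuming io_virtual_address = 0, as passed above: byte 0 of the
	 * FMR space is the start of the first pinned page, so user data
	 * begins at umem->offset and runs for umem->length bytes.
	 */
	struct ib_sge sge = {
		.addr	= rx_sa->umem->offset,	/* offset into the FMR mapping */
		.length	= rx_sa->umem->length,
		.lkey	= rx_sa->fmr->fmr->lkey,
	};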
read." " len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len); - if (len > rx_sa->len) - len = rx_sa->len; - - offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1); - rx_sa->page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT; - sdp_dbg_data(sk, "page_cnt = 0x%x len:0x%x offset: 0x%x\n", - rx_sa->page_cnt, len, offset); + sock_hold(sk, SOCK_REF_RDMA_RD); - rx_sa->pages = - (struct page **) kzalloc(sizeof(struct page *) * rx_sa->page_cnt + - sizeof(struct ib_sge) * rx_sa->page_cnt, GFP_KERNEL); - if (!rx_sa->pages) { - sdp_warn(sk, "Error allocating zcopy context\n"); - goto err_alloc_zcopy; + if (len > rx_sa->len) { + sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len); + WARN_ON(1); + len = rx_sa->len; } - rx_sa->sge = (struct ib_sge *)(&rx_sa->pages[rx_sa->page_cnt]); - - rc = sdp_get_pages(sk, rx_sa->pages, rx_sa->page_cnt, - (unsigned long)iov->iov_base); - if (rc) - goto err_get_pages; - - rc = sdp_map_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, rx_sa->pages, - offset, len); - if (rc) - goto err_map_dma; - - wr.opcode = IB_WR_RDMA_READ; - wr.next = NULL; - wr.wr_id = SDP_OP_RDMA; - wr.wr.rdma.rkey = rx_sa->rkey; - wr.send_flags = 0; + rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem); + if (rc) { + sdp_warn(sk, "Error allocating fmr: %d\n", rc); + goto err_alloc_fmr; + } - timeo = sock_sndtimeo(sk, 0); + rc = sdp_post_rdma_read(sk, rx_sa); + if (unlikely(rc)) { + sdp_warn(sk, "ib_post_send failed with status %d.\n", rc); + sdp_set_error(&ssk->isk.sk, -ECONNRESET); + wake_up(&ssk->wq); + goto err_post_send; + } - ssk->tx_ring.rdma_inflight = rx_sa; - copied = 0; - sge = rx_sa->sge; - sge_left = rx_sa->page_cnt; - do { - /* Len error when using sge_cnt > 30 ?? */ - int sge_cnt = min(sge_left, ssk->max_sge - 2); - - wr.wr.rdma.remote_addr = rx_sa->vaddr + copied + rx_sa->used; - wr.num_sge = sge_cnt; - wr.sg_list = sge; - rx_sa->busy++; - - sdp_dbg_data(sk, "rdma read: sge_cnt: %d vaddr: 0x%llx " - "copied: 0x%x rkey: 0x%x in_bytes: 0x%x\n", - sge_cnt, wr.wr.rdma.remote_addr, copied, rx_sa->rkey, - sge_bytes(sge, sge_cnt)); - if (sge_left == sge_cnt) { - wr.send_flags = IB_SEND_SIGNALED; - sdp_dbg_data(sk, "last wr is signaled\n"); - } - sdp_prf1(sk, NULL, "TX: RDMA read 0x%x bytes %s", - sge_bytes(sge, sge_cnt), - wr.send_flags & IB_SEND_SIGNALED ? 
"Signalled" : ""); - - rc = ib_post_send(ssk->qp, &wr, &bad_wr); - if (unlikely(rc)) { - sdp_warn(sk, "ib_post_send failed with status %d.\n", - rc); - sdp_set_error(&ssk->isk.sk, -ECONNRESET); - wake_up(&ssk->wq); - break; - } + sdp_prf(sk, skb, "Finished posting(rc=%d), now to wait", rc); - copied += sge_bytes(sge, sge_cnt); - sge_left -= sge_cnt; - sge += sge_cnt; + got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq; - if (unlikely(ssk->srcavail_cancel_mseq > rx_sa->mseq)) { - sdp_warn(sk, "got SrcAvailCancel - Aborting RDMA\n"); - rc = -EAGAIN; - } - } while (!rc && sge_left > 0); + sdp_arm_tx_cq(sk); - if (!rc || rc == -EAGAIN) { - int got_srcavail_cancel = (rc == -EAGAIN); + sdp_wait_rdma_wr_finished(ssk); - sdp_arm_tx_cq(sk); + sdp_prf(sk, skb, "Finished waiting(rc=%d)", rc); + if (!ssk->qp_active) { + sdp_warn(sk, "QP destroyed during RDMA read\n"); + rc = -EPIPE; + goto err_post_send; + } - rc = sdp_wait_rdma_wr_finished(ssk, &timeo); + /* Ignore any data copied after getting SrcAvailCancel */ + if (!got_srcavail_cancel) { + int copied = rx_sa->umem->length; - /* Ignore any data copied after getting SrcAvailCancel */ - if (!got_srcavail_cancel && !rc) { - sdp_update_iov_used(sk, iov, copied); - rx_sa->used += copied; - atomic_add(copied, &ssk->rcv_nxt); - } + sdp_update_iov_used(sk, iov, copied); + rx_sa->used += copied; + atomic_add(copied, &ssk->rcv_nxt); } - if (ssk->tx_ring.rdma_inflight->busy) { - sdp_warn(sk, "Aborted waiting for RDMA completion before finished!" - "Memory might be corrupted.\n"); - WARN_ON(1); - } + ssk->tx_ring.rdma_inflight = NULL; + +err_post_send: + sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem); +err_alloc_fmr: if (rc && ssk->qp_active) { - /* post rdma, wait_for_compl or post rdma_rd_comp failed - - * post sendsm */ - sdp_warn(sk, "post rdma, wait_for_compl " - "or post rdma_rd_comp failed - post sendsm\n"); + sdp_warn(sk, "Couldn't do RDMA - post sendsm\n"); rx_sa->flags |= RX_SA_ABORTED; - ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get - the dirty logic from recvmsg */ } - ssk->tx_ring.rdma_inflight = NULL; - - sdp_unmap_dma_sge(sk, rx_sa->sge, rx_sa->page_cnt, offset, len); -err_map_dma: - sdp_put_pages(sk, rx_sa->pages, rx_sa->page_cnt); -err_get_pages: - kfree(rx_sa->pages); - rx_sa->pages = NULL; - rx_sa->sge = NULL; -err_alloc_zcopy: + sock_put(sk, SOCK_REF_RDMA_RD); return rc; } @@ -792,86 +601,74 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p) return ret; } -static int sdp_rdma_adv_single(struct sock *sk, - struct tx_srcavail_state *tx_sa, struct iovec *iov, - int page_cnt, int offset, int len) +static int do_sdp_sendmsg_zcopy(struct sock *sk, struct tx_srcavail_state *tx_sa, + struct iovec *iov, long *timeo) { struct sdp_sock *ssk = sdp_sk(sk); - long timeo = SDP_SRCAVAIL_ADV_TIMEOUT; - unsigned long lock_flags; int rc = 0; + unsigned long lock_flags; - sdp_dbg_data(sk, "off: 0x%x len: 0x%x page_cnt: 0x%x\n", - offset, len, page_cnt); + rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len, + &tx_sa->fmr, &tx_sa->umem); + if (rc) { + sdp_warn(sk, "Error allocating fmr: %d\n", rc); + goto err_alloc_fmr; + } if (tx_slots_free(ssk) == 0) { - rc = wait_for_sndbuf(sk, &timeo); + rc = wait_for_sndbuf(sk, timeo); if (rc) { sdp_warn(sk, "Couldn't get send buffer\n"); - return rc; + goto err_no_tx_slots; } } - tx_sa->fmr = sdp_map_fmr(sk, page_cnt, &tx_sa->addrs[0]); - if (!tx_sa->fmr) { - sdp_warn(sk, "Error allocating fmr\n"); - return -ENOMEM; - } - - rc = sdp_post_srcavail(sk, tx_sa, 0, offset, len); + 
+	rc = sdp_post_srcavail(sk, tx_sa);
 	if (rc) {
 		sdp_dbg(sk, "Error posting SrcAvail\n");
 		goto err_abort_send;
 	}

-	rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
+	rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
 	if (unlikely(rc)) {
 		enum tx_sa_flag f = tx_sa->abort_flags;

 		if (f & TX_SA_SENDSM) {
-			sdp_dbg_data(sk, "got SendSM. use SEND verb.\n");
+			sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
 		} else if (f & TX_SA_ERROR) {
 			sdp_dbg_data(sk, "SrcAvail error completion\n");
 			sdp_reset(sk);
 		} else if (ssk->qp_active) {
-			if (f & TX_SA_INTRRUPTED)
-				sdp_dbg_data(sk, "SrcAvail error completion\n");
-			else
-				sdp_warn(sk, "SrcAvail send aborted flag = 0x%x.\n", f);
+			if (!(f & TX_SA_INTRRUPTED))
+				sdp_warn(sk, "abort_flag = 0x%x.\n", f);

 			sdp_post_srcavail_cancel(sk);

 			/* Wait for RdmaRdCompl/SendSM to
 			 * finish the transaction */
-			timeo = 2 * HZ;
+			*timeo = 2 * HZ;
 			sdp_dbg_data(sk, "Waiting for SendSM\n");
-			sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
+			sdp_wait_rdmardcompl(ssk, timeo, 1);
 			sdp_dbg_data(sk, "finished waiting\n");
 		} else {
 			sdp_warn(sk, "QP was destroyed while waiting\n");
 		}
-
-		goto err_abort_send;
+	} else {
+		sdp_dbg_data(sk, "got RdmaRdCompl\n");
 	}

-	sdp_prf1(sk, NULL, "got RdmaRdCompl");
-
-err_abort_send:
-	sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
-
-	ib_fmr_pool_unmap(tx_sa->fmr);
 	spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
 	ssk->tx_sa = NULL;
 	spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);

-	return rc;
-}
+err_abort_send:
+	sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);

-static inline size_t get_page_count(unsigned long uaddr, size_t count)
-{
-	unsigned long end = (uaddr + count + PAGE_SIZE - 1) >> PAGE_SHIFT;
-	unsigned long start = uaddr >> PAGE_SHIFT;
-	return end - start;
+err_no_tx_slots:
+	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
+
+err_alloc_fmr:
+	return rc;
 }

 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -884,10 +681,9 @@
 	long timeo;
 	struct tx_srcavail_state *tx_sa;
 	int offset;
+	size_t bytes_to_copy = 0;
 	int copied = 0;
-	int page_cnt;

-	sdp_dbg_data(sk, "%s\n", __func__);
 	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");

 	if (ssk->rx_sa) {
@@ -903,6 +699,10 @@
 	posts_handler_get(ssk);

 	flags = msg->msg_flags;
+
+	iovlen = msg->msg_iovlen;
+	iov = msg->msg_iov;
+
 	timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;

 	/* Wait for a connection to finish. */
@@ -914,8 +714,6 @@
 	clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);

 	/* Ok commence sending. */
-	iovlen = msg->msg_iovlen;
-	iov = msg->msg_iov;
 	offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
 	sdp_dbg_data(sk, "Sending iov: %p, iovlen: 0x%lx, size: 0x%lx\n",
 		iov->iov_base, iov->iov_len, size);
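[Editor's note] sdp_update_iov_used(), whose body is outside this patch, is what makes the ZCopy/BCopy split line up: after each SrcAvail round it advances the iovec cursor by the bytes the peer acknowledged, so the BCopy remainder starts in the right place. A hypothetical mirror of its effect — illustration only, not the driver's implementation:

	static void hyp_update_iov_used(struct iovec *iov, size_t copied)
	{
		while (copied > 0) {
			size_t n = min(copied, iov->iov_len);

			iov->iov_base += n;	/* remainder starts here */
			iov->iov_len  -= n;
			copied        -= n;
			if (!iov->iov_len)
				iov++;		/* next vector entry */
		}
	}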
@@ -933,59 +731,71 @@
 		goto err;
 	}

-	page_cnt = min(get_page_count((unsigned long)iov->iov_base,
-			iov->iov_len), SDP_FMR_SIZE);
-
-	tx_sa = sdp_alloc_tx_sa(sk, page_cnt);
-	if (IS_ERR(tx_sa)) {
+	tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
+	if (!tx_sa) {
 		sdp_warn(sk, "Error allocating zcopy context\n");
 		rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
 		goto err_alloc_tx_sa;
 	}

+	bytes_to_copy = iov->iov_len;
 	do {
-		size_t off = (unsigned long)iov->iov_base & ~PAGE_MASK;
-		size_t len = page_cnt * PAGE_SIZE - off;
-		if (len > iov->iov_len)
-			len = iov->iov_len;
-
-		tx_sa->page_cnt = page_cnt;
-
-		rc = sdp_get_pages(sk, tx_sa->pages, page_cnt,
-				(unsigned long)iov->iov_base);
-		if (rc)
-			goto err_get_pages;
-
-		rc = sdp_map_dma(sk, tx_sa->addrs,
-				tx_sa->pages, page_cnt,
-				off, len);
-		if (rc)
-			goto err_map_dma;
-
-		rc = sdp_rdma_adv_single(sk, tx_sa, iov, page_cnt, off, len);
-		if (rc)
-			sdp_dbg_data(sk, "Error sending SrcAvail. rc = %d\n", rc);
-
-
-		if (tx_sa->addrs)
-			sdp_unmap_dma(sk, tx_sa->addrs, page_cnt);
-err_map_dma:
-		sdp_put_pages(sk, tx_sa->pages, page_cnt);
-err_get_pages:
-		page_cnt = min(get_page_count((unsigned long)iov->iov_base,
-				iov->iov_len), SDP_FMR_SIZE);
-
 		copied += tx_sa->bytes_acked;
 		tx_sa_reset(tx_sa);
+
+		rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);
+
+		if (iov->iov_len < sdp_zcopy_thresh) {
+			sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
+				iov->iov_len);
+			break;
+		}
 	} while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);

+	kfree(tx_sa);
 err_alloc_tx_sa:
 err:
+	copied = bytes_to_copy - iov->iov_len;
 	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d",
 		rc, copied);

 	posts_handler_put(ssk);
+
 	release_sock(sk);
+
 	sock_put(&ssk->isk.sk, SOCK_REF_ZCOPY);

 	return rc ?: copied;
 }

+void sdp_abort_srcavail(struct sock *sk)
+{
+	struct sdp_sock *ssk = sdp_sk(sk);
+	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
+	unsigned long flags;
+
+	if (!tx_sa)
+		return;
+
+	cancel_delayed_work(&ssk->srcavail_cancel_work);
+	flush_scheduled_work();
+
+	spin_lock_irqsave(&ssk->tx_sa_lock, flags);
+
+	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
+
+	ssk->tx_sa = NULL;
+
+	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
+}
+
+void sdp_abort_rdma_read(struct sock *sk)
+{
+	struct sdp_sock *ssk = sdp_sk(sk);
+	struct rx_srcavail_state *rx_sa = ssk->rx_sa;
+
+	if (!rx_sa)
+		return;
+
+	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
+
+	ssk->rx_sa = NULL;
+}
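[Editor's note] The new SOCK_REF_RDMA_RD tag pairs with the hold/put added in sdp_rdma_to_iovec(): the socket is pinned for the whole read so it cannot be destroyed while the wait loop sleeps, and the tag feeds the reference tracking in sdp_dbg.h. The discipline, sketched — hyp_rdma_read_scope and do_read are hypothetical names, not part of the patch:

	static int hyp_rdma_read_scope(struct sock *sk, struct rx_srcavail_state *rx_sa)
	{
		int rc;

		sock_hold(sk, SOCK_REF_RDMA_RD);	/* tagged ref, pinned across sleep */
		rc = do_read(sk, rx_sa);		/* post, wait, free FMR/umem */
		sock_put(sk, SOCK_REF_RDMA_RD);		/* dropped on every exit path */

		return rc;
	}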