From: Amir Vadai
Date: Wed, 29 Jul 2009 15:20:10 +0000 (+0300)
Subject: sdp: split very big tx buffer into smaller sends
X-Git-Tag: v4.1.12-92~264^2~5^2~257
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=69ee5776e6a41ca5afc16fe3d0924fc2b58d1e3b;p=users%2Fjedix%2Flinux-maple.git

sdp: split very big tx buffer into smaller sends

Signed-off-by: Amir Vadai
---

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 0bb249bffc79f..7881afa7f7d44 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -242,6 +242,9 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define SDP_NAGLE_TIMEOUT (HZ / 10)
 
 #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
+#define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ)
+
+#define MAX_ZCOPY_SEND_SIZE (512 * 1024)
 
 #define SDP_RESOLVE_TIMEOUT 1000
 #define SDP_ROUTE_TIMEOUT 1000
@@ -425,11 +428,13 @@ struct rx_srcavail_state {
 };
 
 struct tx_srcavail_state {
-	u8 busy;
-	u32 page_cnt;
 	struct page **pages;
 	u64 *addrs;
+
+	/* Data below 'busy' will be reset */
+	u8 busy;
+
 	struct ib_pool_fmr *fmr;
 	u32 bytes_completed;
 	u32 bytes_total;
@@ -438,6 +443,12 @@ struct tx_srcavail_state {
 	u32 mseq;
 };
 
+static inline void tx_sa_reset(struct tx_srcavail_state *tx_sa)
+{
+	memset((void *)&tx_sa->busy, 0,
+		sizeof(*tx_sa) - offsetof(typeof(*tx_sa), busy));
+}
+
 #define ring_head(ring) (atomic_read(&(ring).head))
 #define ring_tail(ring) (atomic_read(&(ring).tail))
 #define ring_posted(ring) (ring_head(ring) - ring_tail(ring))
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index b743c4cf26c7c..34b4b8c62c505 100644
--- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
@@ -288,10 +288,16 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 		tx_sa = TX_SRCAVAIL_STATE(skb);
 		if (unlikely(tx_sa)) {
-			if (likely(!tx_sa->abort))
-				sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL);
-			else
-				sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n");
+			if (ssk->tx_sa != tx_sa) {
+				sdp_warn(&ssk->isk.sk, "SrcAvail cancelled "
+					"before being sent!\n");
+				__kfree_skb(skb);
+			} else {
+				if (likely(!tx_sa->abort))
+					sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL);
+				else
+					sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n");
+			}
 		} else {
 			sdp_post_send(ssk, skb, SDP_MID_DATA);
 		}
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 179c32422d05b..0acf0298c203c 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -1696,7 +1696,7 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
 	if (sdp_zcopy_thresh && size > sdp_zcopy_thresh) {
 		err = sdp_sendmsg_zcopy(iocb, sk, msg, size);
-		if (err != -EAGAIN)
+		if (err != -EAGAIN && err != -ETIME)
 			return err;
 
 		/* Got SendSM/Timedout - fallback to regular send */
diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
index bac3f71d6a826..f3c0bef9fee5f 100644
--- a/drivers/infiniband/ulp/sdp/sdp_zcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c
@@ -131,7 +131,7 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
 	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
 	DEFINE_WAIT(wait);
 
-	sdp_dbg_data(sk, "Going to sleep till get RdmaRdCompl.\n");
+	sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
 	sdp_prf1(sk, NULL, "Going to sleep");
 	while (ssk->qp_active) {
 		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
@@ -340,8 +340,8 @@ void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
 	ssk->tx_sa->abort = 1;
 	cancel_delayed_work(&ssk->srcavail_cancel_work);
 
-	wake_up(ssk->isk.sk.sk_sleep);
-	sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+	wake_up(sk->sk_sleep);
+	sdp_dbg_data(sk, "woke up sleepers\n");
 
 out:
 	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
@@ -584,13 +584,10 @@ enum zcopy_type {
 	SDP_ZCOPY_TYPE_TX,
 };
 
-static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int offset,
-		int len)
+static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int page_cnt)
 {
 	struct tx_srcavail_state *tx_sa;
-	int page_cnt;
 
-	page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
 	tx_sa = kzalloc(sizeof(struct tx_srcavail_state) +
 			sizeof(struct page *) * page_cnt +
@@ -599,7 +596,6 @@ static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int offset,
 		return ERR_PTR(-ENOMEM);
 
 	tx_sa->pages = (struct page **)(tx_sa+1);
-	tx_sa->page_cnt = page_cnt;
 	tx_sa->addrs = (u64 *)(&tx_sa->pages[page_cnt]);
 
 	return tx_sa;
@@ -740,6 +736,7 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
 {
 	struct sdp_sock *ssk = sdp_sk(sk);
 	int ret = 0;
+
 	sdp_dbg_data(sk, "Wait for mem\n");
 
 	set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
@@ -755,6 +752,78 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
 	return ret;
 }
 
+static int sdp_rdma_adv_single(struct sock *sk,
+		struct tx_srcavail_state *tx_sa, struct iovec *iov,
+		int offset, int page_cnt, int len, u64 *addrs)
+{
+	struct sdp_sock *ssk = sdp_sk(sk);
+	unsigned long timeo = SDP_SRCAVAIL_ADV_TIMEOUT; //sock_sndtimeo(sk, 0);
+	unsigned long lock_flags;
+	int rc = 0;
+
+	if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
+		rc = wait_for_sndbuf(sk, &timeo);
+		if (rc) {
+			sdp_warn(sk, "Couldn't get send buffer\n");
+			return rc;
+		}
+	}
+
+	tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs);
+	if (!tx_sa->fmr) {
+		return -ENOMEM;
+	}
+
+	tx_sa->bytes_completed = 0;
+	tx_sa->bytes_total = len;
+	rc = sdp_post_srcavail(sk, tx_sa, offset, len);
+	if (rc)
+		goto err_abort_send;
+
+	rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
+	if (unlikely(rc)) {
+		switch (rc) {
+		case -EAGAIN: /* Got SendSM */
+			sdp_warn(sk, "got SendSM. use SEND verb.\n");
+			break;
+
+		case -ETIME: /* Timedout */
+			sdp_warn(sk, "TX srcavail timedout.\n");
+		case -EINTR: /* interrupted */
+			sdp_prf1(sk, NULL, "Aborting send.");
+			sdp_post_srcavail_cancel(sk);
+
+			/* Wait for RdmaRdCompl/SendSM to
+			 * finish the transaction */
+			timeo = 2 * HZ;
+			sdp_warn(sk, "Waiting for SendSM\n");
+			sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
+			sdp_warn(sk, "finished waiting\n");
+
+			break;
+
+		default:
+			sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
+			/* Socked destroyed while waited */
+			break;
+		}
+
+		goto err_abort_send;
+	}
+	sdp_prf1(sk, NULL, "got RdmaRdCompl");
+
+	sdp_update_iov_used(sk, iov, tx_sa->bytes_completed);
+
+err_abort_send:
+	ib_fmr_pool_unmap(tx_sa->fmr);
+
+	spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
+	ssk->tx_sa = NULL;
+	spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
+
+	return rc;
+}
+
 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		size_t size)
 {
@@ -767,10 +836,10 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	int offset;
 	int copied = 0;
+	int page_cnt;
 	int bytes_left;
 	int pages_left;
 	u64 *addrs;
-	unsigned long lock_flags;
 
 	sdp_dbg_data(sk, "%s\n", __func__);
@@ -781,7 +850,7 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	posts_handler_get(ssk);
 
 	flags = msg->msg_flags;
-	timeo = sock_sndtimeo(sk, 0);
+	timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;
 
 	/* Wait for a connection to finish. */
 	if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
@@ -811,95 +880,55 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		goto err;
 	}
 
-	tx_sa = sdp_alloc_tx_sa(sk, offset, iov->iov_len);
+	page_cnt = PAGE_ALIGN(min_t(int, iov->iov_len, MAX_ZCOPY_SEND_SIZE) + offset)
+		>> PAGE_SHIFT;
+	tx_sa = sdp_alloc_tx_sa(sk, page_cnt);
 	if (IS_ERR(tx_sa)) {
 		sdp_warn(sk, "Error allocating zcopy context\n");
+		rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
 		goto err_alloc_tx_sa;
 	}
 
-	rc = sdp_get_pages(sk, tx_sa->pages, tx_sa->page_cnt,
-			(unsigned long)iov->iov_base);
-	if (rc)
-		goto err_get_pages;
-
-	rc = sdp_map_dma(sk, tx_sa->addrs, tx_sa->page_cnt, tx_sa->pages,
-			offset, iov->iov_len);
-	if (rc)
-		goto err_map_dma;
-
-	bytes_left = iov->iov_len;
-	pages_left = tx_sa->page_cnt;
-	addrs = tx_sa->addrs;
 	do {
-		int page_cnt = min(256, pages_left);
-		int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
-		sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
-
-		if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
-			rc = wait_for_sndbuf(sk, &timeo);
-			if (rc) {
-				sdp_warn(sk, "Couldn't get send buffer\n");
-				break;
-			}
-		}
+		tx_sa->page_cnt = page_cnt;
 
-		tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs);
-		if (!tx_sa->fmr) {
-			rc = -ENOMEM;
-			break;
-		}
-
-		tx_sa->bytes_completed = 0;
-		tx_sa->bytes_total = len;
-		rc = sdp_post_srcavail(sk, tx_sa, offset, len);
+		rc = sdp_get_pages(sk, tx_sa->pages, tx_sa->page_cnt,
+				(unsigned long)iov->iov_base);
 		if (rc)
-			goto err_abort_send;
-
-		rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
-		if (unlikely(rc)) {
-			switch (rc) {
-			case -EAGAIN: /* Got SendSM */
-				sdp_warn(sk, "got SendSM. use SEND verb.\n");
-				goto err_abort_send;
-
-			case -ETIME: /* Timedout */
-				sdp_warn(sk, "TX srcavail timedout.\n");
-			case -EINTR: /* interrupted */
-				sdp_prf1(sk, NULL, "Aborting send.");
-				sdp_post_srcavail_cancel(sk);
-
-				/* Wait for RdmaRdCompl/SendSM to
-				 * finish the transaction */
-				sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
-
-				goto err_abort_send;
-
-			default:
-				sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
-				/* Socked destroyed while waited */
-				goto err_abort_send;
-			}
-		}
-		sdp_prf1(sk, NULL, "got RdmaRdCompl");
-		copied += len;
-		bytes_left -= len;
-		pages_left -= page_cnt;
-		addrs += page_cnt;
-		offset = 0;
+			goto err_get_pages;
 
-		sdp_update_iov_used(sk, iov, tx_sa->bytes_completed);
-
-err_abort_send:
-		ib_fmr_pool_unmap(tx_sa->fmr);
-		spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
-		ssk->tx_sa = NULL;
-		spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
-	} while (!rc && !tx_sa->abort && pages_left > 0);
-
-	sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
+		rc = sdp_map_dma(sk, tx_sa->addrs, tx_sa->page_cnt, tx_sa->pages,
+				offset, iov->iov_len);
+		if (rc)
+			goto err_map_dma;
+
+		bytes_left = iov->iov_len;
+		pages_left = tx_sa->page_cnt;
+		addrs = tx_sa->addrs;
+		do {
+			int p_cnt = min(256, pages_left);
+			int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
+
+			sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
+			rc = sdp_rdma_adv_single(sk, tx_sa, iov,
+					offset, p_cnt, len, addrs);
+
+			copied += len;
+			bytes_left -= len;
+			pages_left -= p_cnt;
+			addrs += p_cnt;
+			offset = 0;
+		} while (!rc && !tx_sa->abort && pages_left > 0);
+
+		sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
 err_map_dma:
-	sdp_put_pages(sk, tx_sa->pages, tx_sa->page_cnt);
+		sdp_put_pages(sk, tx_sa->pages, tx_sa->page_cnt);
 err_get_pages:
+
+		page_cnt = PAGE_ALIGN(min_t(int, iov->iov_len, MAX_ZCOPY_SEND_SIZE))
+			>> PAGE_SHIFT;
+		tx_sa_reset(tx_sa);
+	} while (!rc && iov->iov_len > 0);
 
 	kfree(tx_sa);
 err_alloc_tx_sa:
 err:
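
A note on the mechanism for readers of this patch: sdp_sendmsg_zcopy() now caps each SrcAvail advertisement at MAX_ZCOPY_SEND_SIZE (512 KB), loops until the iovec is drained, and wipes the per-advertisement bookkeeping between rounds with tx_sa_reset(), which zeroes everything from 'busy' to the end of the struct while leaving the pages/addrs pointers above it intact. Below is a minimal stand-alone C sketch of those two idioms, the chunking arithmetic and the offsetof()-based partial reset. It is illustrative only: struct tx_state, tx_reset() and the locally defined PAGE_* macros are hypothetical user-space stand-ins, not the driver's definitions.

#include <stdio.h>
#include <string.h>
#include <stddef.h>

#define PAGE_SHIFT	12
#define PAGE_SIZE	(1UL << PAGE_SHIFT)
#define PAGE_ALIGN(x)	(((x) + PAGE_SIZE - 1) & ~(PAGE_SIZE - 1))
#define MAX_ZCOPY_SEND_SIZE (512 * 1024)

struct tx_state {
	void *pages;			/* kept across resets, like tx_sa->pages */
	unsigned char busy;		/* everything from here down gets zeroed */
	unsigned int bytes_completed;
	unsigned int bytes_total;
};

/* Same idiom as tx_sa_reset(): clear from 'busy' to the end of the struct. */
static void tx_reset(struct tx_state *s)
{
	memset(&s->busy, 0, sizeof(*s) - offsetof(struct tx_state, busy));
}

int main(void)
{
	unsigned long left = 3UL * MAX_ZCOPY_SEND_SIZE + 12345;	/* a "very big" send */
	unsigned long offset = 100;	/* the first chunk may start mid-page */
	struct tx_state s = { 0 };

	while (left > 0) {
		unsigned long chunk = left < MAX_ZCOPY_SEND_SIZE ?
				left : MAX_ZCOPY_SEND_SIZE;
		/* Same arithmetic as the patch: pages covering chunk + offset. */
		unsigned long page_cnt = PAGE_ALIGN(chunk + offset) >> PAGE_SHIFT;

		printf("advertise %lu bytes in %lu pages\n", chunk, page_cnt);

		left -= chunk;
		offset = 0;		/* later chunks start page-aligned */
		tx_reset(&s);		/* s.pages survives for the next round */
	}
	return 0;
}

The apparent intent of the layout change in sdp.h is that the cap bounds how many pages one FMR mapping and SrcAvail advertisement pin at a time, while the partial reset lets the same tx_sa allocation be reused for every chunk of a single send.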