]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
sdp: split very big tx buffer into smaller sends
authorAmir Vadai <amirv@mellanox.co.il>
Wed, 29 Jul 2009 15:20:10 +0000 (18:20 +0300)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:36 +0000 (05:04 -0700)
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_zcopy.c

index 0bb249bffc79fc7ea223573d6a31df36168118a0..7881afa7f7d4492d041a5d50112c5a8aee47a8b2 100644 (file)
@@ -242,6 +242,9 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define SDP_NAGLE_TIMEOUT (HZ / 10)
 
 #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
+#define SDP_SRCAVAIL_ADV_TIMEOUT (1 * HZ)
+
+#define MAX_ZCOPY_SEND_SIZE (512 * 1024)
 
 #define SDP_RESOLVE_TIMEOUT 1000
 #define SDP_ROUTE_TIMEOUT 1000
@@ -425,11 +428,13 @@ struct rx_srcavail_state {
 };
 
 struct tx_srcavail_state {
-       u8              busy;
-
        u32             page_cnt;
        struct page     **pages;
        u64             *addrs;
+
+       /* Data below 'busy' will be reset */
+       u8              busy;
+
        struct ib_pool_fmr *fmr;
        u32             bytes_completed;
        u32             bytes_total;
@@ -438,6 +443,12 @@ struct tx_srcavail_state {
        u32             mseq;
 };
 
+static inline void tx_sa_reset(struct tx_srcavail_state *tx_sa)
+{
+       memset((void *)&tx_sa->busy, 0,
+                       sizeof(*tx_sa) - offsetof(typeof(*tx_sa), busy));
+}
+
 #define ring_head(ring)   (atomic_read(&(ring).head))
 #define ring_tail(ring)   (atomic_read(&(ring).tail))
 #define ring_posted(ring) (ring_head(ring) - ring_tail(ring))
index b743c4cf26c7c8d94f4380a544f192697a2ea069..34b4b8c62c505fc9e3efc3125adf6f4f32cea265 100644 (file)
@@ -288,10 +288,16 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 
                tx_sa = TX_SRCAVAIL_STATE(skb);
                if (unlikely(tx_sa)) {
-                       if (likely(!tx_sa->abort))
-                               sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL);
-                       else
-                               sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n");       
+                       if (ssk->tx_sa != tx_sa) {
+                               sdp_warn(&ssk->isk.sk, "SrcAvail cancelled "
+                                               "before being sent!\n");
+                               __kfree_skb(skb);
+                       } else {
+                               if (likely(!tx_sa->abort))
+                                       sdp_post_send(ssk, skb, SDP_MID_SRCAVAIL);
+                               else
+                                       sdp_warn(&ssk->isk.sk, "Not sending aborted SrcAvail\n");       
+                       }
                } else {
                        sdp_post_send(ssk, skb, SDP_MID_DATA);
                }
index 179c32422d05baed0b137f6dcd1f3e713d4c37d1..0acf0298c203ca5aed4ca3f1c7e07333af083aac 100644 (file)
@@ -1696,7 +1696,7 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
        if (sdp_zcopy_thresh && size > sdp_zcopy_thresh) {
                err = sdp_sendmsg_zcopy(iocb, sk, msg, size);
-               if (err != -EAGAIN)
+               if (err != -EAGAIN && err != -ETIME)
                        return err;
 
                /* Got SendSM/Timedout - fallback to regular send */    
index bac3f71d6a826db176b288b4607b0abc7e0e094a..f3c0bef9fee5f08c3cc9380b2cd99719c69a9dce 100644 (file)
@@ -131,7 +131,7 @@ static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p, int len,
        struct tx_srcavail_state *tx_sa = ssk->tx_sa;
        DEFINE_WAIT(wait);
 
-       sdp_dbg_data(sk, "Going to sleep till get RdmaRdCompl.\n");
+       sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
        sdp_prf1(sk, NULL, "Going to sleep");
        while (ssk->qp_active) {
                prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
@@ -340,8 +340,8 @@ void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
        ssk->tx_sa->abort = 1;
        cancel_delayed_work(&ssk->srcavail_cancel_work);
 
-       wake_up(ssk->isk.sk.sk_sleep);
-       sdp_dbg_data(&ssk->isk.sk, "woke up sleepers\n");
+       wake_up(sk->sk_sleep);
+       sdp_dbg_data(sk, "woke up sleepers\n");
 
 out:
        spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
@@ -584,13 +584,10 @@ enum zcopy_type {
        SDP_ZCOPY_TYPE_TX,
 };
 
-static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int offset,
-               int len)
+static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int page_cnt)
 {
        struct tx_srcavail_state *tx_sa;
-       int page_cnt;
 
-       page_cnt = PAGE_ALIGN(len + offset) >> PAGE_SHIFT;
 
        tx_sa = kzalloc(sizeof(struct tx_srcavail_state) +
                        sizeof(struct page *) * page_cnt + 
@@ -599,7 +596,6 @@ static struct tx_srcavail_state *sdp_alloc_tx_sa(struct sock *sk, int offset,
                return ERR_PTR(-ENOMEM);
        
        tx_sa->pages = (struct page **)(tx_sa+1);
-       tx_sa->page_cnt = page_cnt;
        tx_sa->addrs = (u64 *)(&tx_sa->pages[page_cnt]);
 
        return tx_sa;
@@ -740,6 +736,7 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
        int ret = 0;
+
        sdp_dbg_data(sk, "Wait for mem\n");
 
        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
@@ -755,6 +752,78 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
        return ret;
 }
 
+static int sdp_rdma_adv_single(struct sock *sk,
+               struct tx_srcavail_state *tx_sa, struct iovec *iov, 
+               int offset, int page_cnt, int len, u64 *addrs)
+{
+       struct sdp_sock *ssk = sdp_sk(sk);
+       unsigned long timeo = SDP_SRCAVAIL_ADV_TIMEOUT; //sock_sndtimeo(sk, 0);
+       unsigned long lock_flags;
+       int rc = 0;
+
+       if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
+               rc = wait_for_sndbuf(sk, &timeo);
+               if (rc) {
+                       sdp_warn(sk, "Couldn't get send buffer\n");
+                       return rc;
+               }
+       }
+
+       tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs);
+       if (!tx_sa->fmr) {
+               return -ENOMEM;
+       }
+
+       tx_sa->bytes_completed = 0;
+       tx_sa->bytes_total = len;
+       rc = sdp_post_srcavail(sk, tx_sa, offset, len);
+       if (rc)
+               goto err_abort_send;
+
+       rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
+       if (unlikely(rc)) {
+               switch (rc) {
+               case -EAGAIN: /* Got SendSM */
+                       sdp_warn(sk, "got SendSM. use SEND verb.\n");
+                       break;
+
+               case -ETIME: /* Timedout */
+                       sdp_warn(sk, "TX srcavail timedout.\n");
+               case -EINTR: /* interrupted */
+                       sdp_prf1(sk, NULL, "Aborting send.");
+                       sdp_post_srcavail_cancel(sk);
+
+                       /* Wait for RdmaRdCompl/SendSM to
+                        * finish the transaction */
+                       timeo = 2 * HZ;
+                       sdp_warn(sk, "Waiting for SendSM\n");
+                       sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
+                       sdp_warn(sk, "finished waiting\n");
+
+                       break;
+
+               default:
+                       sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
+                       /* Socked destroyed while waited */
+                       break;
+               }
+
+               goto err_abort_send;
+       }
+       sdp_prf1(sk, NULL, "got RdmaRdCompl");
+
+       sdp_update_iov_used(sk, iov, tx_sa->bytes_completed);
+
+err_abort_send:
+       ib_fmr_pool_unmap(tx_sa->fmr);
+
+       spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
+       ssk->tx_sa = NULL;
+       spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
+
+       return rc;
+}
+
 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                size_t size)
 {
@@ -767,10 +836,10 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        int offset;
        int copied = 0;
 
+       int page_cnt;
        int bytes_left;
        int pages_left;
        u64 *addrs;
-       unsigned long lock_flags;
 
        sdp_dbg_data(sk, "%s\n", __func__);
 
@@ -781,7 +850,7 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        posts_handler_get(ssk);
 
        flags = msg->msg_flags;
-       timeo = sock_sndtimeo(sk, 0);
+       timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;
 
        /* Wait for a connection to finish. */
        if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT))
@@ -811,95 +880,55 @@ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                goto err;
        }
 
-       tx_sa = sdp_alloc_tx_sa(sk, offset, iov->iov_len);
+       page_cnt = PAGE_ALIGN(min_t(int, iov->iov_len, MAX_ZCOPY_SEND_SIZE) + offset) 
+                       >> PAGE_SHIFT;
+       tx_sa = sdp_alloc_tx_sa(sk, page_cnt);
        if (IS_ERR(tx_sa)) {
                sdp_warn(sk, "Error allocating zcopy context\n");
+               rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
                goto err_alloc_tx_sa;
        }
 
-       rc = sdp_get_pages(sk, tx_sa->pages, tx_sa->page_cnt,
-                       (unsigned long)iov->iov_base);
-       if (rc)
-               goto err_get_pages;
-
-       rc = sdp_map_dma(sk, tx_sa->addrs, tx_sa->page_cnt, tx_sa->pages,
-                       offset, iov->iov_len);
-       if (rc)
-               goto err_map_dma;
-
-       bytes_left = iov->iov_len;
-       pages_left = tx_sa->page_cnt;
-       addrs = tx_sa->addrs;
        do {
-               int page_cnt = min(256, pages_left);
-               int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
-               sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
-
-               if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
-                       rc = wait_for_sndbuf(sk, &timeo);
-                       if (rc) {
-                               sdp_warn(sk, "Couldn't get send buffer\n");
-                               break;
-                       }
-               }
+               tx_sa->page_cnt = page_cnt;
 
-               tx_sa->fmr = sdp_map_fmr(sk, page_cnt, addrs);
-               if (!tx_sa->fmr) {
-                       rc = -ENOMEM;
-                       break;
-               }
-
-               tx_sa->bytes_completed = 0;
-               tx_sa->bytes_total = len;
-               rc = sdp_post_srcavail(sk, tx_sa, offset, len);
+               rc = sdp_get_pages(sk, tx_sa->pages, tx_sa->page_cnt,
+                               (unsigned long)iov->iov_base);
                if (rc)
-                       goto err_abort_send;
-
-               rc = sdp_wait_rdmardcompl(ssk, &timeo, len, 0);
-               if (unlikely(rc)) {
-                       switch (rc) {
-                       case -EAGAIN: /* Got SendSM */
-                               sdp_warn(sk, "got SendSM. use SEND verb.\n");
-                               goto err_abort_send;
-
-                       case -ETIME: /* Timedout */
-                               sdp_warn(sk, "TX srcavail timedout.\n");
-                       case -EINTR: /* interrupted */
-                               sdp_prf1(sk, NULL, "Aborting send.");
-                               sdp_post_srcavail_cancel(sk);
-
-                               /* Wait for RdmaRdCompl/SendSM to
-                                * finish the transaction */
-                               sdp_wait_rdmardcompl(ssk, &timeo, len, 1);
-
-                               goto err_abort_send;
-
-                       default:
-                               sdp_warn(sk, "error sending srcavail. rc = %d\n", rc);
-                               /* Socked destroyed while waited */
-                               goto err_abort_send;
-                       }
-               }
-               sdp_prf1(sk, NULL, "got RdmaRdCompl");
-               copied += len;
-               bytes_left -= len;
-               pages_left -= page_cnt;
-               addrs += page_cnt;
-               offset = 0;
+                       goto err_get_pages;
 
-               sdp_update_iov_used(sk, iov, tx_sa->bytes_completed);
-
-err_abort_send:
-               ib_fmr_pool_unmap(tx_sa->fmr);
-               spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
-               ssk->tx_sa = NULL;
-               spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
-       } while (!rc && !tx_sa->abort && pages_left > 0);
-
-       sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
+               rc = sdp_map_dma(sk, tx_sa->addrs, tx_sa->page_cnt, tx_sa->pages,
+                               offset, iov->iov_len);
+               if (rc)
+                       goto err_map_dma;
+
+               bytes_left = iov->iov_len;
+               pages_left = tx_sa->page_cnt;
+               addrs = tx_sa->addrs;
+               do {
+                       int p_cnt = min(256, pages_left);
+                       int len = min_t(int, page_cnt * PAGE_SIZE - offset, bytes_left);
+
+                       sdp_dbg_data(sk, "bytes_left: %d\n", bytes_left);
+                       rc = sdp_rdma_adv_single(sk, tx_sa, iov,
+                                       offset, p_cnt, len, addrs);
+
+                       copied += len;
+                       bytes_left -= len;
+                       pages_left -= p_cnt;
+                       addrs += p_cnt;
+                       offset = 0;
+               } while (!rc && !tx_sa->abort && pages_left > 0);
+
+               sdp_unmap_dma(sk, tx_sa->addrs, tx_sa->page_cnt);
 err_map_dma:   
-       sdp_put_pages(sk, tx_sa->pages, tx_sa->page_cnt);
+               sdp_put_pages(sk, tx_sa->pages, tx_sa->page_cnt);
 err_get_pages:
+
+               page_cnt = PAGE_ALIGN(min_t(int, iov->iov_len, MAX_ZCOPY_SEND_SIZE)) 
+                       >> PAGE_SHIFT;
+               tx_sa_reset(tx_sa);
+       } while (!rc && iov->iov_len > 0);
        kfree(tx_sa);
 err_alloc_tx_sa:
 err: