sdp: Fix data correctness regression test failure.
author Jim Mott <jim@mellanox.com>
Sun, 11 Nov 2007 17:20:02 +0000 (19:20 +0200)
committer Mukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:11 +0000 (05:04 -0700)
Mellanox regression testing for data correctness started failing
after the recent addition of bzcopy.  This was because sdp_sendmsg
returned before all in-flight RC transfers had completed, which
allowed user space to modify buffers that had not yet been sent.

A big oops.

This fixes that bug.  Small-frame bandwidth is even worse
now, but small-frame latency is lower, which is good.  The
default transfer size that triggers bzcopy has been
increased to the bandwidth crossover point found in
MLX4-MLX4 tests.  More work will be required to find the
best value for the release.
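
The failure mode is easiest to see from the application side.  A
hypothetical reproducer sketch (not the actual Mellanox regression
test), assuming a connected SDP socket and an assumed AF_INET_SDP
value:

    /* Hypothetical reproducer sketch (not the real regression test).
     * With the bug, send() on the bzcopy path could return while RC
     * transfers still referenced the pinned pages behind buf, so the
     * second memset() could corrupt bytes not yet on the wire.
     */
    #include <string.h>
    #include <sys/socket.h>

    #define AF_INET_SDP 27              /* SDP address family (assumed value) */

    static void demo(int fd)            /* fd: connected SDP socket */
    {
            static char buf[64 * 1024]; /* > sdp_zcopy_thresh: bzcopy path */

            memset(buf, 'A', sizeof(buf));
            send(fd, buf, sizeof(buf), 0);  /* must not return early... */
            memset(buf, 'B', sizeof(buf));  /* ...or this races the HCA */
    }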

Signed-off-by: Jim Mott <jim@mellanox.com>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_cma.c
drivers/infiniband/ulp/sdp/sdp_main.c

index d0f080fba28d4c5c15e51da6a7ecc2abf5af925d..c1cc57993f03d81ce7a87db45c9a6deafec45d8c 100644 (file)
@@ -148,6 +148,8 @@ struct sdp_sock {
        unsigned rx_tail;
        unsigned mseq_ack;
        unsigned bufs;
+       unsigned max_bufs;      /* Initial buffers offered by other side */
+       unsigned min_bufs;      /* Low water mark to wake senders */
 
        int               remote_credits;
 
@@ -168,13 +170,28 @@ struct sdp_sock {
        int recv_frags;         /* max skb frags in recv packets */
        int send_frags;         /* max skb frags in send packets */
 
-       /* ZCOPY data */
-       int zcopy_thresh;
+       /* BZCOPY data */
+       int   zcopy_thresh;
+       void *zcopy_context;
 
        struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
        struct ib_wc  ibwc[SDP_NUM_WC];
 };
 
+/* Context used for synchronous zero-copy bcopy (BZCOPY) */
+struct bzcopy_state {
+       unsigned char __user  *u_base;
+       int                    u_len;
+       int                    left;
+       int                    page_cnt;
+       int                    cur_page;
+       int                    cur_offset;
+       int                    busy;
+       struct sdp_sock      *ssk;
+       struct page         **pages;
+};
+
 extern struct proto sdp_proto;
 extern struct workqueue_struct *sdp_workqueue;
 
@@ -246,5 +263,6 @@ void sdp_remove_large_sock(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
 void sdp_post_keepalive(struct sdp_sock *ssk);
 void sdp_start_keepalive_timer(struct sock *sk);
+void sdp_bzcopy_write_space(struct sdp_sock *ssk);
 
 #endif
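
The new "busy" counter in bzcopy_state is the core of the fix.  A
sketch of its intended lifecycle, using the helper names added later
in this patch:

    /*
     * Lifecycle of bzcopy_state.busy (sketch of the invariant, not
     * patch code):
     *
     *   sdp_bz_setup()        busy = 0; user pages pinned and
     *                         ssk->zcopy_context set to this state
     *   sdp_bzcopy_get()      busy++ for each user-page fragment queued
     *   sdp_send_completion() busy-- when an SDP_MID_DATA send completes
     *   sdp_sendmsg() return  only once busy == 0, so the user buffer
     *                         is safe to reuse; sdp_bz_cleanup() unpins
     */
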
index 963105c0f93dc0ec0269284ec25575a74fe7bd79..bcc2ac8f9487bef3aad45db9270600647cd82b1b 100644 (file)
@@ -240,6 +240,19 @@ struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
 
        ssk->snd_una += TCP_SKB_CB(skb)->end_seq;
        ++ssk->tx_tail;
+
+       /* TODO: AIO and real zcopy code; add their context support here */
+       if (ssk->zcopy_context && skb->data_len) {
+               struct bzcopy_state *bz;
+               struct sdp_bsdh *h;
+
+               h = (struct sdp_bsdh *)skb->data;
+               if (h->mid == SDP_MID_DATA) {
+                       bz = (struct bzcopy_state *)ssk->zcopy_context;
+                       bz->busy--;
+               }
+       }
+
        return skb;
 }
 
@@ -668,8 +681,6 @@ static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
                                wake_up(&ssk->wq);
                        }
                }
-
-               sk_stream_write_space(&ssk->isk.sk);
        } else {
                sdp_cnt(sdp_keepalive_probes_sent);
 
@@ -688,11 +699,6 @@ static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
                return;
        }
 
-       if (likely(!wc->status)) {
-               sdp_post_recvs(ssk);
-               sdp_post_sends(ssk, 0);
-       }
-
        if (ssk->time_wait && !ssk->isk.sk.sk_send_head &&
            ssk->tx_head == ssk->tx_tail) {
                sdp_dbg(&ssk->isk.sk, "%s: destroy in time wait state\n",
@@ -719,6 +725,21 @@ int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq)
                        ret = 0;
                }
        } while (n == SDP_NUM_WC);
+
+       if (!ret) {
+               struct sock *sk = &ssk->isk.sk;
+
+               sdp_post_recvs(ssk);
+               sdp_post_sends(ssk, 0);
+
+               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+                       if (ssk->zcopy_context)
+                               sdp_bzcopy_write_space(ssk);
+                       else
+                               sk_stream_write_space(&ssk->isk.sk);
+               }
+       }
+
        return ret;
 }
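
Only data sends are reaped against the bzcopy context; control
messages complete on the same CQ but were never counted.  A sketch of
that dispatch (SDP_MID_DISCONN is shown only as an illustrative
non-data MID):

    /* Why sdp_send_completion() tests h->mid before bz->busy--:
     * only SDP_MID_DATA sends were counted in sdp_bzcopy_get().
     */
    switch (h->mid) {
    case SDP_MID_DATA:
            bz->busy--;     /* one pinned user-buffer send reaped */
            break;
    default:                /* e.g. SDP_MID_DISCONN: control traffic */
            break;          /* never counted, so never reaped */
    }
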
 
index 5e1480ec17d9d2f8a875d79c76a1c11e7e7eafc9..6ed8e458a2a6540ce4f29807927b0de0cf2536d5 100644 (file)
@@ -234,16 +234,19 @@ int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
                return rc;
        }
 
-       sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
+       sdp_sk(child)->max_bufs = sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
+       sdp_sk(child)->min_bufs = sdp_sk(child)->bufs / 4;
        sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
                sizeof(struct sdp_bsdh);
        sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
                PAGE_SIZE;
        sdp_resize_buffers(sdp_sk(child), ntohl(h->desremrcvsz));
 
-       sdp_dbg(child, "%s bufs %d xmit_size_goal %d\n", __func__,
+       sdp_dbg(child, "%s bufs %d xmit_size_goal %d send trigger %d\n",
+               __func__,
                sdp_sk(child)->bufs,
-               sdp_sk(child)->xmit_size_goal);
+               sdp_sk(child)->xmit_size_goal,
+               sdp_sk(child)->min_bufs);
 
        id->context = child;
        sdp_sk(child)->id = id;
@@ -276,15 +279,18 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
                return 0;
 
        h = event->param.conn.private_data;
-       sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
+       sdp_sk(sk)->max_bufs = sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
+       sdp_sk(sk)->min_bufs = sdp_sk(sk)->bufs / 4;
        sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
                sizeof(struct sdp_bsdh);
        sdp_sk(sk)->send_frags = PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
                PAGE_SIZE;
 
-       sdp_dbg(sk, "%s bufs %d xmit_size_goal %d\n", __func__,
+       sdp_dbg(sk, "%s bufs %d xmit_size_goal %d send trigger %d\n",
+               __func__,
                sdp_sk(sk)->bufs,
-               sdp_sk(sk)->xmit_size_goal);
+               sdp_sk(sk)->xmit_size_goal,
+               sdp_sk(sk)->min_bufs);
 
        ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP);
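
The quarter-of-initial-credits low-water mark keeps sender wakeups
batched.  A worked sketch with hypothetical handshake values:

    /* Hypothetical HelloAck values to illustrate the new fields:
     *
     *   ntohs(h->bsdh.bufs) = 16  ->  max_bufs = bufs = 16
     *   min_bufs = 16 / 4   =  4  ->  the "send trigger" debug value
     *
     * sdp_bzcopy_write_space() only wakes sk->sk_sleep once
     * ssk->bufs >= ssk->min_bufs, i.e. once at least a quarter of the
     * peer's initial credits have been returned, rather than waking a
     * blocked sender for every single returned credit.
     */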
 
index 84c3ff4679e69cce997b2e2d88ce9c8721bd60c5..809f7b8aaf8a394720eb8116f2393d371274e8f2 100644 (file)
@@ -74,16 +74,6 @@ unsigned int csum_partial_copy_from_user_new (const char *src, char *dst,
 #include "sdp.h"
 #include <linux/delay.h>
 
-struct bzcopy_state {
-       unsigned char __user  *u_base;
-       int                    u_len;
-       int                    left;
-       int                    page_cnt;
-        int                    cur_page;
-        int                    cur_offset;
-       struct page          **pages;
-};
-
 MODULE_AUTHOR("Michael S. Tsirkin");
 MODULE_DESCRIPTION("InfiniBand SDP module");
 MODULE_LICENSE("Dual BSD/GPL");
@@ -141,7 +131,7 @@ static unsigned int sdp_keepalive_time = SDP_KEEPALIVE_TIME;
 module_param_named(sdp_keepalive_time, sdp_keepalive_time, uint, 0644);
 MODULE_PARM_DESC(sdp_keepalive_time, "Default idle time in seconds before keepalive probe sent.");
 
-static int sdp_zcopy_thresh = 2048;
+static int sdp_zcopy_thresh = 8192;
 module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644);
 MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=off.");
 
@@ -1213,9 +1203,12 @@ void sdp_push_one(struct sock *sk, unsigned int mss_now)
 {
 }
 
-static struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
+static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
 {
        int i;
+       struct sdp_sock *ssk = (struct sdp_sock *)bz->ssk;
+
+       ssk->zcopy_context = NULL;
 
        if (bz->pages) {
                for (i = bz->cur_page; i < bz->page_cnt; i++)
@@ -1266,6 +1259,8 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
        bz->u_len      = len;
        bz->left       = len;
        bz->cur_offset = addr & ~PAGE_MASK;
+       bz->busy       = 0;
+       bz->ssk        = ssk;
        bz->page_cnt   = PAGE_ALIGN(len + bz->cur_offset) >> PAGE_SHIFT;
        bz->pages      = kcalloc(bz->page_cnt, sizeof(struct page *), GFP_KERNEL);
 
@@ -1287,6 +1282,7 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
        }
 
        up_write(&current->mm->mmap_sem);
+       ssk->zcopy_context = bz;
 
        return bz;
 
@@ -1398,6 +1394,7 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
        int this_page, left;
        struct sdp_sock *ssk = sdp_sk(sk);
 
+       /* Push the first chunk to page-align all that follow - TODO: review */
        if (skb_shinfo(skb)->nr_frags == ssk->send_frags) {
                sdp_mark_push(ssk, skb);
                return SDP_NEW_SEG;
@@ -1449,9 +1446,110 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
        }
 
        bz->left -= copy;
+       bz->busy++;
        return copy;
 }
 
+static inline int slots_free(struct sdp_sock *ssk)
+{
+       int min_free;
+
+       min_free = SDP_TX_SIZE - (ssk->tx_head - ssk->tx_tail);
+       if (ssk->bufs < min_free)
+               min_free = ssk->bufs;
+       min_free -= (min_free < SDP_MIN_BUFS) ? min_free : SDP_MIN_BUFS;
+
+       return min_free;
+}
+
+/* like sk_stream_memory_free - except measures remote credits */
+static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk)
+{
+       struct bzcopy_state *bz = (struct bzcopy_state *)ssk->zcopy_context;
+
+       BUG_ON(!bz);
+       return slots_free(ssk) > bz->busy;
+}
+
+/* like sk_stream_wait_memory - except waits on remote credits */
+static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p)
+{
+       struct sock *sk = &ssk->isk.sk;
+       struct bzcopy_state *bz = (struct bzcopy_state *)ssk->zcopy_context;
+       int err = 0;
+       long vm_wait = 0;
+       long current_timeo = *timeo_p;
+       DEFINE_WAIT(wait);
+
+       BUG_ON(!bz);
+
+       if (sdp_bzcopy_slots_avail(ssk))
+               current_timeo = vm_wait = (net_random() % (HZ / 5)) + 2;
+
+       while (1) {
+               set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+               prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
+
+               if (unlikely(sk->sk_err | (sk->sk_shutdown & SEND_SHUTDOWN))) {
+                       err = -EPIPE;
+                       break;
+               }
+
+               if (unlikely(!*timeo_p)) {
+                       err = -EAGAIN;
+                       break;
+               }
+
+               if (unlikely(signal_pending(current))) {
+                       err = sock_intr_errno(*timeo_p);
+                       break;
+               }
+
+               clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+
+               if (sdp_bzcopy_slots_avail(ssk))
+                       break;
+
+               set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+               sk->sk_write_pending++;
+               sk_wait_event(sk, &current_timeo,
+                       sdp_bzcopy_slots_avail(ssk) && vm_wait);
+               sk->sk_write_pending--;
+
+               if (vm_wait) {
+                       vm_wait -= current_timeo;
+                       current_timeo = *timeo_p;
+                       if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
+                           (current_timeo -= vm_wait) < 0)
+                               current_timeo = 0;
+                       vm_wait = 0;
+               }
+               *timeo_p = current_timeo;
+       }
+
+       finish_wait(sk->sk_sleep, &wait);
+       return err;
+}
+
+/* like sk_stream_write_space - except measures remote credits */
+void sdp_bzcopy_write_space(struct sdp_sock *ssk)
+{
+       struct sock *sk = &ssk->isk.sk;
+       struct socket *sock = sk->sk_socket;
+
+       if (ssk->bufs >= ssk->min_bufs &&
+           ssk->tx_head == ssk->tx_tail &&
+           sock != NULL) {
+               clear_bit(SOCK_NOSPACE, &sock->flags);
+
+               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+                       wake_up_interruptible(sk->sk_sleep);
+               if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
+                       sock_wake_async(sock, 2, POLL_OUT);
+       }
+}
+
 
 /* Like tcp_sendmsg */
 /* TODO: check locking */
@@ -1510,11 +1608,20 @@ int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                            (copy = size_goal - skb->len) <= 0) {
 
 new_segment:
-                               /* Allocate new segment. If the interface is SG,
-                                * allocate skb fitting to single page.
+                               /*
+                                * Allocate a new segment
+                                *   For bcopy, we stop sending once we have
+                                * SO_SENDBUF bytes in flight.  For bzcopy
+                                * we stop sending once we run out of remote
+                                * receive credits.
                                 */
-                               if (!sk_stream_memory_free(sk))
-                                       goto wait_for_sndbuf;
+                               if (bz) {
+                                       if (!sdp_bzcopy_slots_avail(ssk))
+                                               goto wait_for_sndbuf;
+                               } else {
+                                       if (!sk_stream_memory_free(sk))
+                                               goto wait_for_sndbuf;
+                               }
 
                                skb = sk_stream_alloc_pskb(sk, select_size(sk, ssk),
                                                           0, sk->sk_allocation);
@@ -1586,7 +1693,9 @@ wait_for_memory:
                        if (copied)
                                sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
 
-                       if ((err = sk_stream_wait_memory(sk, &timeo)) != 0)
+                       err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo) :
+                                    sk_stream_wait_memory(sk, &timeo);
+                       if (err)
                                goto do_error;
 
                        mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
@@ -1595,12 +1704,30 @@ wait_for_memory:
        }
 
 out:
-       if (bz)
-               bz = sdp_bz_cleanup(bz);
-       if (copied)
+       if (copied) {
                sdp_push(sk, ssk, flags, mss_now, ssk->nonagle);
-       if (size > send_poll_thresh)
-               poll_send_cq(sk);
+               if (bz) {
+                       int max_retry;
+
+                       /* Wait for in-flight sends; should be quick */
+                       for (max_retry = 0; max_retry < 10000; max_retry++) {
+                               if (!bz->busy)
+                                       break;
+
+                               poll_send_cq(sk);
+                       }
+
+                       if (bz->busy)
+                               sdp_warn(sk,
+                                        "Could not reap %d in-flight sends\n",
+                                        bz->busy);
+
+                       bz = sdp_bz_cleanup(bz);
+               } else if (size > send_poll_thresh)
+                       poll_send_cq(sk);
+       }
+
        release_sock(sk);
        return copied;
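
For reference, the credit arithmetic in slots_free() and
sdp_bzcopy_slots_avail() with assumed values (SDP_TX_SIZE and
SDP_MIN_BUFS are driver constants defined elsewhere in sdp.h; the
numbers below are illustrative only):

    /* Worked example of the slots_free() math (assumed constants):
     *
     *   SDP_TX_SIZE = 64, SDP_MIN_BUFS = 4
     *   tx_head - tx_tail = 10    ->  54 local tx ring slots free
     *   ssk->bufs         =  8    ->  remote credits cap min_free at 8
     *   reserve SDP_MIN_BUFS      ->  min_free = 8 - 4 = 4
     *
     * sdp_bzcopy_slots_avail() returns (4 > bz->busy): a sender with
     * four or more unreaped bzcopy sends blocks in
     * sdp_bzcopy_wait_memory() until completions return credits and
     * sdp_bzcopy_write_space() wakes it.
     */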