]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
sdp: fix RX to work well on sink side + cosmetics changes
authorAmir Vadai <amirv@mellanox.co.il>
Wed, 3 Jun 2009 06:34:49 +0000 (09:34 +0300)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:30 +0000 (05:04 -0700)
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_rx.c
drivers/infiniband/ulp/sdp/sdp_tx.c

index 25e18442b2de5d6555eab7ef1664a1bd592760cd..9b1980fdabadf33c8776d624cf6969814ccaa919 100644 (file)
@@ -29,7 +29,6 @@
 #define sdp_warn(sk, format, arg...)                         \
        sdp_printk(KERN_WARNING, sk, format , ## arg)
 
-
 #define rx_ring_lock(ssk, f) do { \
        spin_lock_irqsave(&ssk->rx_ring.lock, f); \
 } while (0)
@@ -66,7 +65,7 @@ static inline unsigned long long current_nsec(void)
        getnstimeofday(&tv);
        return tv.tv_sec * NSEC_PER_SEC + tv.tv_nsec;
 }
-#define sdp_prf(sk, s, format, arg...) ({ \
+#define sdp_prf1(sk, s, format, arg...) ({ \
        struct sdpprf_log *l = &sdpprf_log[sdpprf_log_count++ & (SDPPRF_LOG_SIZE - 1)]; \
        l->idx = sdpprf_log_count - 1; \
        l->pid = current->pid; \
@@ -80,7 +79,11 @@ static inline unsigned long long current_nsec(void)
        l->line = __LINE__; \
        1; \
 })
+#define sdp_prf(sk, s, format, arg...)
+//#define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg)
+
 #else
+#define sdp_prf1(sk, s, format, arg...)
 #define sdp_prf(sk, s, format, arg...)
 #endif
 
@@ -156,6 +159,7 @@ struct sdpstats {
        u32 rx_poll_miss;
        u32 tx_poll_miss;
        u32 tx_poll_hit;
+       u32 tx_poll_busy;
        u32 memcpy_count;
        u32 credits_before_update[64];
        u32 send_interval[25];
@@ -344,6 +348,21 @@ struct sdp_chrecvbuf {
        u32 size;
 };
 
+#define posts_handler(ssk) ({\
+       atomic_read(&ssk->somebody_is_doing_posts); \
+})
+
+#define posts_handler_get(ssk) ({\
+       sdp_prf(&ssk->isk.sk, NULL, "posts handler get. %d", posts_handler(ssk)); \
+       atomic_inc(&ssk->somebody_is_doing_posts); \
+})
+
+#define posts_handler_put(ssk) ({\
+       sdp_prf(&ssk->isk.sk, NULL, "posts handler put. %d", posts_handler(ssk)); \
+       atomic_dec(&ssk->somebody_is_doing_posts); \
+       sdp_do_posts(ssk); \
+})
+
 struct sdp_sock {
        /* sk has to be the first member of inet_sock */
        struct inet_sock isk;
@@ -359,6 +378,8 @@ struct sdp_sock {
        struct delayed_work dreq_wait_work;
        struct work_struct destroy_work;
 
+       atomic_t somebody_is_doing_posts;
+
        /* Like tcp_sock */
        u16 urg_data;
        u32 urg_seq;
@@ -554,7 +575,7 @@ int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
 void sdp_rx_ring_destroy(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
 int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
-void sdp_schedule_post_recvs(struct sdp_sock *ssk);
+void sdp_do_posts(struct sdp_sock *ssk);
 void sdp_rx_comp_full(struct sdp_sock *ssk);
 
 static inline void sdp_arm_rx_cq(struct sock *sk)
index 432b198939bf82d5a7466e9c9422de0a83fea169..38b303a4e40c48b328a30d931d4e120a345ab365 100644 (file)
@@ -210,6 +210,7 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
 
 
        sdp_dbg(&ssk->isk.sk, "destroying qp\n");
+       sdp_prf(&ssk->isk.sk, NULL, "destroying qp");
 
        del_timer(&ssk->tx_ring.timer);
 
@@ -291,7 +292,7 @@ out:
        sock_put(sk, SOCK_REF_BORN);
 }
 
-static void sdp_init_timer(struct sock *sk)
+static void sdp_init_keepalive_timer(struct sock *sk)
 {
        init_timer(&sk->sk_timer);
 
@@ -930,6 +931,8 @@ int sdp_init_sock(struct sock *sk)
        ssk->destruct_in_process = 0;
        spin_lock_init(&ssk->lock);
 
+       atomic_set(&ssk->somebody_is_doing_posts, 0);
+
        return 0;
 }
 
@@ -967,7 +970,7 @@ static void sdp_mark_push(struct sdp_sock *ssk, struct sk_buff *skb)
 {
        TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH;
        ssk->pushed_seq = ssk->write_seq;
-       sdp_post_sends(ssk, 0);
+       sdp_do_posts(ssk);
 }
 
 static inline void sdp_push_pending_frames(struct sock *sk)
@@ -975,7 +978,6 @@ static inline void sdp_push_pending_frames(struct sock *sk)
        struct sk_buff *skb = sk->sk_send_head;
        if (skb) {
                sdp_mark_push(sdp_sk(sk), skb);
-               sdp_post_sends(sdp_sk(sk), 0);
        }
 }
 
@@ -1208,23 +1210,6 @@ static int sdp_recv_urg(struct sock *sk, long timeo,
        return -EAGAIN;
 }
 
-static unsigned int sdp_current_mss(struct sock *sk, int large_allowed)
-{
-       /* TODO */
-       return PAGE_SIZE;
-}
-
-static int forced_push(struct sdp_sock *sk)
-{
-       /* TODO */
-       return 0;
-}
-
-static inline int select_size(struct sock *sk, struct sdp_sock *ssk)
-{
-       return 0;
-}
-
 static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk, int flags)
 {
        if (unlikely(flags & MSG_OOB)) {
@@ -1233,12 +1218,11 @@ static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk, int flags
        }
 }
 
-static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags,
-                           int mss_now, int nonagle)
+static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags)
 {
        if (sk->sk_send_head)
                sdp_mark_urg(sk, ssk, flags);
-       sdp_post_sends(ssk, nonagle);
+       sdp_do_posts(sdp_sk(sk));
 }
 
 static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
@@ -1254,10 +1238,6 @@ static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
                 ssk->nonagle &= ~TCP_NAGLE_PUSH;
 }
 
-static void sdp_push_one(struct sock *sk, unsigned int mss_now)
-{
-}
-
 static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
 {
        int i, max_retry;
@@ -1375,7 +1355,6 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
        return bz;
 }
 
-
 #define TCP_PAGE(sk)   (sk->sk_sndmsg_page)
 #define TCP_OFF(sk)    (sk->sk_sndmsg_off)
 static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
@@ -1468,7 +1447,6 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
        return copy;
 }
 
-
 static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
                                 unsigned char __user *from, int copy,
                                 struct bzcopy_state *bz)
@@ -1585,17 +1563,23 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 
                clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
 
+               posts_handler_put(ssk);
+
                if (sdp_bzcopy_slots_avail(ssk, bz))
                        break;
 
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                sk->sk_write_pending++;
-               sdp_prf(sk, NULL, "credits: %d, head: %d, tail: %d, busy: %d",
+               sdp_prf1(sk, NULL, "credits: %d, head: %d, tail: %d, busy: %d",
                                tx_credits(ssk), ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring),
                                bz->busy);
                sk_wait_event(sk, &current_timeo,
                        sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
                sk->sk_write_pending--;
+               sdp_prf1(sk, NULL, "finished wait for mem");
+
+               posts_handler_get(ssk);
+               sdp_do_posts(ssk);
 
                if (vm_wait) {
                        vm_wait -= current_timeo;
@@ -1621,7 +1605,7 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        struct sdp_sock *ssk = sdp_sk(sk);
        struct sk_buff *skb;
        int iovlen, flags;
-       int mss_now, size_goal;
+       int size_goal;
        int err, copied;
        long timeo;
        struct bzcopy_state *bz = NULL;
@@ -1629,6 +1613,8 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        lock_sock(sk);
        sdp_dbg_data(sk, "%s\n", __func__);
 
+       posts_handler_get(ssk);
+
        flags = msg->msg_flags;
        timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
 
@@ -1640,7 +1626,6 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        /* This should be in poll */
        clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
 
-       mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
        size_goal = ssk->xmit_size_goal;
 
        /* Ok commence sending. */
@@ -1697,13 +1682,10 @@ new_segment:
                                                goto wait_for_sndbuf;
                                }
 
-                               skb = sdp_stream_alloc_skb(sk, select_size(sk, ssk),
-                                                          sk->sk_allocation);
+                               skb = sdp_stream_alloc_skb(sk, 0, sk->sk_allocation);
                                if (!skb)
                                        goto wait_for_memory;
 
-//                             sdp_prf(sk, skb, "Created");
-
                                BZCOPY_STATE(skb) = bz;
 
                                /*
@@ -1718,7 +1700,6 @@ new_segment:
                                skb_entail(sk, ssk, skb);
                                copy = size_goal;
                        } else {
-//                             sdp_prf(sk, skb, "adding %d bytes", copy);
                                sdp_dbg_data(sk, "adding to existing skb: %p"
                                        " len = %d, sk_send_head: %p copy: %d\n",
                                        skb, skb->len, sk->sk_send_head, copy);
@@ -1761,15 +1742,8 @@ new_segment:
                        if ((seglen -= copy) == 0 && iovlen == 0)
                                goto out;
 
-                       if (skb->len < mss_now || (flags & MSG_OOB))
+                       if (skb->len < PAGE_SIZE || (flags & MSG_OOB))
                                continue;
-
-                       if (forced_push(ssk)) {
-                               sdp_mark_push(ssk, skb);
-                               /* TODO: and push pending frames mss_now */
-                               /* sdp_push_pending(sk, ssk, mss_now, TCP_NAGLE_PUSH); */
-                       } else if (skb == sk->sk_send_head)
-                               sdp_push_one(sk, mss_now);
                        continue;
 
 wait_for_sndbuf:
@@ -1778,24 +1752,32 @@ wait_for_memory:
                        sdp_prf(sk, skb, "wait for mem");
                        SDPSTATS_COUNTER_INC(send_wait_for_mem);
                        if (copied)
-                               sdp_push(sk, ssk, flags & ~MSG_MORE, PAGE_SIZE, TCP_NAGLE_PUSH);
+                               sdp_push(sk, ssk, flags & ~MSG_MORE);
 
                        sdp_xmit_poll(ssk, 1);
 
-                       err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo, bz) :
-                                    sk_stream_wait_memory(sk, &timeo);
-                       sdp_prf(sk, skb, "finished wait for mem. err: %d", err);
+                       if (bz) {
+                               err = sdp_bzcopy_wait_memory(ssk, &timeo, bz);
+                       } else {
+                               posts_handler_put(ssk);
+
+                               err = sk_stream_wait_memory(sk, &timeo);
+
+                               sdp_prf1(sk, skb, "finished wait for mem. err: %d", err);
+                               posts_handler_get(ssk);
+                               sdp_do_posts(ssk);
+                       }
+
                        if (err)
                                goto do_error;
 
-                       mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
                        size_goal = ssk->xmit_size_goal;
                }
        }
 
 out:
        if (copied) {
-               sdp_push(sk, ssk, flags, mss_now, ssk->nonagle);
+               sdp_push(sk, ssk, flags);
 
                if (bz)
                        bz = sdp_bz_cleanup(bz);
@@ -1804,6 +1786,8 @@ out:
                                poll_send_cq(sk);
        }
 
+       posts_handler_put(ssk);
+
        release_sock(sk);
        return copied;
 
@@ -1824,6 +1808,9 @@ out_err:
        if (bz)
                bz = sdp_bz_cleanup(bz);
        err = sk_stream_error(sk, flags, err);
+
+       posts_handler_put(ssk);
+
        release_sock(sk);
        return err;
 }
@@ -1863,6 +1850,8 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        lock_sock(sk);
        sdp_dbg_data(sk, "%s\n", __func__);
 
+       posts_handler_get(ssk);
+
        sdp_prf(sk, skb, "Read from user");
 
        err = -ENOTCONN;
@@ -1900,7 +1889,10 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                        if (!skb)
                                break;
 
-                       BUG_ON((skb_transport_header(skb))[0] != SDP_MID_DATA);
+                       if ((skb_transport_header(skb))[0] == SDP_MID_DISCONN) {
+                               sdp_prf(sk, NULL, "Got DISCONN skb - killing it");
+                               goto found_fin_ok;
+                       }
 
                        if (before(*seq, TCP_SKB_CB(skb)->seq)) {
                                sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
@@ -1970,7 +1962,17 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                        lock_sock(sk);
                } else if (rc) {
                        sdp_dbg_data(sk, "%s: sk_wait_data %ld\n", __func__, timeo);
+                       sdp_prf(sk, NULL, "waiting for data");
+
+                       posts_handler_put(ssk);
+
+                       /* socket lock is released inside sk_wait_data */
                        sk_wait_data(sk, &timeo);
+
+                       posts_handler_get(ssk);
+                       sdp_prf(sk, NULL, "got data");
+
+                       sdp_do_posts(ssk);
                }
                continue;
 
@@ -2022,7 +2024,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
                sdp_dbg_data(sk, "%s: done copied %d target %d\n", __func__, copied, target);
 
-               sdp_schedule_post_recvs(sdp_sk(sk));
+               sdp_do_posts(sdp_sk(sk));
 skip_copy:
                if (ssk->urg_data && after(ssk->copied_seq, ssk->urg_seq))
                        ssk->urg_data = 0;
@@ -2034,10 +2036,22 @@ skip_copy:
                        skb_unlink(skb, &sk->sk_receive_queue);
                        __kfree_skb(skb);
                }
+               continue;
+found_fin_ok:
+               ++*seq;
+               if (!(flags & MSG_PEEK)) {
+                       skb_unlink(skb, &sk->sk_receive_queue);
+                       __kfree_skb(skb);
+               }
+               break;
+               
        } while (len > 0);
 
        err = copied;
 out:
+
+       posts_handler_put(ssk);
+
        release_sock(sk);
        return err;
 
@@ -2257,7 +2271,7 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
 
        sk->sk_destruct = sdp_destruct;
 
-       sdp_init_timer(sk);
+       sdp_init_keepalive_timer(sk);
 
        sock->ops = &sdp_proto_ops;
        sock->state = SS_UNCONNECTED;
index 7d5b33edc7038efc62c4743567d191acd98f86c4..7e17423a08b1660aaa8c96106cdf378aa1535114 100644 (file)
@@ -50,6 +50,14 @@ static int top_mem_usage = 0;
 module_param_named(top_mem_usage, top_mem_usage, int, 0644);
 MODULE_PARM_DESC(top_mem_usage, "Top system wide sdp memory usage for recv (in MB).");
 
+static int hw_int_mod_count = 10;
+module_param_named(hw_int_mod_count, hw_int_mod_count, int, 0644);
+MODULE_PARM_DESC(hw_int_mod_count, "HW interrupt moderation. int count");
+
+static int hw_int_mod_msec = 200;
+module_param_named(hw_int_mod_msec, hw_int_mod_msec, int, 0644);
+MODULE_PARM_DESC(hw_int_mod_count, "HW interrupt moderation. mseq");
+
 #ifdef CONFIG_PPC
 static int max_large_sockets = 100;
 #else
@@ -259,7 +267,6 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
        max_bytes = sk->sk_rcvbuf * scale;
 
        if  (unlikely(ring_posted(ssk->rx_ring) >= SDP_RX_SIZE)) {
-               sdp_prf(sk, NULL, "rx ring is full");
                return 0;
        }
 
@@ -280,38 +287,18 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
 
 static inline void sdp_post_recvs(struct sdp_sock *ssk)
 {
-       int rc = 0;
 again: 
-       ssk->posts_in_process = 1;
-
-       do {
-               if (!sdp_post_recvs_needed(ssk))
-                       break;
-
-               rc = sdp_post_recv(ssk);
-       } while (!rc);
+       while (sdp_post_recvs_needed(ssk)) {
+               if (sdp_post_recv(ssk)) 
+                       goto out;
+       }
 
        sk_stream_mem_reclaim(&ssk->isk.sk);
 
-       ssk->posts_in_process = 0;
        if (sdp_post_recvs_needed(ssk))
                goto again;
-}
-
-void sdp_schedule_post_recvs(struct sdp_sock *ssk)
-{
-       struct sock *sk = &ssk->isk.sk;
-
-       WARN_ON_LOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
-       if (unlikely(!ssk->id || ((1 << sk->sk_state) & 
-               (TCPF_CLOSE | TCPF_TIME_WAIT)))) {
-               sdp_prf(sk, NULL, "socket is closed - not posting");
-               return;
-       }
-
-       if (!ssk->posts_in_process && sdp_post_recvs_needed(ssk))
-               queue_work(rx_comp_wq, &ssk->rx_comp_work);
+out:
+       sk_stream_mem_reclaim(&ssk->isk.sk);
 }
 
 static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
@@ -416,8 +403,6 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
        struct sk_buff *skb;
        int i, frags;
 
-       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
        if (unlikely(id != ring_tail(ssk->rx_ring))) {
                printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
                        id, ring_tail(ssk->rx_ring));
@@ -442,7 +427,7 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
 /* socket lock should be taken before calling this */
 static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 {
-       struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
+       struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
        struct sock *sk = &ssk->isk.sk;
 
        switch (h->mid) {
@@ -460,28 +445,33 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 
                        sk->sk_prot->disconnect(sk, 0);
                }
+               __kfree_skb(skb);
 
                break;
        case SDP_MID_DISCONN:
                sdp_dbg_data(sk, "Handling RX disconnect\n");
+               sdp_prf(sk, NULL, "Handling RX disconnect");
                sdp_fin(sk);
+               sdp_prf(sk, NULL, "Queueing fin skb - release recvmsg");
+               /* Enqueue fin skb to release sleeping recvmsg */
+               sdp_sock_queue_rcv_skb(sk, skb);
                break;
        case SDP_MID_CHRCVBUF:
                sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
-               sdp_handle_resize_request(ssk,
-                       (struct sdp_chrecvbuf *)skb->data);
+               sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)h);
+               __kfree_skb(skb);
                break;
        case SDP_MID_CHRCVBUF_ACK:
                sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
-               sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
+               sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)h);
+               __kfree_skb(skb);
                break;
        default:
                /* TODO: Handle other messages */
                sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
+               __kfree_skb(skb);
        }
 
-       __kfree_skb(skb);
-
        return 0;
 }
 
@@ -494,7 +484,7 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
        unsigned long mseq_ack;
        int credits_before;
 
-       h = (struct sdp_bsdh *)skb->data;
+       h = (struct sdp_bsdh *)skb_transport_header(skb);
 
        SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
 
@@ -505,7 +495,7 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
        if (mseq_ack >= ssk->nagle_last_unacked)
                ssk->nagle_last_unacked = 0;
 
-       sdp_prf(&ssk->isk.sk, skb, "RX %s +%d c:%d->%d mseq:%d ack:%d",
+       sdp_prf1(&ssk->isk.sk, skb, "RX %s +%d c:%d->%d mseq:%d ack:%d",
                mid2str(h->mid), ntohs(h->bufs), credits_before, 
                tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
 
@@ -521,18 +511,25 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 /*     if (unlikely(h->flags & SDP_OOB_PEND))
                sk_send_sigurg(sk);*/
 
+       skb_pull(skb, sizeof(struct sdp_bsdh));
+       
        if (h->mid != SDP_MID_DATA || unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
-               sdp_warn(sk, "Control skb - queing to control queue\n");
+               sdp_prf(sk, NULL, "Control skb - queing to control queue");
                skb_queue_tail(&ssk->rx_ctl_q, skb);
                return 0;
        }
 
-       skb_pull(skb, sizeof(struct sdp_bsdh));
-       
        if (unlikely(skb->len <= 0)) {
                __kfree_skb(skb);
+               return 0;
        }
 
+       sdp_prf(sk, NULL, "queueing a %s skb", (h->mid == SDP_MID_DATA ? "data" : "disconnect"));
+       skb = sdp_sock_queue_rcv_skb(sk, skb);
+
+/*     if (unlikely(h->flags & SDP_OOB_PRES))
+               sdp_urg(ssk, skb);*/
+
        return 0;
 }
 
@@ -563,7 +560,7 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
        sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
                        (int)wc->wr_id, wc->byte_len);
        if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
-               printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
+               sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
                                wc->byte_len, sizeof(struct sdp_bsdh));
                __kfree_skb(skb);
                return NULL;
@@ -599,15 +596,14 @@ static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
        struct socket *sock = sk->sk_socket;
 
        if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
-               sdp_prf(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. tx_head: %d, tx_tail: %d",
+               sdp_prf1(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. tx_head: %d, tx_tail: %d",
                                tx_credits(ssk), ssk->min_bufs,
                                ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
        }
 
-       if (tx_credits(ssk) >= ssk->min_bufs &&
-           ring_head(ssk->tx_ring) == ring_tail(ssk->tx_ring) &&
-          sock != NULL) {
+       if (tx_credits(ssk) >= ssk->min_bufs && sock != NULL) {
                clear_bit(SOCK_NOSPACE, &sock->flags);
+               sdp_prf1(sk, NULL, "Waking up sleepers");
 
                if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
                        wake_up_interruptible(sk->sk_sleep);
@@ -616,7 +612,7 @@ static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
        }
 }
 
-/* only from interrupt.
+/* only from interrupt. */
 static int sdp_poll_rx_cq(struct sdp_sock *ssk)
 {
        struct ib_cq *cq = ssk->rx_ring.cq;
@@ -625,8 +621,6 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk)
        int wc_processed = 0;
        struct sk_buff *skb;
 
-       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
        do {
                n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
                for (i = 0; i < n; ++i) {
@@ -652,39 +646,59 @@ void sdp_rx_comp_work(struct work_struct *work)
 {
        struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
        struct sock *sk = &ssk->isk.sk;
-       int xmit_poll_force;
-       struct sk_buff *skb;
 
        sdp_prf(sk, NULL, "%s", __func__);
 
-       if (unlikely(!ssk->rx_ring.cq))
+       if (unlikely(!ssk->qp)) {
+               sdp_prf(sk, NULL, "qp was destroyed");
+               return;
+       }
+       if (unlikely(!ssk->rx_ring.cq)) {
+               sdp_prf(sk, NULL, "rx_ring.cq is NULL");
                return;
+       }
 
        if (unlikely(!ssk->poll_cq)) {
                struct rdma_cm_id *id = ssk->id;
                sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
+               sdp_prf(sk, NULL, "poll cq is 0");
                if (id && id->qp)
                        rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
                return;
        }
 
-       sdp_post_recvs(ssk);
+       lock_sock(sk);
 
-       if (!(sk->sk_write_pending && (tx_credits(ssk) > SDP_MIN_TX_CREDITS)) &&
-                       !credit_update_needed(ssk) &&
-                       skb_queue_empty(&ssk->rx_ctl_q)) {
-               sdp_prf(&ssk->isk.sk, NULL, "only post recvs");
-               return;
-       }
+       sdp_do_posts(ssk);
 
-       lock_sock(sk);
+       release_sock(sk);
+}
 
-       sdp_post_sends(ssk, 0);
+void sdp_do_posts(struct sdp_sock *ssk)
+{
+       struct sock *sk = &ssk->isk.sk;
+       int xmit_poll_force;
+       struct sk_buff *skb;
 
        while ((skb = skb_dequeue(&ssk->rx_ctl_q))) {
                sdp_process_rx_ctl_skb(ssk, skb);
        }
 
+       if (sk->sk_state == TCP_TIME_WAIT) {
+               sdp_prf(sk, NULL, "in TIMEWAIT. qp=%p cq=%p", ssk->qp, ssk->rx_ring.cq);
+               return;
+       }
+
+       if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
+               return;
+
+       sdp_post_recvs(ssk);
+
+       if (ring_posted(ssk->tx_ring))
+               sdp_xmit_poll(ssk, 1);
+
+       sdp_post_sends(ssk, 0);
+
        sk_stream_mem_reclaim(sk);
 
        xmit_poll_force = sk->sk_write_pending && (tx_credits(ssk) > SDP_MIN_TX_CREDITS);
@@ -692,12 +706,11 @@ void sdp_rx_comp_work(struct work_struct *work)
        if (credit_update_needed(ssk) || xmit_poll_force) {
                /* if has pending tx because run out of tx_credits - xmit it */
                sdp_prf(sk, NULL, "Processing to free pending sends");
-               sdp_xmit_poll(ssk,  xmit_poll_force); 
-               sdp_prf(sk, NULL, "Sending credit update if needed");
+               sdp_xmit_poll(ssk,  xmit_poll_force);
+               sdp_prf(sk, NULL, "Sending credit update");
                sdp_post_sends(ssk, 0);
        }
 
-       release_sock(sk);
 }
 
 static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
@@ -710,7 +723,10 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 
        sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
 
-       WARN_ON(cq != ssk->rx_ring.cq);
+       if (cq != ssk->rx_ring.cq) {
+               sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
+               return;
+       }
 
        SDPSTATS_COUNTER_INC(rx_int_count);
 
@@ -736,7 +752,19 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
                sdp_prf(&ssk->isk.sk, NULL, "credits:  %d -> %d",
                                credits_before, tx_credits(ssk));
 
-               queue_work(rx_comp_wq, &ssk->rx_comp_work);
+               if (posts_handler(ssk) || 
+                       (!skb_queue_empty(&ssk->rx_ctl_q) &&
+                       (sk->sk_socket && (sk->sk_socket->flags & SOCK_ASYNC_WAITDATA)))) {
+
+                       sdp_prf(&ssk->isk.sk, NULL, "Somebody is doing the post work for me. %d",
+                               posts_handler(ssk));
+
+               } else {
+                       sdp_prf(&ssk->isk.sk, NULL, "Queuing work. others: %d, ctl_q: %d",
+                                       posts_handler(ssk),
+                                       !skb_queue_empty(&ssk->rx_ctl_q));
+                       queue_work(rx_comp_wq, &ssk->rx_comp_work);
+               }
        }
        sdp_arm_rx_cq(sk);
 
@@ -746,8 +774,6 @@ out:
 
 static void sdp_rx_ring_purge(struct sdp_sock *ssk)
 {
-       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
        while (ring_posted(ssk->rx_ring) > 0) {
                struct sk_buff *skb;
                skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
@@ -798,7 +824,7 @@ int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
                goto err_cq;
        }
 
-       rc = ib_modify_cq(rx_cq, 10, 200);
+       rc = ib_modify_cq(rx_cq, hw_int_mod_count, hw_int_mod_msec);
        if (rc) {
                sdp_warn(&ssk->isk.sk, "Unable to modify RX CQ: %d.\n", rc);
                goto err_mod;
@@ -824,8 +850,6 @@ out:
 
 void sdp_rx_ring_destroy(struct sdp_sock *ssk)
 {
-       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
        if (ssk->rx_ring.buffer) {
                sdp_rx_ring_purge(ssk);
 
index 95e3c82320cca8635a54dde13d7796d1e5c630dd..594bfc36f299d12ec9ab8d0d76e33e7e2a934c96 100644 (file)
@@ -92,7 +92,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
        h->mseq = htonl(mseq);
        h->mseq_ack = htonl(mseq_ack(ssk));
 
-       sdp_prf(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
+       sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
                        mid2str(mid), ring_posted(ssk->rx_ring), mseq, ntohl(h->mseq_ack));
 
        SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
@@ -214,7 +214,7 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 
        {
                struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
-               sdp_prf(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
+               sdp_prf1(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
        }
 
        sk_wmem_free_skb(&ssk->isk.sk, skb);
@@ -272,7 +272,6 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk)
 
                if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
                        sk_stream_write_space(&ssk->isk.sk);
-
        }
 
        return wc_processed;