]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
sdp: TX from 1 context only. RX with minimal context switches
authorAmir Vadai <amirv@mellanox.co.il>
Tue, 26 May 2009 16:16:56 +0000 (19:16 +0300)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:30 +0000 (05:04 -0700)
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_cma.c
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_proc.c
drivers/infiniband/ulp/sdp/sdp_rx.c
drivers/infiniband/ulp/sdp/sdp_tx.c

index f9b295ee98f3c962b298ccd1672c5ff9798cbe97..25e18442b2de5d6555eab7ef1664a1bd592760cd 100644 (file)
@@ -6,15 +6,22 @@
 #include <net/inet_sock.h>
 #include <net/tcp.h> /* For urgent data flags */
 #include <rdma/ib_verbs.h>
+#include <linux/sched.h>
+
+#undef SDPSTATS_ON
+#undef SDP_PROFILING
+#undef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+#undef CONFIG_INFINIBAND_SDP_DEBUG
 
-#undef SDP_LOCKS_CHECK
 #define SDPSTATS_ON
 #define SDP_PROFILING
+#define CONFIG_INFINIBAND_SDP_DEBUG_DATA
+#define CONFIG_INFINIBAND_SDP_DEBUG
 
 #define _sdp_printk(func, line, level, sk, format, arg...)                \
-       printk(level "%s:%d sdp_sock(%5d %d:%d): " format,             \
+       printk(level "%s:%d sdp_sock(%5d:%d %d:%d): " format,             \
               func, line, \
-              current->pid, \
+              current->pid, smp_processor_id(), \
               (sk) ? inet_sk(sk)->num : -1,                 \
               (sk) ? ntohs(inet_sk(sk)->dport) : -1, ## arg)
 #define sdp_printk(level, sk, format, arg...)                \
        sdp_printk(KERN_WARNING, sk, format , ## arg)
 
 
-#ifdef SDP_LOCKS_CHECK
-#define WARN_ON_UNLOCKED(sk, l) do {\
-       if (unlikely(!spin_is_locked(l))) { \
-               sdp_warn(sk, "lock " #l " should be locked\n"); \
-               WARN_ON(1); \
-       } \
-} while (0)
-
-#define WARN_ON_LOCKED(sk, l) do {\
-       if (unlikely(spin_is_locked(l))) { \
-               sdp_warn(sk, "lock " #l " should be unlocked\n"); \
-               WARN_ON(1); \
-       } \
-} while (0)
-#else
-#define WARN_ON_UNLOCKED(sk, l)
-#define WARN_ON_LOCKED(sk, l)
-#endif
-
 #define rx_ring_lock(ssk, f) do { \
        spin_lock_irqsave(&ssk->rx_ring.lock, f); \
 } while (0)
@@ -55,6 +43,7 @@ struct sk_buff;
 struct sdpprf_log {
        int             idx;
        int             pid;
+       int             cpu;
        int             sk_num;
        int             sk_dport;
        struct sk_buff  *skb;
@@ -83,6 +72,7 @@ static inline unsigned long long current_nsec(void)
        l->pid = current->pid; \
        l->sk_num = (sk) ? inet_sk(sk)->num : -1;                 \
        l->sk_dport = (sk) ? ntohs(inet_sk(sk)->dport) : -1; \
+       l->cpu = smp_processor_id(); \
        l->skb = s; \
        snprintf(l->msg, sizeof(l->msg) - 1, format, ## arg); \
        l->time = current_nsec(); \
@@ -360,7 +350,7 @@ struct sdp_sock {
        struct list_head sock_list;
        struct list_head accept_queue;
        struct list_head backlog_queue;
-       struct sk_buff_head rx_backlog;
+       struct sk_buff_head rx_ctl_q;
        struct sock *parent;
 
        struct work_struct rx_comp_work;
@@ -373,7 +363,8 @@ struct sdp_sock {
        u16 urg_data;
        u32 urg_seq;
        u32 copied_seq;
-       u32 rcv_nxt;
+#define rcv_nxt(ssk) atomic_read(&(ssk->rcv_nxt))
+       atomic_t rcv_nxt;
 
        int write_seq;
        int pushed_seq;
@@ -561,10 +552,10 @@ void sdp_nagle_timeout(unsigned long data);
 void sdp_rx_ring_init(struct sdp_sock *ssk);
 int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
 void sdp_rx_ring_destroy(struct sdp_sock *ssk);
-int sdp_process_rx_q(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
 int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
-void sdp_post_recvs(struct sdp_sock *ssk);
+void sdp_schedule_post_recvs(struct sdp_sock *ssk);
+void sdp_rx_comp_full(struct sdp_sock *ssk);
 
 static inline void sdp_arm_rx_cq(struct sock *sk)
 {
index 6e0a038d085211ecdbfc213c96a21b6abf818c82..8f6f0e29605c3ef00d404354a12821d89b84e160 100644 (file)
@@ -157,7 +157,8 @@ void sdp_nagle_timeout(unsigned long data)
 out:
        bh_unlock_sock(sk);
 out2:
-       mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
+       if (sk->sk_send_head) /* If has pending sends - rearm */
+               mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
 }
 
 int sdp_post_credits(struct sdp_sock *ssk)
index a9dcf777509c440ea380e48a90b6270e97d646cd..5726fe45a96e4b63b671dd83d96e80af109b4e1f 100644 (file)
@@ -288,15 +288,13 @@ static int sdp_disconnected_handler(struct sock *sk)
 
        sdp_dbg(sk, "%s\n", __func__);
 
-       sdp_process_rx_q(ssk);
-
        if (ssk->tx_ring.cq)
                sdp_xmit_poll(ssk, 1);
 
        if (sk->sk_state == TCP_SYN_RECV) {
                sdp_connected_handler(sk, NULL);
 
-               if (ssk->rcv_nxt)
+               if (rcv_nxt(ssk))
                        return 0;
        }
 
index c6b17dbcc6d895b1e44daaa3253d9676ef18bae4..432b198939bf82d5a7466e9c9422de0a83fea169 100644 (file)
@@ -325,8 +325,6 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
        read_lock(&device_removal_lock);
 
-       sdp_process_rx_q(ssk);
-
        if (ssk->tx_ring.cq)
                sdp_xmit_poll(ssk, 1);
 
@@ -505,7 +503,7 @@ static void sdp_close(struct sock *sk, long timeout)
         *  descriptor close, not protocol-sourced closes, because the
         *  reader process may not have drained the data yet!
         */
-       while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+       while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) {
                data_was_unread = 1;
                __kfree_skb(skb);
        }
@@ -696,13 +694,9 @@ static int sdp_wait_for_connect(struct sock *sk, long timeo)
                                          TASK_INTERRUPTIBLE);
                release_sock(sk);
                if (list_empty(&ssk->accept_queue)) {
-                       sdp_dbg(sk, "%s schedule_timeout\n", __func__);
                        timeo = schedule_timeout(timeo);
-                       sdp_dbg(sk, "%s schedule_timeout done\n", __func__);
                }
-               sdp_dbg(sk, "%s lock_sock\n", __func__);
                lock_sock(sk);
-               sdp_dbg(sk, "%s lock_sock done\n", __func__);
                err = 0;
                if (!list_empty(&ssk->accept_queue))
                        break;
@@ -800,8 +794,8 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
                else if (sock_flag(sk, SOCK_URGINLINE) ||
                         !ssk->urg_data ||
                         before(ssk->urg_seq, ssk->copied_seq) ||
-                        !before(ssk->urg_seq, ssk->rcv_nxt)) {
-                       answ = ssk->rcv_nxt - ssk->copied_seq;
+                        !before(ssk->urg_seq, rcv_nxt(ssk))) {
+                       answ = rcv_nxt(ssk) - ssk->copied_seq;
 
                        /* Subtract 1, if FIN is in queue. */
                        if (answ && !skb_queue_empty(&sk->sk_receive_queue))
@@ -925,7 +919,7 @@ int sdp_init_sock(struct sock *sk)
 
        sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
 
-       skb_queue_head_init(&ssk->rx_backlog);
+       skb_queue_head_init(&ssk->rx_ctl_q);
 
        atomic_set(&ssk->mseq_ack, 0);
 
@@ -1138,7 +1132,7 @@ static inline int poll_recv_cq(struct sock *sk)
 {
        int i;
        for (i = 0; i < recv_poll; ++i) {
-               if (!sdp_process_rx_q(sdp_sk(sk))) {
+               if (!skb_queue_empty(&sk->sk_receive_queue)) {
                        ++recv_poll_hit;
                        return 0;
                }
@@ -1214,12 +1208,6 @@ static int sdp_recv_urg(struct sock *sk, long timeo,
        return -EAGAIN;
 }
 
-static void sdp_rcv_space_adjust(struct sock *sk)
-{
-       sdp_post_recvs(sdp_sk(sk));
-       sdp_post_sends(sdp_sk(sk), 0);
-}
-
 static unsigned int sdp_current_mss(struct sock *sk, int large_allowed)
 {
        /* TODO */
@@ -1875,7 +1863,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        lock_sock(sk);
        sdp_dbg_data(sk, "%s\n", __func__);
 
-//     sdp_prf(sk, skb, "Read from user");
+       sdp_prf(sk, skb, "Read from user");
 
        err = -ENOTCONN;
        if (sk->sk_state == TCP_LISTEN)
@@ -1912,12 +1900,12 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                        if (!skb)
                                break;
 
-                       if ((skb_transport_header(skb))[0] == SDP_MID_DISCONN)
-                               goto found_fin_ok;
+                       BUG_ON((skb_transport_header(skb))[0] != SDP_MID_DATA);
 
                        if (before(*seq, TCP_SKB_CB(skb)->seq)) {
-                               printk(KERN_INFO "recvmsg bug: copied %X "
-                                      "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
+                               sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
+                                       *seq, TCP_SKB_CB(skb)->seq);
+                               sdp_reset(sk);
                                break;
                        }
 
@@ -2034,7 +2022,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
                sdp_dbg_data(sk, "%s: done copied %d target %d\n", __func__, copied, target);
 
-               sdp_rcv_space_adjust(sk);
+               sdp_schedule_post_recvs(sdp_sk(sk));
 skip_copy:
                if (ssk->urg_data && after(ssk->copied_seq, ssk->urg_seq))
                        ssk->urg_data = 0;
@@ -2042,21 +2030,13 @@ skip_copy:
                        continue;
                offset = 0;
 
-               if (!(flags & MSG_PEEK))
-                       sk_eat_skb(sk, skb, 0);
-
-               continue;
-found_fin_ok:
-               ++*seq;
-               if (!(flags & MSG_PEEK))
-                       sk_eat_skb(sk, skb, 0);
-
-               break;
+               if (!(flags & MSG_PEEK)) {
+                       skb_unlink(skb, &sk->sk_receive_queue);
+                       __kfree_skb(skb);
+               }
        } while (len > 0);
 
-       release_sock(sk);
-       return copied;
-
+       err = copied;
 out:
        release_sock(sk);
        return err;
index ec9785499f18789845bfdae6c9fe0f83c6153440..a4a1a4d5a7dde4873f831370e1301bda9a03b446 100644 (file)
@@ -39,9 +39,6 @@
 #define PROC_SDP_STATS "sdpstats"
 #define PROC_SDP_PERF "sdpprf"
 
-struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
-int sdpprf_log_count = 0;
-
 /* just like TCP fs */
 struct sdp_seq_afinfo {
        struct module           *owner;
@@ -57,9 +54,6 @@ struct sdp_iter_state {
        struct seq_operations   seq_ops;
 };
 
-//struct sdpprf sdpprf = { { 0 } };
-
-
 static void *sdp_get_idx(struct seq_file *seq, loff_t pos)
 {
        int i = 0;
@@ -148,7 +142,7 @@ static int sdp_seq_show(struct seq_file *seq, void *v)
        srcp = ntohs(inet_sk(sk)->sport);
        uid = sock_i_uid(sk);
        inode = sock_i_ino(sk);
-       rx_queue = sdp_sk(sk)->rcv_nxt - sdp_sk(sk)->copied_seq;
+       rx_queue = rcv_nxt(sdp_sk(sk)) - sdp_sk(sk)->copied_seq;
        tx_queue = sdp_sk(sk)->write_seq - sdp_sk(sk)->tx_ring.una_seq;
 
        sprintf(tmpbuf, "%4d: %08X:%04X %08X:%04X %5d %lu       %08X:%08X %X",
@@ -273,6 +267,7 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
 
        seq_printf(seq, "rx_poll_miss      \t\t: %d\n", sdpstats.rx_poll_miss);
        seq_printf(seq, "tx_poll_miss      \t\t: %d\n", sdpstats.tx_poll_miss);
+       seq_printf(seq, "tx_poll_busy      \t\t: %d\n", sdpstats.tx_poll_busy);
        seq_printf(seq, "tx_poll_hit       \t\t: %d\n", sdpstats.tx_poll_hit);
 
        seq_printf(seq, "CQ stats:\n");
@@ -306,6 +301,10 @@ static struct file_operations sdpstats_fops = {
 
 #endif
 
+#ifdef SDP_PROFILING
+struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
+int sdpprf_log_count = 0;
+
 unsigned long long start_t = 0;
 
 static int sdpprf_show(struct seq_file *m, void *v)
@@ -321,9 +320,9 @@ static int sdpprf_show(struct seq_file *m, void *v)
        t = l->time - start_t;
        nsec_rem = do_div(t, 1000000000);
 
-       seq_printf(m, "%-6d: [%5lu.%06lu] %-50s - [%d %d:%d] skb: %p %s:%d\n",
+       seq_printf(m, "%-6d: [%5lu.%06lu] %-50s - [%d{%d} %d:%d] skb: %p %s:%d\n",
                        l->idx, (unsigned long)t, nsec_rem/1000,
-                       l->msg, l->pid, l->sk_num, l->sk_dport,
+                       l->msg, l->pid, l->cpu, l->sk_num, l->sk_dport,
                        l->skb, l->func, l->line);
 out:
         return 0;
@@ -408,6 +407,7 @@ static struct file_operations sdpprf_fops = {
         .release        = seq_release,
        .write          = sdpprf_write,
 };
+#endif /* SDP_PROFILING */
 
 int __init sdp_proc_init(void)
 {
@@ -437,10 +437,12 @@ int __init sdp_proc_init(void)
 
 #endif
 
+#ifdef SDP_PROFILING
        sdpprf = proc_net_fops_create(&init_net, PROC_SDP_PERF,
                        S_IRUGO | S_IWUGO, &sdpprf_fops); 
        if (!sdpprf)
                goto no_mem;
+#endif
 
        return 0;
 no_mem:
@@ -464,7 +466,9 @@ void sdp_proc_unregister(void)
 #ifdef SDPSTATS_ON
        proc_net_remove(&init_net, PROC_SDP_STATS);
 #endif
+#ifdef SDP_PROFILING
        proc_net_remove(&init_net, PROC_SDP_PERF);
+#endif
 }
 
 #else /* CONFIG_PROC_FS */
index 05d86901a9e5a99e6916ce63888b3fa4acd2c4de..7d5b33edc7038efc62c4743567d191acd98f86c4 100644 (file)
@@ -149,9 +149,6 @@ static void sdp_fin(struct sock *sk)
        }
 }
 
-/* lock_sock must be taken before calling this - since rx_ring.head is not 
- * protected (although being atomic
- */
 static int sdp_post_recv(struct sdp_sock *ssk)
 {
        struct sdp_buf *rx_req;
@@ -170,7 +167,6 @@ static int sdp_post_recv(struct sdp_sock *ssk)
        gfp_t gfp_page;
        int ret = 0;
 
-       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
        /* Now, allocate and repost recv */
        /* TODO: allocate from cache */
 
@@ -184,6 +180,7 @@ static int sdp_post_recv(struct sdp_sock *ssk)
                gfp_page = GFP_HIGHUSER;
        }
 
+       sdp_prf(&ssk->isk.sk, skb, "Posting skb");
        /* FIXME */
        BUG_ON(!skb);
        h = (struct sdp_bsdh *)skb->head;
@@ -231,53 +228,90 @@ static int sdp_post_recv(struct sdp_sock *ssk)
        rx_wr.sg_list = ibsge;
        rx_wr.num_sge = frags + 1;
        rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
-       SDPSTATS_COUNTER_INC(post_recv);
        atomic_inc(&ssk->rx_ring.head);
        if (unlikely(rc)) {
                sdp_warn(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
+
+               lock_sock(&ssk->isk.sk);
                sdp_reset(&ssk->isk.sk);
+               release_sock(&ssk->isk.sk);
+
                ret = -1;
        }
 
+       SDPSTATS_COUNTER_INC(post_recv);
        atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
 
        return ret;
 }
 
-/* lock_sock must be taken before calling this */
-static void _sdp_post_recvs(struct sdp_sock *ssk)
+static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
 {
        struct sock *sk = &ssk->isk.sk;
        int scale = ssk->rcvbuf_scale;
-
-       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
-       if (unlikely(!ssk->id || ((1 << sk->sk_state) & 
-               (TCPF_CLOSE | TCPF_TIME_WAIT)))) {
-               return;
-       }
+       int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
+       unsigned long max_bytes;
 
        if (top_mem_usage &&
            (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
                scale = 1;
 
-       while ((likely(ring_posted(ssk->rx_ring) < SDP_RX_SIZE) &&
-               (ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) *
-               (SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
-               ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
-              unlikely(ring_posted(ssk->rx_ring) < SDP_MIN_TX_CREDITS)) {
-               if (sdp_post_recv(ssk))
-                       break;
+       max_bytes = sk->sk_rcvbuf * scale;
+
+       if  (unlikely(ring_posted(ssk->rx_ring) >= SDP_RX_SIZE)) {
+               sdp_prf(sk, NULL, "rx ring is full");
+               return 0;
        }
+
+       if (likely(ring_posted(ssk->rx_ring) > SDP_MIN_TX_CREDITS)) {
+               unsigned long bytes_in_process = 
+                       (ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) * buffer_size;
+               bytes_in_process += rcv_nxt(ssk) - ssk->copied_seq;
+
+               if (bytes_in_process >= max_bytes) {
+                       sdp_prf(sk, NULL, "bytes_in_process:%ld > max_bytes:%ld",
+                                       bytes_in_process, max_bytes);
+                       return 0;
+               }
+       }
+
+       return 1;
 }
 
-void sdp_post_recvs(struct sdp_sock *ssk)
+static inline void sdp_post_recvs(struct sdp_sock *ssk)
 {
-       unsigned long flags;
+       int rc = 0;
+again: 
+       ssk->posts_in_process = 1;
 
-       rx_ring_lock(ssk, flags);
-       _sdp_post_recvs(ssk);
-       rx_ring_unlock(ssk, flags);
+       do {
+               if (!sdp_post_recvs_needed(ssk))
+                       break;
+
+               rc = sdp_post_recv(ssk);
+       } while (!rc);
+
+       sk_stream_mem_reclaim(&ssk->isk.sk);
+
+       ssk->posts_in_process = 0;
+       if (sdp_post_recvs_needed(ssk))
+               goto again;
+}
+
+void sdp_schedule_post_recvs(struct sdp_sock *ssk)
+{
+       struct sock *sk = &ssk->isk.sk;
+
+       WARN_ON_LOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
+       if (unlikely(!ssk->id || ((1 << sk->sk_state) & 
+               (TCPF_CLOSE | TCPF_TIME_WAIT)))) {
+               sdp_prf(sk, NULL, "socket is closed - not posting");
+               return;
+       }
+
+       if (!ssk->posts_in_process && sdp_post_recvs_needed(ssk))
+               queue_work(rx_comp_wq, &ssk->rx_comp_work);
 }
 
 static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
@@ -285,7 +319,6 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
 {
        int skb_len;
        struct sdp_sock *ssk = sdp_sk(sk);
-       struct sk_buff *tail = NULL;
 
        /* not needed since sk_rmem_alloc is not currently used
         * TODO - remove this?
@@ -293,16 +326,10 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
 
        skb_len = skb->len;
 
-       TCP_SKB_CB(skb)->seq = ssk->rcv_nxt;
-       ssk->rcv_nxt += skb_len;
+       TCP_SKB_CB(skb)->seq = rcv_nxt(ssk);
+       atomic_add(skb_len, &ssk->rcv_nxt);
 
-       if (likely(skb_len && (tail = skb_peek_tail(&sk->sk_receive_queue))) &&
-           unlikely(skb_tailroom(tail) >= skb_len)) {
-               skb_copy_bits(skb, 0, skb_put(tail, skb_len), skb_len);
-               __kfree_skb(skb);
-               skb = tail;
-       } else
-               skb_queue_tail(&sk->sk_receive_queue, skb);
+       skb_queue_tail(&sk->sk_receive_queue, skb);
 
        if (!sock_flag(sk, SOCK_DEAD))
                sk->sk_data_ready(sk, skb_len);
@@ -412,71 +439,97 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
        return skb;
 }
 
-/* this must be called while sock_lock is taken */
-static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
+/* socket lock should be taken before calling this */
+static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 {
+       struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
        struct sock *sk = &ssk->isk.sk;
-       int frags;
-       struct sdp_bsdh *h;
-       int pagesz, i;
-
-       h = (struct sdp_bsdh *)skb->data;
-
-       frags = skb_shinfo(skb)->nr_frags;
-       pagesz = PAGE_ALIGN(skb->data_len);
-       skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
-
-       for (i = skb_shinfo(skb)->nr_frags;
-                       i < frags; ++i) {
-               put_page(skb_shinfo(skb)->frags[i].page);
-               skb->truesize -= PAGE_SIZE;
-       }
-
-       if (unlikely(h->flags & SDP_OOB_PEND))
-               sk_send_sigurg(sk);
-
-       skb_pull(skb, sizeof(struct sdp_bsdh));
 
        switch (h->mid) {
        case SDP_MID_DATA:
-               if (unlikely(skb->len <= 0)) {
-                       __kfree_skb(skb);
-                       break;
-               }
+               WARN_ON(!(sk->sk_shutdown & RCV_SHUTDOWN));
 
-               if (unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
-                       /* got data in RCV_SHUTDOWN */
-                       if (sk->sk_state == TCP_FIN_WAIT1) {
-                               /* go into abortive close */
-                               sdp_exch_state(sk, TCPF_FIN_WAIT1,
-                                              TCP_TIME_WAIT);
+               sdp_warn(sk, "DATA after socket rcv was shutdown\n");
 
-                               sk->sk_prot->disconnect(sk, 0);
-                       }
+               /* got data in RCV_SHUTDOWN */
+               if (sk->sk_state == TCP_FIN_WAIT1) {
+                       sdp_warn(sk, "socket in shutdown, state = FIN_WAIT1 and got data\n");
+                       /* go into abortive close */
+                       sdp_exch_state(sk, TCPF_FIN_WAIT1,
+                                       TCP_TIME_WAIT);
 
-                       __kfree_skb(skb);
-                       break;
+                       sk->sk_prot->disconnect(sk, 0);
                }
-               skb = sdp_sock_queue_rcv_skb(sk, skb);
-               if (unlikely(h->flags & SDP_OOB_PRES))
-                       sdp_urg(ssk, skb);
+
                break;
        case SDP_MID_DISCONN:
-               __kfree_skb(skb);
+               sdp_dbg_data(sk, "Handling RX disconnect\n");
                sdp_fin(sk);
                break;
        case SDP_MID_CHRCVBUF:
+               sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
                sdp_handle_resize_request(ssk,
                        (struct sdp_chrecvbuf *)skb->data);
-               __kfree_skb(skb);
                break;
        case SDP_MID_CHRCVBUF_ACK:
+               sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
                sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
-               __kfree_skb(skb);
                break;
        default:
                /* TODO: Handle other messages */
-               printk(KERN_WARNING "SDP: FIXME MID %d\n", h->mid);
+               sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
+       }
+
+       __kfree_skb(skb);
+
+       return 0;
+}
+
+static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
+{
+       struct sock *sk = &ssk->isk.sk;
+       int frags;
+       struct sdp_bsdh *h;
+       int pagesz, i;
+       unsigned long mseq_ack;
+       int credits_before;
+
+       h = (struct sdp_bsdh *)skb->data;
+
+       SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
+
+       mseq_ack = ntohl(h->mseq_ack);
+       credits_before = tx_credits(ssk);
+       atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) + 1 +
+               ntohs(h->bufs));
+       if (mseq_ack >= ssk->nagle_last_unacked)
+               ssk->nagle_last_unacked = 0;
+
+       sdp_prf(&ssk->isk.sk, skb, "RX %s +%d c:%d->%d mseq:%d ack:%d",
+               mid2str(h->mid), ntohs(h->bufs), credits_before, 
+               tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
+
+       frags = skb_shinfo(skb)->nr_frags;
+       pagesz = PAGE_ALIGN(skb->data_len);
+       skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
+
+       for (i = skb_shinfo(skb)->nr_frags; i < frags; ++i) {
+               put_page(skb_shinfo(skb)->frags[i].page);
+               skb->truesize -= PAGE_SIZE;
+       }
+
+/*     if (unlikely(h->flags & SDP_OOB_PEND))
+               sk_send_sigurg(sk);*/
+
+       if (h->mid != SDP_MID_DATA || unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
+               sdp_warn(sk, "Control skb - queing to control queue\n");
+               skb_queue_tail(&ssk->rx_ctl_q, skb);
+               return 0;
+       }
+
+       skb_pull(skb, sizeof(struct sdp_bsdh));
+       
+       if (unlikely(skb->len <= 0)) {
                __kfree_skb(skb);
        }
 
@@ -489,8 +542,7 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
        struct sk_buff *skb;
        struct sdp_bsdh *h;
        struct sock *sk = &ssk->isk.sk;
-       int credits_before;
-       unsigned long mseq_ack;
+       int mseq;
        
        skb = sdp_recv_completion(ssk, wc->wr_id);
        if (unlikely(!skb))
@@ -530,23 +582,12 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
        h = (struct sdp_bsdh *)skb->data;
        SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
        skb_reset_transport_header(skb);
-       atomic_set(&ssk->mseq_ack, ntohl(h->mseq));
-       if (mseq_ack(ssk) != (int)wc->wr_id)
-               printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
-                               mseq_ack(ssk), (int)wc->wr_id);
 
-       SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
-
-       mseq_ack = ntohl(h->mseq_ack);
-       credits_before = tx_credits(ssk);
-       atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) + 1 +
-               ntohs(h->bufs));
-       if (mseq_ack >= ssk->nagle_last_unacked)
-               ssk->nagle_last_unacked = 0;
-
-       sdp_prf(&ssk->isk.sk, skb, "RX %s bufs=%d c before:%d after:%d "
-               "mseq:%d, ack:%d", mid2str(h->mid), ntohs(h->bufs), credits_before, 
-               tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
+       mseq = ntohl(h->mseq);
+       atomic_set(&ssk->mseq_ack, mseq);
+       if (mseq != (int)wc->wr_id)
+               sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
+                               mseq, (int)wc->wr_id);
 
        return skb;
 }
@@ -576,7 +617,6 @@ static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
 }
 
 /* only from interrupt.
- * drain rx cq into rx_backlog queue */
 static int sdp_poll_rx_cq(struct sdp_sock *ssk)
 {
        struct ib_cq *cq = ssk->rx_ring.cq;
@@ -596,7 +636,8 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk)
                        skb = sdp_process_rx_wc(ssk, wc);
                        if (!skb)
                                continue;
-                       skb_queue_tail(&ssk->rx_backlog, skb);
+
+                       sdp_process_rx_skb(ssk, skb);
                        wc_processed++;
                }
        } while (n == SDP_NUM_WC);
@@ -607,69 +648,55 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk)
        return wc_processed;
 }
 
-int sdp_process_rx_q(struct sdp_sock *ssk)
+void sdp_rx_comp_work(struct work_struct *work)
 {
-       struct sk_buff *skb;
+       struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
        struct sock *sk = &ssk->isk.sk;
-       unsigned long flags;
+       int xmit_poll_force;
+       struct sk_buff *skb;
 
-       if (!ssk->rx_backlog.next || !ssk->rx_backlog.prev) {
-               sdp_warn(&ssk->isk.sk, "polling a zeroed rx_backlog!!!! %p\n", &ssk->rx_backlog);
-               return 0;
-       }
+       sdp_prf(sk, NULL, "%s", __func__);
+
+       if (unlikely(!ssk->rx_ring.cq))
+               return;
 
-       if (skb_queue_empty(&ssk->rx_backlog)) {
-               SDPSTATS_COUNTER_INC(rx_poll_miss);
-               return -EAGAIN;
+       if (unlikely(!ssk->poll_cq)) {
+               struct rdma_cm_id *id = ssk->id;
+               sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
+               if (id && id->qp)
+                       rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
+               return;
        }
 
-       /* update credits */
-       sdp_post_sends(ssk, 0);
+       sdp_post_recvs(ssk);
 
-       spin_lock_irqsave(&ssk->rx_backlog.lock, flags);
-       while ((skb = __skb_dequeue(&ssk->rx_backlog))) {
-               sdp_process_rx_skb(ssk, skb);
+       if (!(sk->sk_write_pending && (tx_credits(ssk) > SDP_MIN_TX_CREDITS)) &&
+                       !credit_update_needed(ssk) &&
+                       skb_queue_empty(&ssk->rx_ctl_q)) {
+               sdp_prf(&ssk->isk.sk, NULL, "only post recvs");
+               return;
        }
-       spin_unlock_irqrestore(&ssk->rx_backlog.lock, flags);
 
-       if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-               sk_stream_write_space(&ssk->isk.sk);
+       lock_sock(sk);
 
-       return 0;
-}
+       sdp_post_sends(ssk, 0);
 
-static void sdp_rx_comp_work(struct work_struct *work)
-{
-       struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
-       struct sock *sk = &ssk->isk.sk;
-       struct ib_cq *rx_cq;
+       while ((skb = skb_dequeue(&ssk->rx_ctl_q))) {
+               sdp_process_rx_ctl_skb(ssk, skb);
+       }
 
-       lock_sock(sk);
-       rx_cq = ssk->rx_ring.cq;
-       if (unlikely(!rx_cq))
-               goto out;
+       sk_stream_mem_reclaim(sk);
 
-       if (unlikely(!ssk->poll_cq)) {
-               struct rdma_cm_id *id = ssk->id;
-               sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
-               if (id && id->qp)
-                       rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
-               goto out;
-       }
+       xmit_poll_force = sk->sk_write_pending && (tx_credits(ssk) > SDP_MIN_TX_CREDITS);
 
-       sdp_process_rx_q(ssk);
-       sdp_xmit_poll(ssk,  1); /* if has pending tx because run out of tx_credits - xmit it */
-       release_sock(sk);
-       sk_mem_reclaim(sk);
-       lock_sock(sk);
-       rx_cq = ssk->rx_ring.cq;
-       if (unlikely(!rx_cq))
-               goto out;
-       
-       sdp_process_rx_q(ssk);
-       sdp_xmit_poll(ssk,  1);
+       if (credit_update_needed(ssk) || xmit_poll_force) {
+               /* if has pending tx because run out of tx_credits - xmit it */
+               sdp_prf(sk, NULL, "Processing to free pending sends");
+               sdp_xmit_poll(ssk,  xmit_poll_force); 
+               sdp_prf(sk, NULL, "Sending credit update if needed");
+               sdp_post_sends(ssk, 0);
+       }
 
-out:
        release_sock(sk);
 }
 
@@ -679,6 +706,7 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
        struct sdp_sock *ssk = sdp_sk(sk);
        unsigned long flags;
        int wc_processed = 0;
+       int credits_before;
 
        sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
 
@@ -690,6 +718,8 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 
        rx_ring_lock(ssk, flags);
 
+       credits_before = tx_credits(ssk);
+
        if (unlikely(!ssk->poll_cq))
                sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
 
@@ -703,15 +733,11 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
        sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
 
        if (wc_processed) {
-               _sdp_post_recvs(ssk);
+               sdp_prf(&ssk->isk.sk, NULL, "credits:  %d -> %d",
+                               credits_before, tx_credits(ssk));
 
-               /* Best was to send credit update from here */
-/*             sdp_post_credits(ssk); */
-
-               /* issue sdp_rx_comp_work() */
                queue_work(rx_comp_wq, &ssk->rx_comp_work);
        }
-
        sdp_arm_rx_cq(sk);
 
 out:
@@ -764,7 +790,7 @@ int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
        }
 
        rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
-                         &ssk->isk.sk, SDP_RX_SIZE, 0);
+                         &ssk->isk.sk, SDP_RX_SIZE, IB_CQ_VECTOR_LEAST_ATTACHED);
 
        if (IS_ERR(rx_cq)) {
                rc = PTR_ERR(rx_cq);
index a7d16feb556ca464b22b054adfe630039f4ea103..95e3c82320cca8635a54dde13d7796d1e5c630dd 100644 (file)
@@ -287,15 +287,19 @@ static void sdp_poll_tx_timeout(unsigned long data)
        sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
                (u32) ring_posted(ssk->tx_ring));
 
+       sdp_prf(&ssk->isk.sk, NULL, "%s. inflight=%d", __func__,
+               (u32) ring_posted(ssk->tx_ring));
+
        /* Only process if the socket is not in use */
        bh_lock_sock(sk);
        if (sock_owned_by_user(sk)) {
                mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
                sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+               SDPSTATS_COUNTER_INC(tx_poll_busy);
                goto out;
        }
 
-       if (sk->sk_state == TCP_CLOSE)
+       if (unlikely(sk->sk_state == TCP_CLOSE))
                goto out;
 
        wc_processed = sdp_process_tx_cq(ssk);