]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
sdp: process RX CQ from interrupt
authorAmir Vadai <amirv@mellanox.co.il>
Wed, 13 May 2009 11:37:13 +0000 (14:37 +0300)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:28 +0000 (05:04 -0700)
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_cma.c
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_proc.c
drivers/infiniband/ulp/sdp/sdp_rx.c
drivers/infiniband/ulp/sdp/sdp_tx.c

index 5e782820099c7da9032d9f2488cd6a1d78f49f81..57265aa9ff74a50df4642ecb818386dd2e900753 100644 (file)
@@ -7,11 +7,12 @@
 #include <net/tcp.h> /* For urgent data flags */
 #include <rdma/ib_verbs.h>
 
+#undef SDP_LOCKS_CHECK
 #define SDPSTATS_ON
-#undef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+#define SDP_PROFILING
 
 #define _sdp_printk(func, line, level, sk, format, arg...)                \
-       printk(level "%s:%d sdp_sock(%d %d:%d): " format,             \
+       printk(level "%s:%d sdp_sock(%5d %d:%d): " format,             \
               func, line, \
               current->pid, \
               (sk) ? inet_sk(sk)->num : -1,                 \
        sdp_printk(KERN_WARNING, sk, format , ## arg)
 
 
+#ifdef SDP_LOCKS_CHECK
+#define WARN_ON_UNLOCKED(sk, l) do {\
+       if (unlikely(!spin_is_locked(l))) { \
+               sdp_warn(sk, "lock " #l " should be locked\n"); \
+               WARN_ON(1); \
+       } \
+} while (0)
+
+#define WARN_ON_LOCKED(sk, l) do {\
+       if (unlikely(spin_is_locked(l))) { \
+               sdp_warn(sk, "lock " #l " should be unlocked\n"); \
+               WARN_ON(1); \
+       } \
+} while (0)
+#else
+#define WARN_ON_UNLOCKED(sk, l)
+#define WARN_ON_LOCKED(sk, l)
+#endif
+
+#define rx_ring_lock(ssk, f) do { \
+       spin_lock_irqsave(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#define rx_ring_unlock(ssk, f) do { \
+       spin_unlock_irqrestore(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#ifdef SDP_PROFILING
 struct sk_buff;
 struct sdpprf_log {
        int             idx;
@@ -37,7 +66,7 @@ struct sdpprf_log {
        int             line;
 };
 
-#define SDPPRF_LOG_SIZE 0x10000 /* must be a power of 2 */
+#define SDPPRF_LOG_SIZE 0x20000 /* must be a power of 2 */
 
 extern struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
 extern int sdpprf_log_count;
@@ -48,7 +77,6 @@ static inline unsigned long long current_nsec(void)
        getnstimeofday(&tv);
        return tv.tv_sec * NSEC_PER_SEC + tv.tv_nsec;
 }
-#if 1
 #define sdp_prf(sk, s, format, arg...) ({ \
        struct sdpprf_log *l = &sdpprf_log[sdpprf_log_count++ & (SDPPRF_LOG_SIZE - 1)]; \
        l->idx = sdpprf_log_count - 1; \
@@ -66,16 +94,6 @@ static inline unsigned long long current_nsec(void)
 #define sdp_prf(sk, s, format, arg...)
 #endif
 
-#if 0
-#if 1
-#define sdp_prf_rx(sk, s, format, arg...) sdp_prf(sk, s, format, ## arg)
-#define sdp_prf_tx(sk, s, format, arg...)
-#else
-#define sdp_prf_rx(sk, s, format, arg...)
-#define sdp_prf_tx(sk, s, format, arg...) sdp_prf(sk, s, format, ## arg)
-#endif
-#endif
-
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG
 extern int sdp_debug_level;
 
@@ -125,7 +143,6 @@ extern int sdp_data_debug_level;
        } while (0)
 #else
 #define sdp_dbg_data(priv, format, arg...)
-//     do { (void) (priv); } while (0)
 #define SDP_DUMP_PACKET(sk, str, skb, h)
 #endif
 
@@ -300,23 +317,36 @@ struct sdp_buf {
         u64             mapping[SDP_MAX_SEND_SKB_FRAGS + 1];
 };
 
+#define ring_head(ring)   (atomic_read(&(ring).head))
+#define ring_tail(ring)   (atomic_read(&(ring).tail))
+#define ring_posted(ring) (ring_head(ring) - ring_tail(ring))
 
 struct sdp_tx_ring {
        struct sdp_buf   *buffer;
-       unsigned          head;
-       unsigned          tail;
+       atomic_t          head;
+       atomic_t          tail;
+       struct ib_cq     *cq;
 
        int               una_seq;
-       unsigned          credits;
+       atomic_t          credits;
+#define tx_credits(ssk) (atomic_read(&ssk->tx_ring.credits))
 
        struct timer_list timer;
        u16               poll_cnt;
+};
+
+struct sdp_rx_ring {
+       struct sdp_buf   *buffer;
+       atomic_t          head;
+       atomic_t          tail;
        struct ib_cq     *cq;
+
+       spinlock_t       lock;
 };
 
 static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
 {
-       return SDP_TX_SIZE - (tx_ring->head - tx_ring->tail);
+       return SDP_TX_SIZE - ring_posted(*tx_ring);
 }
 
 struct sdp_chrecvbuf {
@@ -329,6 +359,7 @@ struct sdp_sock {
        struct list_head sock_list;
        struct list_head accept_queue;
        struct list_head backlog_queue;
+       struct sk_buff_head rx_backlog;
        struct sock *parent;
 
        struct work_struct rx_comp_work;
@@ -362,32 +393,27 @@ struct sdp_sock {
        int sdp_disconnect;
        int destruct_in_process;
 
-       
+       struct sdp_rx_ring rx_ring;
+       struct sdp_tx_ring tx_ring;
 
        /* Data below will be reset on error */
        struct rdma_cm_id *id;
        struct ib_device *ib_device;
 
        /* SDP specific */
-       struct ib_recv_wr rx_wr;
-       unsigned rx_head;
-       unsigned rx_tail;
-       unsigned mseq_ack;
+       atomic_t mseq_ack;
+#define mseq_ack(ssk) (atomic_read(&ssk->mseq_ack))
        unsigned max_bufs;      /* Initial buffers offered by other side */
        unsigned min_bufs;      /* Low water mark to wake senders */
 
-       int               remote_credits;
+       atomic_t               remote_credits;
+#define remote_credits(ssk) (atomic_read(&ssk->remote_credits))
        int               poll_cq;
 
        /* rdma specific */
        struct ib_qp *qp;
-       struct ib_cq *rx_cq;
        struct ib_mr *mr;
 
-       struct sdp_buf *rx_ring;
-       struct sdp_tx_ring tx_ring;
-       struct ib_send_wr tx_wr;
-
        /* SDP slow start */
        int rcvbuf_scale;       /* local recv buf scale for each socket */
        int sent_request_head;  /* mark the tx_head of the last send resize
@@ -402,8 +428,6 @@ struct sdp_sock {
 
        /* BZCOPY data */
        int   zcopy_thresh;
-
-       struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
 };
 
 /* Context used for synchronous zero copy bcopy (BZCOY) */
@@ -499,13 +523,6 @@ static inline void sdp_set_error(struct sock *sk, int err)
        sk->sk_error_report(sk);
 }
 
-static inline void sdp_arm_rx_cq(struct sock *sk)
-{
-       sdp_dbg_data(sk, "ib_req_notify_cq on RX cq\n");
-       
-       ib_req_notify_cq(sdp_sk(sk)->rx_cq, IB_CQ_NEXT_COMP);
-}
-
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
 void _dump_packet(const char *func, int line, struct sock *sk, char *str,
                struct sk_buff *skb, const struct sdp_bsdh *h);
@@ -524,28 +541,53 @@ void sdp_remove_sock(struct sdp_sock *ssk);
 void sdp_remove_large_sock(struct sdp_sock *ssk);
 void sdp_post_keepalive(struct sdp_sock *ssk);
 void sdp_start_keepalive_timer(struct sock *sk);
-void sdp_bzcopy_write_space(struct sdp_sock *ssk);
 int sdp_init_sock(struct sock *sk);
 int __init sdp_proc_init(void);
 void sdp_proc_unregister(void);
 
+/* sdp_tx.c */
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_tx_ring_destroy(struct sdp_sock *ssk);
 int sdp_xmit_poll(struct sdp_sock *ssk, int force);
-void sdp_tx_ring_purge(struct sdp_sock *ssk);
 void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
 void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonagle);
 #define sdp_post_sends(ssk, nonagle) _sdp_post_sends(__func__, __LINE__, ssk, nonagle)
-void sdp_process_tx_wc_work(struct work_struct *work);
-void sdp_poll_tx_cq(unsigned long data);
-void _sdp_poll_tx_cq(unsigned long data);
-void sdp_tx_irq(struct ib_cq *cq, void *cq_context);
 
+/* sdp_rx.c */
+void sdp_rx_ring_init(struct sdp_sock *ssk);
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_rx_ring_destroy(struct sdp_sock *ssk);
+int sdp_process_rx_q(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
 int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
-void sdp_rx_ring_purge(struct sdp_sock *ssk);
 void sdp_post_recvs(struct sdp_sock *ssk);
-void sdp_rx_comp_work(struct work_struct *work);
-int sdp_poll_rx_cq(struct sdp_sock *ssk);
-void sdp_rx_irq(struct ib_cq *cq, void *cq_context);
+
+static inline void sdp_arm_rx_cq(struct sock *sk)
+{
+       sdp_prf(sk, NULL, "Arming RX cq");
+       sdp_dbg_data(sk, "Arming RX cq\n");
+       
+       ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
+}
+
+/* utilities */
+static inline char *mid2str(int mid)
+{
+#define ENUM2STR(e) [e] = #e
+       static char *mid2str[] = {
+               ENUM2STR(SDP_MID_HELLO),
+               ENUM2STR(SDP_MID_HELLO_ACK),
+               ENUM2STR(SDP_MID_DISCONN),
+               ENUM2STR(SDP_MID_CHRCVBUF),
+               ENUM2STR(SDP_MID_CHRCVBUF_ACK),
+               ENUM2STR(SDP_MID_DATA),
+       };
+
+       if (mid >= ARRAY_SIZE(mid2str))
+               return NULL;
+
+       return mid2str[mid];
+}
 
 static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
 {
index 5d4441ae8ecc764d658d934830b08303ea4628a0..b98171ebc38cf3a417bb2f2107456c9d9a9e89ac 100644 (file)
@@ -45,18 +45,9 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
 {
        int len = 0;
        char buf[256];
-#define ENUM2STR(e) [e] = #e
-       static char *mid2str[] = {
-               ENUM2STR(SDP_MID_HELLO),
-               ENUM2STR(SDP_MID_HELLO_ACK),
-               ENUM2STR(SDP_MID_DISCONN),
-               ENUM2STR(SDP_MID_CHRCVBUF),
-               ENUM2STR(SDP_MID_CHRCVBUF_ACK),
-               ENUM2STR(SDP_MID_DATA),
-       };
-       len += snprintf(buf, 255-len, "skb: %p mid: %2x:%-20s flags: 0x%x bufs: %d "
+       len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x bufs: %d "
                "len: %d mseq: %d mseq_ack: %d",
-               skb, h->mid, mid2str[h->mid], h->flags,
+               str, skb, h->mid, mid2str(h->mid), h->flags,
                ntohs(h->bufs), ntohl(h->len),ntohl(h->mseq),
                ntohl(h->mseq_ack));
 
@@ -117,7 +108,7 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
        return (ssk->nonagle & TCP_NAGLE_OFF) ||
                skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
                skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal ||
-               (ssk->tx_ring.tail == ssk->tx_ring.head &&
+               (ring_tail(ssk->tx_ring) == ring_head(ssk->tx_ring) &&
                 !(ssk->nonagle & TCP_NAGLE_CORK)) ||
                (TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
 }
@@ -125,26 +116,14 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
 int sdp_post_credits(struct sdp_sock *ssk)
 {
        int post_count = 0;
-       struct sk_buff *skb;
 
        sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d "
                        "tx ring slots left: %d send_head: %p\n",
-               ssk->tx_ring.credits, ssk->remote_credits,
+               tx_credits(ssk), remote_credits(ssk),
                sdp_tx_ring_slots_left(&ssk->tx_ring),
                ssk->isk.sk.sk_send_head);
 
-       if (ssk->tx_ring.credits > SDP_MIN_TX_CREDITS &&
-              sdp_tx_ring_slots_left(&ssk->tx_ring) &&
-              (skb = ssk->isk.sk.sk_send_head) &&
-               sdp_nagle_off(ssk, skb)) {
-               update_send_head(&ssk->isk.sk, skb);
-               __skb_dequeue(&ssk->isk.sk.sk_write_queue);
-               sdp_post_send(ssk, skb, SDP_MID_DATA);
-               post_count++;
-               goto out;
-       }
-
-       if (likely(ssk->tx_ring.credits > 1) &&
+       if (likely(tx_credits(ssk) > 1) &&
            likely(sdp_tx_ring_slots_left(&ssk->tx_ring))) {
                struct sk_buff *skb;
                skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -156,7 +135,6 @@ int sdp_post_credits(struct sdp_sock *ssk)
                post_count++;
        }
 
-out:
        if (post_count)
                sdp_xmit_poll(ssk, 0);
        return post_count;
@@ -188,7 +166,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
                gfp_page = GFP_KERNEL;
 
        sdp_dbg_data(&ssk->isk.sk, "credits: %d tx ring slots left: %d send_head: %p\n",
-               ssk->tx_ring.credits, sdp_tx_ring_slots_left(&ssk->tx_ring),
+               tx_credits(ssk), sdp_tx_ring_slots_left(&ssk->tx_ring),
                ssk->isk.sk.sk_send_head);
 
        if (sdp_tx_ring_slots_left(&ssk->tx_ring) < SDP_TX_SIZE / 2) {
@@ -197,8 +175,8 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
        }
 
        if (ssk->recv_request &&
-           ssk->rx_tail >= ssk->recv_request_head &&
-           ssk->tx_ring.credits >= SDP_MIN_TX_CREDITS &&
+           ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
+           tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
            sdp_tx_ring_slots_left(&ssk->tx_ring)) {
                struct sdp_chrecvbuf *resp_size;
                ssk->recv_request = 0;
@@ -214,7 +192,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
                post_count++;
        }
 
-       if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS &&
+       if (tx_credits(ssk) <= SDP_MIN_TX_CREDITS &&
               sdp_tx_ring_slots_left(&ssk->tx_ring) &&
               (skb = ssk->isk.sk.sk_send_head) &&
                sdp_nagle_off(ssk, skb)) {
@@ -222,7 +200,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
                sdp_prf(&ssk->isk.sk, skb, "no credits. called from %s:%d", func, line);
        }
 
-       while (ssk->tx_ring.credits > SDP_MIN_TX_CREDITS &&
+       while (tx_credits(ssk) > SDP_MIN_TX_CREDITS &&
               sdp_tx_ring_slots_left(&ssk->tx_ring) &&
               (skb = ssk->isk.sk.sk_send_head) &&
                sdp_nagle_off(ssk, skb)) {
@@ -232,9 +210,9 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
                post_count++;
        }
 
-       if (0 && ssk->tx_ring.credits == SDP_MIN_TX_CREDITS &&
+       if (0 && tx_credits(ssk) == SDP_MIN_TX_CREDITS &&
            !ssk->sent_request &&
-           ssk->tx_ring.head > ssk->sent_request_head + SDP_RESIZE_WAIT &&
+           ring_head(ssk->tx_ring) > ssk->sent_request_head + SDP_RESIZE_WAIT &&
            sdp_tx_ring_slots_left(&ssk->tx_ring)) {
                struct sdp_chrecvbuf *req_size;
                skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -244,19 +222,19 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
                /* FIXME */
                BUG_ON(!skb);
                ssk->sent_request = SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
-               ssk->sent_request_head = ssk->tx_ring.head;
+               ssk->sent_request_head = ring_head(ssk->tx_ring);
                req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size);
                req_size->size = htonl(ssk->sent_request);
                sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF);
                post_count++;
        }
 
-       c = ssk->remote_credits;
+       c = remote_credits(ssk);
        if (likely(c > SDP_MIN_TX_CREDITS))
                c *= 2;
 
-       if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
-           likely(ssk->tx_ring.credits > 1) &&
+       if (unlikely(c < ring_posted(ssk->rx_ring)) &&
+           likely(tx_credits(ssk) > 1) &&
            likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
            likely((1 << ssk->isk.sk.sk_state) &
                    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
@@ -276,8 +254,8 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
         * messages that provide additional credits and also do not contain ULP
         * payload. */
        if (unlikely(ssk->sdp_disconnect) &&
-               !ssk->isk.sk.sk_send_head &&
-               ssk->tx_ring.credits > (ssk->remote_credits >= ssk->rx_head - ssk->rx_tail)) {
+                       !ssk->isk.sk.sk_send_head &&
+                       tx_credits(ssk) > 1) {
                ssk->sdp_disconnect = 0;
                skb = sdp_stream_alloc_skb(&ssk->isk.sk,
                                          sizeof(struct sdp_bsdh),
@@ -287,14 +265,10 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
                sdp_post_send(ssk, skb, SDP_MID_DISCONN);
                post_count++;
        }
+
        if (post_count) {
                sdp_xmit_poll(ssk, 0);
 
-               sdp_prf(&ssk->isk.sk, NULL, "finshed polling from post_sends");
+               sdp_prf(&ssk->isk.sk, NULL, "post_sends finished polling [%s:%d].", func, line);
        }
 }
-
-static inline int sdp_tx_qp_empty(struct sdp_sock *ssk)
-{
-       return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0;
-}
index e0d0b20e8c31fcffd703fa1092b1eaa9ba2c059b..a9dcf777509c440ea380e48a90b6270e97d646cd 100644 (file)
@@ -53,10 +53,6 @@ enum {
        SDP_HAH_SIZE = 180,
 };
 
-static void sdp_cq_event_handler(struct ib_event *event, void *data)
-{
-}
-
 static void sdp_qp_event_handler(struct ib_event *event, void *data)
 {
 }
@@ -73,36 +69,12 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
                .qp_type = IB_QPT_RC,
        };
        struct ib_device *device = id->device;
-       struct ib_cq *rx_cq, *tx_cq;
        struct ib_mr *mr;
        struct ib_pd *pd;
        int rc;
 
        sdp_dbg(sk, "%s\n", __func__);
 
-       sdp_sk(sk)->tx_ring.head = 1;
-       sdp_sk(sk)->tx_ring.tail = 1;
-       sdp_sk(sk)->rx_head = 1;
-       sdp_sk(sk)->rx_tail = 1;
-
-       sdp_sk(sk)->tx_ring.buffer = kmalloc(sizeof(*sdp_sk(sk)->tx_ring.buffer) *
-                       (SDP_TX_SIZE + 1), GFP_KERNEL);
-       if (!sdp_sk(sk)->tx_ring.buffer) {
-               rc = -ENOMEM;
-               sdp_warn(sk, "Unable to allocate TX Ring size %zd.\n",
-                        sizeof *sdp_sk(sk)->tx_ring.buffer * (SDP_TX_SIZE + 1));
-               goto err_tx;
-       }
-
-       sdp_sk(sk)->rx_ring = kmalloc(sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE,
-                                     GFP_KERNEL);
-       if (!sdp_sk(sk)->rx_ring) {
-               rc = -ENOMEM;
-               sdp_warn(sk, "Unable to allocate RX Ring size %zd.\n",
-                        sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE);
-               goto err_rx;
-       }
-
        pd = ib_alloc_pd(device);
        if (IS_ERR(pd)) {
                rc = PTR_ERR(pd);
@@ -118,43 +90,15 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
         }
 
        sdp_sk(sk)->mr = mr;
-       INIT_WORK(&sdp_sk(sk)->rx_comp_work, sdp_rx_comp_work);
-
-       rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_cq_event_handler,
-                         sk, SDP_RX_SIZE, 0);
-
-       if (IS_ERR(rx_cq)) {
-               rc = PTR_ERR(rx_cq);
-               sdp_warn(sk, "Unable to allocate RX CQ: %d.\n", rc);
-               goto err_rx_cq;
-       }
 
-       rc = ib_modify_cq(rx_cq, 10, 200);
-       if (rc) {
-               sdp_warn(sk, "Unable to modify RX CQ: %d.\n", rc);
-               goto err_tx_cq;
-       }
-       sdp_warn(sk, "Initialized CQ moderation\n");
-       sdp_sk(sk)->rx_cq = rx_cq;
-       sdp_arm_rx_cq(sk);
-       qp_init_attr.recv_cq = rx_cq;
-
-       tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_cq_event_handler,
-                         sk, SDP_TX_SIZE, 0);
-
-       if (IS_ERR(tx_cq)) {
-               rc = PTR_ERR(tx_cq);
-               sdp_warn(sk, "Unable to allocate TX CQ: %d.\n", rc);
-               goto err_tx_cq;
-       }
+       if ((rc = sdp_rx_ring_create(sdp_sk(sk), device)))
+               goto err_rx;
 
-       init_timer(&sdp_sk(sk)->tx_ring.timer);
-       sdp_sk(sk)->tx_ring.timer.function = _sdp_poll_tx_cq;
-       sdp_sk(sk)->tx_ring.timer.data = (unsigned long) sdp_sk(sk);
-       sdp_sk(sk)->tx_ring.poll_cnt = 0;
+       if ((rc = sdp_tx_ring_create(sdp_sk(sk), device)))
+               goto err_tx;
 
-       sdp_sk(sk)->tx_ring.cq = tx_cq;
-        qp_init_attr.send_cq = tx_cq;
+       qp_init_attr.recv_cq = sdp_sk(sk)->rx_ring.cq;
+       qp_init_attr.send_cq = sdp_sk(sk)->tx_ring.cq;
 
        rc = rdma_create_qp(id, pd, &qp_init_attr);
        if (rc) {
@@ -170,20 +114,14 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
        return 0;
 
 err_qp:
-       ib_destroy_cq(tx_cq);
-err_tx_cq:
-       ib_destroy_cq(rx_cq);
-err_rx_cq:
+       sdp_tx_ring_destroy(sdp_sk(sk));
+err_tx:
+       sdp_rx_ring_destroy(sdp_sk(sk));
+err_rx:
        ib_dereg_mr(sdp_sk(sk)->mr);
 err_mr:
        ib_dealloc_pd(pd);
 err_pd:
-       kfree(sdp_sk(sk)->rx_ring);
-       sdp_sk(sk)->rx_ring = NULL;
-err_rx:
-       kfree(sdp_sk(sk)->tx_ring.buffer);
-       sdp_sk(sk)->tx_ring.buffer = NULL;
-err_tx:
        return rc;
 }
 
@@ -225,8 +163,10 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
 
        sdp_add_sock(sdp_sk(child));
 
-       sdp_sk(child)->max_bufs = sdp_sk(child)->tx_ring.credits = ntohs(h->bsdh.bufs);
-       sdp_sk(child)->min_bufs = sdp_sk(child)->tx_ring.credits / 4;
+       sdp_sk(child)->max_bufs = ntohs(h->bsdh.bufs);
+       atomic_set(&sdp_sk(child)->tx_ring.credits, sdp_sk(child)->max_bufs);
+
+       sdp_sk(child)->min_bufs = tx_credits(sdp_sk(child)) / 4;
        sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
                sizeof(struct sdp_bsdh);
        sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
@@ -236,7 +176,7 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
        sdp_dbg(child, "%s recv_frags: %d tx credits %d xmit_size_goal %d send trigger %d\n",
                __func__,
                sdp_sk(child)->recv_frags,
-               sdp_sk(child)->tx_ring.credits,
+               tx_credits(sdp_sk(child)),
                sdp_sk(child)->xmit_size_goal,
                sdp_sk(child)->min_bufs);
 
@@ -272,8 +212,9 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
 
        h = event->param.conn.private_data;
        SDP_DUMP_PACKET(sk, "RX", NULL, &h->bsdh);
-       sdp_sk(sk)->max_bufs = sdp_sk(sk)->tx_ring.credits = ntohs(h->bsdh.bufs);
-       sdp_sk(sk)->min_bufs = sdp_sk(sk)->tx_ring.credits / 4;
+       sdp_sk(sk)->max_bufs = ntohs(h->bsdh.bufs);
+       atomic_set(&sdp_sk(sk)->tx_ring.credits, sdp_sk(sk)->max_bufs);
+       sdp_sk(sk)->min_bufs = tx_credits(sdp_sk(sk)) / 4;
        sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
                sizeof(struct sdp_bsdh);
        sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
@@ -282,14 +223,12 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
                sdp_sk(sk)->send_frags * PAGE_SIZE);
 
        sdp_dbg(sk, "tx credits %d xmit_size_goal %d send_frags: %d credits update trigger %d\n",
-               sdp_sk(sk)->tx_ring.credits,
+               tx_credits(sdp_sk(sk)),
                sdp_sk(sk)->xmit_size_goal,
                sdp_sk(sk)->send_frags,
                sdp_sk(sk)->min_bufs);
 
        sdp_sk(sk)->poll_cq = 1;
-       sdp_arm_rx_cq(sk);
-       sdp_poll_rx_cq(sdp_sk(sk));
 
        sk->sk_state_change(sk);
        sk_wake_async(sk, 0, POLL_OUT);
@@ -349,8 +288,7 @@ static int sdp_disconnected_handler(struct sock *sk)
 
        sdp_dbg(sk, "%s\n", __func__);
 
-       if (ssk->rx_cq)
-               sdp_poll_rx_cq(ssk);
+       sdp_process_rx_q(ssk);
 
        if (ssk->tx_ring.cq)
                sdp_xmit_poll(ssk, 1);
@@ -400,7 +338,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                rc = rdma_resolve_route(id, SDP_ROUTE_TIMEOUT);
                break;
        case RDMA_CM_EVENT_ADDR_ERROR:
-               sdp_dbg(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
+               sdp_warn(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
                rc = -ENETUNREACH;
                break;
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
@@ -408,11 +346,10 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                rc = sdp_init_qp(sk, id);
                if (rc)
                        break;
-               sdp_sk(sk)->remote_credits = sdp_sk(sk)->rx_head -
-                       sdp_sk(sk)->rx_tail;
+               atomic_set(&sdp_sk(sk)->remote_credits, ring_posted(sdp_sk(sk)->rx_ring));
                memset(&hh, 0, sizeof hh);
                hh.bsdh.mid = SDP_MID_HELLO;
-               hh.bsdh.bufs = htons(sdp_sk(sk)->remote_credits);
+               hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk)));
                hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
                hh.max_adverts = 1;
                hh.majv_minv = SDP_MAJV_MINV;
@@ -443,11 +380,10 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
                        break;
                }
                child = id->context;
-               sdp_sk(child)->remote_credits = sdp_sk(child)->rx_head -
-                       sdp_sk(child)->rx_tail;
+               atomic_set(&sdp_sk(child)->remote_credits, ring_posted(sdp_sk(child)->rx_ring));
                memset(&hah, 0, sizeof hah);
                hah.bsdh.mid = SDP_MID_HELLO_ACK;
-               hah.bsdh.bufs = htons(sdp_sk(child)->remote_credits);
+               hah.bsdh.bufs = htons(remote_credits(sdp_sk(child)));
                hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE);
                hah.majv_minv = SDP_MAJV_MINV;
                hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
index b5322da653220666744bc7510897817436749976..c6b17dbcc6d895b1e44daaa3253d9676ef18bae4 100644 (file)
@@ -206,31 +206,24 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
 static void sdp_destroy_qp(struct sdp_sock *ssk)
 {
        struct ib_pd *pd = NULL;
-       struct ib_cq *rx_cq = NULL;
-       struct ib_cq *tx_cq = NULL;
+       unsigned long flags;
+
+
+       sdp_dbg(&ssk->isk.sk, "destroying qp\n");
 
        del_timer(&ssk->tx_ring.timer);
 
+       rx_ring_lock(ssk, flags);
+
+       sdp_rx_ring_destroy(ssk);
+       sdp_tx_ring_destroy(ssk);
+
        if (ssk->qp) {
                pd = ssk->qp->pd;
-               rx_cq = ssk->rx_cq;
-               ssk->rx_cq = NULL;
-               tx_cq = ssk->tx_ring.cq;
-               ssk->tx_ring.cq = NULL;
                ib_destroy_qp(ssk->qp);
                ssk->qp = NULL;
-
-               sdp_rx_ring_purge(ssk);
-               sdp_tx_ring_purge(ssk);
-       }
-
-       if (tx_cq) {
-               ib_destroy_cq(tx_cq);
        }
 
-       if (rx_cq)
-               ib_destroy_cq(rx_cq);
-
        if (ssk->mr) {
                ib_dereg_mr(ssk->mr);
                ssk->mr = NULL;
@@ -241,14 +234,8 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
 
        sdp_remove_large_sock(ssk);
 
-       if (ssk->rx_ring) {
-               kfree(ssk->rx_ring);
-               ssk->rx_ring = NULL;
-       }
-       if (ssk->tx_ring.buffer) {
-               kfree(ssk->tx_ring.buffer);
-               ssk->tx_ring.buffer = NULL;
-       }
+       rx_ring_unlock(ssk, flags);
+
 }
 
 static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
@@ -257,8 +244,8 @@ static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
 
        sdp_dbg(sk, "%s\n", __func__);
 
-       ssk->keepalive_tx_head = ssk->tx_ring.head;
-       ssk->keepalive_rx_head = ssk->rx_head;
+       ssk->keepalive_tx_head = ring_head(ssk->tx_ring);
+       ssk->keepalive_rx_head = ring_head(ssk->rx_ring);
 
        sk_reset_timer(sk, &sk->sk_timer, jiffies + len);
 }
@@ -293,8 +280,8 @@ static void sdp_keepalive_timer(unsigned long data)
            sk->sk_state == TCP_CLOSE)
                goto out;
 
-       if (ssk->keepalive_tx_head == ssk->tx_ring.head &&
-           ssk->keepalive_rx_head == ssk->rx_head)
+       if (ssk->keepalive_tx_head == ring_head(ssk->tx_ring) &&
+           ssk->keepalive_rx_head == ring_head(ssk->rx_ring))
                sdp_post_keepalive(ssk);
 
        sdp_reset_keepalive_timer(sk, sdp_keepalive_time_when(ssk));
@@ -338,17 +325,21 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
        read_lock(&device_removal_lock);
 
-       if (ssk->rx_cq)
-               sdp_poll_rx_cq(ssk);
+       sdp_process_rx_q(ssk);
 
        if (ssk->tx_ring.cq)
                sdp_xmit_poll(ssk, 1);
 
-       if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk))
+       if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
+               sdp_warn(sk, "setting state to error. shutdown: %d, mem_free: %d\n",
+                               !(sk->sk_shutdown & RCV_SHUTDOWN),
+                               !sk_stream_memory_free(sk));
                sdp_set_error(sk, rc);
+       }
 
        sdp_destroy_qp(ssk);
 
+       sdp_dbg(sk, "memset on sdp_sock\n");
        memset((void *)&ssk->id, 0, sizeof(*ssk) - offsetof(typeof(*ssk), id));
 
        sk->sk_state_change(sk);
@@ -488,6 +479,7 @@ static void sdp_close(struct sock *sk, long timeout)
        lock_sock(sk);
 
        sdp_dbg(sk, "%s\n", __func__);
+       sdp_prf(sk, NULL, __func__);
 
        sdp_delete_keepalive_timer(sk);
 
@@ -773,10 +765,9 @@ out:
        release_sock(sk);
        if (newsk) {
                lock_sock(newsk);
-               if (newssk->rx_cq) {
+               if (newssk->rx_ring.cq) {
                        newssk->poll_cq = 1;
                        sdp_arm_rx_cq(&newssk->isk.sk);
-                       sdp_poll_rx_cq(newssk);
                }
                release_sock(newsk);
        }
@@ -934,7 +925,11 @@ int sdp_init_sock(struct sock *sk)
 
        sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
 
-       ssk->rx_ring = NULL;
+       skb_queue_head_init(&ssk->rx_backlog);
+
+       atomic_set(&ssk->mseq_ack, 0);
+
+       sdp_rx_ring_init(ssk);
        ssk->tx_ring.buffer = NULL;
        ssk->sdp_disconnect = 0;
        ssk->destructed_already = 0;
@@ -1142,14 +1137,13 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
 static inline int poll_recv_cq(struct sock *sk)
 {
        int i;
-       if (sdp_sk(sk)->rx_cq) {
-               for (i = 0; i < recv_poll; ++i)
-                       if (!sdp_poll_rx_cq(sdp_sk(sk))) {
-                               ++recv_poll_hit;
-                               return 0;
-                       }
-               ++recv_poll_miss;
+       for (i = 0; i < recv_poll; ++i) {
+               if (!sdp_process_rx_q(sdp_sk(sk))) {
+                       ++recv_poll_hit;
+                       return 0;
+               }
        }
+       ++recv_poll_miss;
        return 1;
 }
 
@@ -1551,8 +1545,8 @@ static inline int slots_free(struct sdp_sock *ssk)
 {
        int min_free;
 
-       min_free = MIN(ssk->tx_ring.credits,
-                       SDP_TX_SIZE - (ssk->tx_ring.head - ssk->tx_ring.tail));
+       min_free = MIN(tx_credits(ssk),
+                       SDP_TX_SIZE - ring_posted(ssk->tx_ring));
        if (min_free < SDP_MIN_TX_CREDITS)
                return 0;
 
@@ -1608,6 +1602,9 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                sk->sk_write_pending++;
+               sdp_prf(sk, NULL, "credits: %d, head: %d, tail: %d, busy: %d",
+                               tx_credits(ssk), ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring),
+                               bz->busy);
                sk_wait_event(sk, &current_timeo,
                        sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
                sk->sk_write_pending--;
@@ -1627,24 +1624,6 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
        return err;
 }
 
-/* like sk_stream_write_space - execpt measures remote credits */
-void sdp_bzcopy_write_space(struct sdp_sock *ssk)
-{
-       struct sock *sk = &ssk->isk.sk;
-       struct socket *sock = sk->sk_socket;
-
-       if (ssk->tx_ring.credits >= ssk->min_bufs &&
-           ssk->tx_ring.head == ssk->tx_ring.tail &&
-          sock != NULL) {
-               clear_bit(SOCK_NOSPACE, &sock->flags);
-
-               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-                       wake_up_interruptible(sk->sk_sleep);
-               if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
-                       sock_wake_async(sock, 2, POLL_OUT);
-       }
-}
-
 /* Like tcp_sendmsg */
 /* TODO: check locking */
 static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -1735,7 +1714,7 @@ new_segment:
                                if (!skb)
                                        goto wait_for_memory;
 
-                               sdp_prf(sk, skb, "Created");
+//                             sdp_prf(sk, skb, "Created");
 
                                BZCOPY_STATE(skb) = bz;
 
@@ -1751,7 +1730,7 @@ new_segment:
                                skb_entail(sk, ssk, skb);
                                copy = size_goal;
                        } else {
-                               sdp_prf(sk, skb, "adding %d bytes", copy);
+//                             sdp_prf(sk, skb, "adding %d bytes", copy);
                                sdp_dbg_data(sk, "adding to existing skb: %p"
                                        " len = %d, sk_send_head: %p copy: %d\n",
                                        skb, skb->len, sk->sk_send_head, copy);
@@ -1770,10 +1749,8 @@ new_segment:
                                goto new_segment;
                        }
 
-//                     sdp_prf(sk, skb, "before memcpy %d bytes", copy);
                        copy = (bz) ? sdp_bzcopy_get(sk, skb, from, copy, bz) :
                                      sdp_bcopy_get(sk, skb, from, copy);
-//                     sdp_prf(sk, skb, "after memcpy. result: %d", copy);
                        if (unlikely(copy < 0)) {
                                if (!++copy)
                                        goto wait_for_memory;
@@ -1863,6 +1840,20 @@ out_err:
        return err;
 }
 
+int dummy_memcpy_toiovec(struct iovec *iov, int len)
+{
+       while (len > 0) {
+               if (iov->iov_len) {
+                       int copy = min_t(unsigned int, iov->iov_len, len);
+                       len -= copy;
+                       iov->iov_len -= copy;
+                       iov->iov_base += copy;
+               }
+               iov++;
+       }
+
+       return 0;
+}
 /* Like tcp_recvmsg */
 /* Maybe use skb_recv_datagram here? */
 /* Note this does not seem to handle vectored messages. Relevant? */
@@ -1884,7 +1875,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        lock_sock(sk);
        sdp_dbg_data(sk, "%s\n", __func__);
 
-       sdp_prf(sk, skb, "Read from user");
+//     sdp_prf(sk, skb, "Read from user");
 
        err = -ENOTCONN;
        if (sk->sk_state == TCP_LISTEN)
@@ -2024,6 +2015,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                        err = skb_copy_datagram_iovec(skb, offset,
                                                      /* TODO: skip header? */
                                                      msg->msg_iov, used);
+//                     err = dummy_memcpy_toiovec(msg->msg_iov, used);
                        sdp_prf(sk, skb, "Copied to user %ld bytes. err = %d", used, err);
                        if (err) {
                                sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
index 3778c9a971af9c2e5a5093778a3cb43f0e7f9c29..537b3a626553cdbed70bba0d8152cfdfed9b204c 100644 (file)
@@ -236,15 +236,6 @@ static void sdpstats_seq_hist(struct seq_file *seq, char *str, u32 *h, int n, in
 
 static int sdpstats_seq_show(struct seq_file *seq, void *v)
 {
-#define ENUM2STR(e) [e] = #e
-       static char *mid2str[] = {
-               ENUM2STR(SDP_MID_HELLO),
-               ENUM2STR(SDP_MID_HELLO_ACK),
-               ENUM2STR(SDP_MID_DISCONN),
-               ENUM2STR(SDP_MID_CHRCVBUF),
-               ENUM2STR(SDP_MID_CHRCVBUF_ACK),
-               ENUM2STR(SDP_MID_DATA),
-       };
        int i;
 
        seq_printf(seq, "SDP statistics:\n");
@@ -268,9 +259,9 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
        seq_printf(seq, "memcpy_count       \t\t: %u\n", sdpstats.memcpy_count);
 
        for (i = 0; i < ARRAY_SIZE(sdpstats.post_send); i++) {
-               if (mid2str[i]) {
+               if (mid2str(i)) {
                        seq_printf(seq, "post_send %-20s\t: %d\n",
-                                       mid2str[i], sdpstats.post_send[i]);
+                                       mid2str(i), sdpstats.post_send[i]);
                }
        }
 
index ba8e0fefb2fce41a7d60795e2510457609768aec..810dcbbcdeadc8478d5a075a4a26a84a836f1605 100644 (file)
@@ -39,7 +39,7 @@
 
 static int rcvbuf_scale = 0x10;
 
-int rcvbuf_initial_size = SDP_HEAD_SIZE;
+int rcvbuf_initial_size = 32 * 1024;
 module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644);
 MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes.");
 
@@ -149,22 +149,28 @@ static void sdp_fin(struct sock *sk)
        }
 }
 
-
-static void sdp_post_recv(struct sdp_sock *ssk)
+/* lock_sock must be taken before calling this - since rx_ring.head is not 
+ * protected (although being atomic
+ */
+static int sdp_post_recv(struct sdp_sock *ssk)
 {
        struct sdp_buf *rx_req;
        int i, rc, frags;
        u64 addr;
        struct ib_device *dev;
-       struct ib_sge *sge;
+       struct ib_recv_wr rx_wr = { 0 };
+       struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
+       struct ib_sge *sge = ibsge;
        struct ib_recv_wr *bad_wr;
        struct sk_buff *skb;
        struct page *page;
        skb_frag_t *frag;
        struct sdp_bsdh *h;
-       int id = ssk->rx_head;
+       int id = ring_head(ssk->rx_ring);
        gfp_t gfp_page;
+       int ret = 0;
 
+       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
        /* Now, allocate and repost recv */
        /* TODO: allocate from cache */
 
@@ -194,10 +200,9 @@ static void sdp_post_recv(struct sdp_sock *ssk)
                skb->truesize += frag->size;
        }
 
-        rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1));
+        rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
        rx_req->skb = skb;
        dev = ssk->ib_device;
-       sge = ssk->ibsge;
        addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE);
        BUG_ON(ib_dma_mapping_error(dev, addr));
 
@@ -221,27 +226,32 @@ static void sdp_post_recv(struct sdp_sock *ssk)
                sge->lkey = ssk->mr->lkey;
        }
 
-       ssk->rx_wr.next = NULL;
-       ssk->rx_wr.wr_id = id | SDP_OP_RECV;
-       ssk->rx_wr.sg_list = ssk->ibsge;
-       ssk->rx_wr.num_sge = frags + 1;
-       rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr);
-       sdp_prf(&ssk->isk.sk, skb, "rx skb was posted");
+       rx_wr.next = NULL;
+       rx_wr.wr_id = id | SDP_OP_RECV;
+       rx_wr.sg_list = ibsge;
+       rx_wr.num_sge = frags + 1;
+       rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
        SDPSTATS_COUNTER_INC(post_recv);
-       ++ssk->rx_head;
+       atomic_inc(&ssk->rx_ring.head);
        if (unlikely(rc)) {
-               sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
+               sdp_warn(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
                sdp_reset(&ssk->isk.sk);
+               ret = -1;
        }
 
        atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+
+       return ret;
 }
 
-void sdp_post_recvs(struct sdp_sock *ssk)
+/* lock_sock must be taken before calling this */
+static void _sdp_post_recvs(struct sdp_sock *ssk)
 {
        struct sock *sk = &ssk->isk.sk;
        int scale = ssk->rcvbuf_scale;
 
+       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
        if (unlikely(!ssk->id || ((1 << sk->sk_state) & 
                (TCPF_CLOSE | TCPF_TIME_WAIT)))) {
                return;
@@ -251,12 +261,23 @@ void sdp_post_recvs(struct sdp_sock *ssk)
            (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
                scale = 1;
 
-       while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
-               (ssk->rx_head - ssk->rx_tail - SDP_MIN_TX_CREDITS) *
+       while ((likely(ring_posted(ssk->rx_ring) < SDP_RX_SIZE) &&
+               (ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) *
                (SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
                ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
-              unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_TX_CREDITS))
-               sdp_post_recv(ssk);
+              unlikely(ring_posted(ssk->rx_ring) < SDP_MIN_TX_CREDITS)) {
+               if (sdp_post_recv(ssk))
+                       break;
+       }
+}
+
+void sdp_post_recvs(struct sdp_sock *ssk)
+{
+       unsigned long flags;
+
+       rx_ring_lock(ssk, flags);
+       _sdp_post_recvs(ssk);
+       rx_ring_unlock(ssk, flags);
 }
 
 static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
@@ -328,9 +349,9 @@ int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
 static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
 {
        if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
-               ssk->recv_request_head = ssk->rx_head + 1;
+               ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
        else
-               ssk->recv_request_head = ssk->rx_tail;
+               ssk->recv_request_head = ring_tail(ssk->rx_ring);
        ssk->recv_request = 1;
 }
 
@@ -347,23 +368,17 @@ static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *bu
                ssk->sent_request = 0;
 }
 
-static inline int credit_update_needed(struct sdp_sock *ssk, int wc_processed)
+static inline int credit_update_needed(struct sdp_sock *ssk)
 {
        int c;
 
-       c = ssk->remote_credits;
+       c = remote_credits(ssk);
        if (likely(c > SDP_MIN_TX_CREDITS))
                c += c/2;
 
-/*     sdp_warn(&ssk->isk.sk, "credits: %d remote credits: %d "
-                       "tx ring slots left: %d send_head: %p\n",
-               ssk->tx_ring.credits, ssk->remote_credits,
-               sdp_tx_ring_slots_left(&ssk->tx_ring),
-               ssk->isk.sk.sk_send_head);
-*/
-       return (unlikely(c < ssk->rx_head - ssk->rx_tail + wc_processed) &&
-           likely(ssk->tx_ring.credits > 1) &&
-           likely(sdp_tx_ring_slots_left(&ssk->tx_ring)));
+       return unlikely(c < ring_posted(ssk->rx_ring)) &&
+           likely(tx_credits(ssk) > 1) &&
+           likely(sdp_tx_ring_slots_left(&ssk->tx_ring));
 }
 
 
@@ -374,14 +389,16 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
        struct sk_buff *skb;
        int i, frags;
 
-       if (unlikely(id != ssk->rx_tail)) {
+       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
+       if (unlikely(id != ring_tail(ssk->rx_ring))) {
                printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
-                       id, ssk->rx_tail);
+                       id, ring_tail(ssk->rx_ring));
                return NULL;
        }
 
        dev = ssk->ib_device;
-        rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)];
+        rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
        skb = rx_req->skb;
        ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE,
                            DMA_FROM_DEVICE);
@@ -390,67 +407,20 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
                ib_dma_unmap_page(dev, rx_req->mapping[i + 1],
                                  skb_shinfo(skb)->frags[i].size,
                                  DMA_FROM_DEVICE);
-       ++ssk->rx_tail;
-       --ssk->remote_credits;
+       atomic_inc(&ssk->rx_ring.tail);
+       atomic_dec(&ssk->remote_credits);
        return skb;
 }
 
-static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+/* this must be called while sock_lock is taken */
+static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 {
        struct sock *sk = &ssk->isk.sk;
        int frags;
-       struct sk_buff *skb;
        struct sdp_bsdh *h;
        int pagesz, i;
 
-       skb = sdp_recv_completion(ssk, wc->wr_id);
-       if (unlikely(!skb))
-               return -1;
-
-       sdp_prf(sk, skb, "recv completion");    
-
-       atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-
-       if (unlikely(wc->status)) {
-               if (wc->status != IB_WC_WR_FLUSH_ERR) {
-                       sdp_dbg(sk, "Recv completion with error. Status %d\n",
-                               wc->status);
-                       sdp_reset(sk);
-               }
-               __kfree_skb(skb);
-               return 0;
-       }
-
-       sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
-                       (int)wc->wr_id, wc->byte_len);
-       if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
-               printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
-                               wc->byte_len, sizeof(struct sdp_bsdh));
-               __kfree_skb(skb);
-               return -1;
-       }
-       skb->len = wc->byte_len;
-       if (likely(wc->byte_len > SDP_HEAD_SIZE))
-               skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
-       else
-               skb->data_len = 0;
-       skb->data = skb->head;
-#ifdef NET_SKBUFF_DATA_USES_OFFSET
-       skb->tail = skb_headlen(skb);
-#else
-       skb->tail = skb->head + skb_headlen(skb);
-#endif
        h = (struct sdp_bsdh *)skb->data;
-       SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
-       skb_reset_transport_header(skb);
-       ssk->mseq_ack = ntohl(h->mseq);
-       if (ssk->mseq_ack != (int)wc->wr_id)
-               printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
-                               ssk->mseq_ack, (int)wc->wr_id);
-
-       SDPSTATS_HIST_LINEAR(credits_before_update, ssk->tx_ring.credits);
-       ssk->tx_ring.credits = ntohl(h->mseq_ack) - ssk->tx_ring.head + 1 +
-               ntohs(h->bufs);
 
        frags = skb_shinfo(skb)->nr_frags;
        pagesz = PAGE_ALIGN(skb->data_len);
@@ -513,13 +483,105 @@ static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
        return 0;
 }
 
-int sdp_poll_rx_cq(struct sdp_sock *ssk)
+/* called only from irq */
+static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
 {
-       struct ib_cq *cq = ssk->rx_cq;
+       struct sk_buff *skb;
+       struct sdp_bsdh *h;
+       struct sock *sk = &ssk->isk.sk;
+       int credits_before;
+       
+       skb = sdp_recv_completion(ssk, wc->wr_id);
+       if (unlikely(!skb))
+               return NULL;
+
+       atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+
+       if (unlikely(wc->status)) {
+               if (wc->status != IB_WC_WR_FLUSH_ERR) {
+                       sdp_warn(sk, "Recv completion with error. Status %d\n",
+                               wc->status);
+                       sdp_reset(sk);
+               }
+               __kfree_skb(skb);
+               return NULL;
+       }
+
+       sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
+                       (int)wc->wr_id, wc->byte_len);
+       if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
+               printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
+                               wc->byte_len, sizeof(struct sdp_bsdh));
+               __kfree_skb(skb);
+               return NULL;
+       }
+       skb->len = wc->byte_len;
+       if (likely(wc->byte_len > SDP_HEAD_SIZE))
+               skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
+       else
+               skb->data_len = 0;
+       skb->data = skb->head;
+#ifdef NET_SKBUFF_DATA_USES_OFFSET
+       skb->tail = skb_headlen(skb);
+#else
+       skb->tail = skb->head + skb_headlen(skb);
+#endif
+       h = (struct sdp_bsdh *)skb->data;
+       SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
+       skb_reset_transport_header(skb);
+       atomic_set(&ssk->mseq_ack, ntohl(h->mseq));
+       if (mseq_ack(ssk) != (int)wc->wr_id)
+               printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
+                               mseq_ack(ssk), (int)wc->wr_id);
+
+       SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
+
+       credits_before = tx_credits(ssk);
+       atomic_set(&ssk->tx_ring.credits, ntohl(h->mseq_ack) - ring_head(ssk->tx_ring) + 1 +
+               ntohs(h->bufs));
+
+       sdp_prf(&ssk->isk.sk, skb, "RX %s bufs=%d c before:%d after:%d "
+               "mseq:%d, ack:%d", mid2str(h->mid), ntohs(h->bufs), credits_before, 
+               tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
+
+       return skb;
+}
+
+/* like sk_stream_write_space - execpt measures remote credits */
+static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
+{
+       struct sock *sk = &ssk->isk.sk;
+       struct socket *sock = sk->sk_socket;
+
+       if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+               sdp_prf(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. tx_head: %d, tx_tail: %d",
+                               tx_credits(ssk), ssk->min_bufs,
+                               ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
+       }
+
+       if (tx_credits(ssk) >= ssk->min_bufs &&
+           ring_head(ssk->tx_ring) == ring_tail(ssk->tx_ring) &&
+          sock != NULL) {
+               clear_bit(SOCK_NOSPACE, &sock->flags);
+
+               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+                       wake_up_interruptible(sk->sk_sleep);
+               if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
+                       sock_wake_async(sock, 2, POLL_OUT);
+       }
+}
+
+/* only from interrupt.
+ * drain rx cq into rx_backlog queue */
+static int sdp_poll_rx_cq(struct sdp_sock *ssk)
+{
+       struct ib_cq *cq = ssk->rx_ring.cq;
        struct ib_wc ibwc[SDP_NUM_WC];
        int n, i;
-       int ret = -EAGAIN;
        int wc_processed = 0;
+       struct sk_buff *skb;
+
+       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
 
        do {
                n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
@@ -527,102 +589,224 @@ int sdp_poll_rx_cq(struct sdp_sock *ssk)
                        struct ib_wc *wc = &ibwc[i];
 
                        BUG_ON(!(wc->wr_id & SDP_OP_RECV));
-                       sdp_process_rx_wc(ssk, wc);
+                       skb = sdp_process_rx_wc(ssk, wc);
+                       if (!skb)
+                               continue;
+                       skb_queue_tail(&ssk->rx_backlog, skb);
                        wc_processed++;
-
-                       if (credit_update_needed(ssk, wc_processed)) {
-                               sdp_prf(&ssk->isk.sk, NULL, "credit update. remote_credits: %d, avail now: %d processed: %d",
-                                               ssk->remote_credits,
-                                               ssk->rx_head - ssk->rx_tail,
-                                               wc_processed);
-                               sdp_post_recvs(ssk);
-                               if (sdp_post_credits(ssk))
-                                       wc_processed = 0;
-                       }
-
-                       ret = 0;
                }
        } while (n == SDP_NUM_WC);
 
-       if (!ret) {
-               struct sock *sk = &ssk->isk.sk;
+       if (wc_processed)
+               sdp_bzcopy_write_space(ssk);
 
-               sdp_post_recvs(ssk);
+       return wc_processed;
+}
 
-               /* update credits */
-               sdp_post_sends(ssk, 0);
+int sdp_process_rx_q(struct sdp_sock *ssk)
+{
+       struct sk_buff *skb;
+       struct sock *sk = &ssk->isk.sk;
+       unsigned long flags;
 
-               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-                       sk_stream_write_space(&ssk->isk.sk);
-       } else {
+       if (!ssk->rx_backlog.next || !ssk->rx_backlog.prev) {
+               sdp_warn(&ssk->isk.sk, "polling a zeroed rx_backlog!!!! %p\n", &ssk->rx_backlog);
+               return 0;
+       }
+
+       if (skb_queue_empty(&ssk->rx_backlog)) {
                SDPSTATS_COUNTER_INC(rx_poll_miss);
+               return -EAGAIN;
        }
 
-       return ret;
+       /* update credits */
+       sdp_post_sends(ssk, 0);
+
+       spin_lock_irqsave(&ssk->rx_backlog.lock, flags);
+       while ((skb = __skb_dequeue(&ssk->rx_backlog))) {
+               sdp_process_rx_skb(ssk, skb);
+       }
+       spin_unlock_irqrestore(&ssk->rx_backlog.lock, flags);
+
+       if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+               sk_stream_write_space(&ssk->isk.sk);
+
+       return 0;
 }
 
-void sdp_rx_comp_work(struct work_struct *work)
+static void sdp_rx_comp_work(struct work_struct *work)
 {
        struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
        struct sock *sk = &ssk->isk.sk;
        struct ib_cq *rx_cq;
 
        lock_sock(sk);
-       rx_cq = ssk->rx_cq;
+       rx_cq = ssk->rx_ring.cq;
        if (unlikely(!rx_cq))
                goto out;
 
        if (unlikely(!ssk->poll_cq)) {
                struct rdma_cm_id *id = ssk->id;
+               sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
                if (id && id->qp)
                        rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
                goto out;
        }
 
-       sdp_poll_rx_cq(ssk);
+       sdp_process_rx_q(ssk);
        sdp_xmit_poll(ssk,  1); /* if has pending tx because run out of tx_credits - xmit it */
        release_sock(sk);
        sk_stream_mem_reclaim(sk);
        lock_sock(sk);
-       rx_cq = ssk->rx_cq;
+       rx_cq = ssk->rx_ring.cq;
        if (unlikely(!rx_cq))
                goto out;
        
-       sdp_arm_rx_cq(sk);
-       sdp_poll_rx_cq(ssk);
+       sdp_process_rx_q(ssk);
        sdp_xmit_poll(ssk,  1);
+
 out:
        release_sock(sk);
 }
 
-void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
+static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 {
        struct sock *sk = cq_context;
        struct sdp_sock *ssk = sdp_sk(sk);
+       unsigned long flags;
+       int wc_processed = 0;
 
-       WARN_ON(ssk->rx_cq && cq != ssk->rx_cq);
+       sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
 
-       if (!ssk->rx_cq)
-               sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+       WARN_ON(cq != ssk->rx_ring.cq);
 
        SDPSTATS_COUNTER_INC(rx_int_count);
 
-       sdp_prf(sk, NULL, "rx completion");
+       sdp_prf(sk, NULL, "rx irq");
+
+       rx_ring_lock(ssk, flags);
+
+       if (unlikely(!ssk->poll_cq))
+               sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
+
+       if (!ssk->rx_ring.cq) {
+               sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+
+               goto out;
+       }
+
+       wc_processed = sdp_poll_rx_cq(ssk);
+       sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
+
+       if (wc_processed) {
+               _sdp_post_recvs(ssk);
+
+               /* Best was to send credit update from here */
+/*             sdp_post_credits(ssk); */
 
-       /* issue sdp_rx_comp_work() */
-       queue_work(rx_comp_wq, &ssk->rx_comp_work);
+               /* issue sdp_rx_comp_work() */
+               queue_work(rx_comp_wq, &ssk->rx_comp_work);
+       }
+
+       sdp_arm_rx_cq(sk);
+
+out:
+       rx_ring_unlock(ssk, flags);
 }
 
-void sdp_rx_ring_purge(struct sdp_sock *ssk)
+static void sdp_rx_ring_purge(struct sdp_sock *ssk)
 {
-       struct sk_buff *skb;
+       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
 
-       while (ssk->rx_head != ssk->rx_tail) {
+       while (ring_posted(ssk->rx_ring) > 0) {
                struct sk_buff *skb;
-               skb = sdp_recv_completion(ssk, ssk->rx_tail);
+               skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
                if (!skb)
                        break;
                atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
                __kfree_skb(skb);
        }
 }
+
+void sdp_rx_ring_init(struct sdp_sock *ssk)
+{
+       ssk->rx_ring.buffer = NULL;
+       spin_lock_init(&ssk->rx_ring.lock);
+}
+
+static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+       struct ib_cq *rx_cq;
+       int rc = 0;
+       unsigned long flags;
+
+       rx_ring_lock(ssk, flags);
+
+       atomic_set(&ssk->rx_ring.head, 1);
+       atomic_set(&ssk->rx_ring.tail, 1);
+
+       ssk->rx_ring.buffer = kmalloc(sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE,
+                                     GFP_KERNEL);
+       if (!ssk->rx_ring.buffer) {
+               rc = -ENOMEM;
+               sdp_warn(&ssk->isk.sk, "Unable to allocate RX Ring size %zd.\n",
+                        sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
+
+               goto out;
+       }
+
+       rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
+                         &ssk->isk.sk, SDP_RX_SIZE, 0);
+
+       if (IS_ERR(rx_cq)) {
+               rc = PTR_ERR(rx_cq);
+               sdp_warn(&ssk->isk.sk, "Unable to allocate RX CQ: %d.\n", rc);
+               goto err_cq;
+       }
+
+       rc = ib_modify_cq(rx_cq, 10, 200);
+       if (rc) {
+               sdp_warn(&ssk->isk.sk, "Unable to modify RX CQ: %d.\n", rc);
+               goto err_mod;
+       }
+       sdp_warn(&ssk->isk.sk, "Initialized CQ moderation\n");
+       sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq;
+
+       INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
+
+       sdp_arm_rx_cq(&ssk->isk.sk);
+
+       goto out;
+
+err_mod:
+       ib_destroy_cq(rx_cq);
+err_cq:
+       kfree(ssk->rx_ring.buffer);
+       ssk->rx_ring.buffer = NULL;
+out:
+       rx_ring_unlock(ssk, flags);
+       return rc;
+}
+
+void sdp_rx_ring_destroy(struct sdp_sock *ssk)
+{
+       WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
+       if (ssk->rx_ring.buffer) {
+               sdp_rx_ring_purge(ssk);
+
+               kfree(ssk->rx_ring.buffer);
+               ssk->rx_ring.buffer = NULL;
+       }
+
+       if (ssk->rx_ring.cq) {
+               ib_destroy_cq(ssk->rx_ring.cq);
+               ssk->rx_ring.cq = NULL;
+       }
+
+       WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
+}
index 5e6a2dc28bf5ae71c659768b2c6ef1d16430789e..de9d7920b8398d21fd1930c26195a64fe27498a1 100644 (file)
@@ -55,8 +55,11 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force)
                mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
 
        /* Poll the CQ every SDP_TX_POLL_MODER packets */
-       if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
+       if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) {
                wc_processed = sdp_process_tx_cq(ssk);
+               sdp_prf(&ssk->isk.sk, NULL, "processed %d wc's. inflight=%d", wc_processed,
+                               ring_posted(ssk->tx_ring));
+       }
 
        return wc_processed;    
 }
@@ -65,24 +68,16 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 {
        struct sdp_buf *tx_req;
        struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
-       unsigned mseq = ssk->tx_ring.head;
+       unsigned mseq = ring_head(ssk->tx_ring);
        int i, rc, frags;
        u64 addr;
        struct ib_device *dev;
-       struct ib_sge *sge;
        struct ib_send_wr *bad_wr;
 
-#define ENUM2STR(e) [e] = #e
-       static char *mid2str[] = {
-               ENUM2STR(SDP_MID_HELLO),
-               ENUM2STR(SDP_MID_HELLO_ACK),
-               ENUM2STR(SDP_MID_DISCONN),
-               ENUM2STR(SDP_MID_CHRCVBUF),
-               ENUM2STR(SDP_MID_CHRCVBUF_ACK),
-               ENUM2STR(SDP_MID_DATA),
-       };
-       sdp_prf(&ssk->isk.sk, skb, "post_send mid = %s, bufs = %d",
-                       mid2str[mid], ssk->rx_head - ssk->rx_tail);
+       struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
+       struct ib_sge *sge = ibsge;
+       struct ib_send_wr tx_wr = { 0 };
+
 
        SDPSTATS_COUNTER_MID_INC(post_send, mid);
        SDPSTATS_HIST(send_size, skb->len);
@@ -93,16 +88,19 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
        else
                h->flags = 0;
 
-       h->bufs = htons(ssk->rx_head - ssk->rx_tail);
+       h->bufs = htons(ring_posted(ssk->rx_ring));
        h->len = htonl(skb->len);
        h->mseq = htonl(mseq);
-       h->mseq_ack = htonl(ssk->mseq_ack);
+       h->mseq_ack = htonl(mseq_ack(ssk));
+
+       sdp_prf(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%d ack:%d",
+                       mid2str(mid), ring_posted(ssk->rx_ring), mseq, ntohl(h->mseq_ack));
 
        SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
+
        tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
        tx_req->skb = skb;
        dev = ssk->ib_device;
-       sge = ssk->ibsge;
        addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len,
                                 DMA_TO_DEVICE);
        tx_req->mapping[0] = addr;
@@ -127,14 +125,14 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
                sge->lkey = ssk->mr->lkey;
        }
 
-       ssk->tx_wr.next = NULL;
-       ssk->tx_wr.wr_id = ssk->tx_ring.head | SDP_OP_SEND;
-       ssk->tx_wr.sg_list = ssk->ibsge;
-       ssk->tx_wr.num_sge = frags + 1;
-       ssk->tx_wr.opcode = IB_WR_SEND;
-       ssk->tx_wr.send_flags = IB_SEND_SIGNALED;
+       tx_wr.next = NULL;
+       tx_wr.wr_id = ring_head(ssk->tx_ring) | SDP_OP_SEND;
+       tx_wr.sg_list = ibsge;
+       tx_wr.num_sge = frags + 1;
+       tx_wr.opcode = IB_WR_SEND;
+       tx_wr.send_flags = IB_SEND_SIGNALED;
        if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_URG))
-               ssk->tx_wr.send_flags |= IB_SEND_SOLICITED;
+               tx_wr.send_flags |= IB_SEND_SOLICITED;
        
        {
                static unsigned long last_send = 0;
@@ -145,19 +143,15 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 
                last_send = jiffies;
        }
-       rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr);
-       ++ssk->tx_ring.head;
-       --ssk->tx_ring.credits;
-       ssk->remote_credits = ssk->rx_head - ssk->rx_tail;
+       rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
+       atomic_inc(&ssk->tx_ring.head);
+       atomic_dec(&ssk->tx_ring.credits);
+       atomic_set(&ssk->remote_credits, ring_posted(ssk->rx_ring));
        if (unlikely(rc)) {
                sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc);
                sdp_set_error(&ssk->isk.sk, -ECONNRESET);
                wake_up(&ssk->wq);
        }
-
-       if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS) {
-               sdp_poll_rx_cq(ssk);
-       }
 }
 
 static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
@@ -168,9 +162,9 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
        struct bzcopy_state *bz;
        int i, frags;
        struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
-       if (unlikely(mseq != tx_ring->tail)) {
+       if (unlikely(mseq != ring_tail(*tx_ring))) {
                printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
-                       mseq, tx_ring->tail);
+                       mseq, ring_tail(*tx_ring));
                goto out;
        }
 
@@ -193,7 +187,7 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
        if (bz)
                bz->busy--;
 
-       ++tx_ring->tail;
+       atomic_inc(&tx_ring->tail);
 
 out:
        return skb;
@@ -219,7 +213,11 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
                }
        }
 
-       sdp_prf(&ssk->isk.sk, skb, "tx completion");
+       {
+               struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
+               sdp_prf(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
+       }
+
        sk_stream_free_skb(&ssk->isk.sk, skb);
 
        return 0;
@@ -281,15 +279,14 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk)
        return wc_processed;    
 }
 
-void sdp_poll_tx_cq(unsigned long data)
+static void sdp_poll_tx_timeout(unsigned long data)
 {
        struct sdp_sock *ssk = (struct sdp_sock *)data;
        struct sock *sk = &ssk->isk.sk;
        u32 inflight, wc_processed;
 
-
        sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
-               (u32) ssk->tx_ring.head - ssk->tx_ring.tail);
+               (u32) ring_posted(ssk->tx_ring));
 
        /* Only process if the socket is not in use */
        bh_lock_sock(sk);
@@ -303,12 +300,14 @@ void sdp_poll_tx_cq(unsigned long data)
                goto out;
 
        wc_processed = sdp_process_tx_cq(ssk);
+       sdp_prf(&ssk->isk.sk, NULL, "processed %d wc's. inflight=%d", wc_processed,
+               ring_posted(ssk->tx_ring));
        if (!wc_processed)
                SDPSTATS_COUNTER_INC(tx_poll_miss);
        else
                SDPSTATS_COUNTER_INC(tx_poll_hit);
 
-       inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail;
+       inflight = (u32) ring_posted(ssk->tx_ring);
 
        /* If there are still packets in flight and the timer has not already
         * been scheduled by the Tx routine then schedule it here to guarantee
@@ -322,17 +321,7 @@ out:
        bh_unlock_sock(sk);
 }
 
-void _sdp_poll_tx_cq(unsigned long data)
-{
-       struct sdp_sock *ssk = (struct sdp_sock *)data;
-       struct sock *sk = &ssk->isk.sk;
-
-       sdp_prf(sk, NULL, "sdp poll tx timeout expired");
-
-       sdp_poll_tx_cq(data);
-}
-
-void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
+static void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
 {
        struct sock *sk = cq_context;
        struct sdp_sock *ssk = sdp_sk(sk);
@@ -344,11 +333,9 @@ void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
 
 void sdp_tx_ring_purge(struct sdp_sock *ssk)
 {
-       struct sk_buff *skb;
-
-       while (ssk->tx_ring.head != ssk->tx_ring.tail) {
+       while (ring_posted(ssk->tx_ring)) {
                struct sk_buff *skb;
-               skb = sdp_send_completion(ssk, ssk->tx_ring.tail);
+               skb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
                if (!skb)
                        break;
                __kfree_skb(skb);
@@ -380,3 +367,66 @@ void sdp_post_keepalive(struct sdp_sock *ssk)
        sdp_cnt(sdp_keepalive_probes_sent);
 }
 
+static void sdp_tx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+       struct ib_cq *tx_cq;
+       int rc = 0;
+
+       atomic_set(&ssk->tx_ring.head, 1);
+       atomic_set(&ssk->tx_ring.tail, 1);
+
+       ssk->tx_ring.buffer = kmalloc(sizeof *ssk->tx_ring.buffer * SDP_TX_SIZE,
+                                     GFP_KERNEL);
+       if (!ssk->tx_ring.buffer) {
+               rc = -ENOMEM;
+               sdp_warn(&ssk->isk.sk, "Unable to allocate TX Ring size %zd.\n",
+                        sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE);
+
+               goto out;
+       }
+
+       tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
+                         &ssk->isk.sk, SDP_TX_SIZE, 0);
+
+       if (IS_ERR(tx_cq)) {
+               rc = PTR_ERR(tx_cq);
+               sdp_warn(&ssk->isk.sk, "Unable to allocate TX CQ: %d.\n", rc);
+               goto err_cq;
+       }
+
+       sdp_sk(&ssk->isk.sk)->tx_ring.cq = tx_cq;
+
+       init_timer(&ssk->tx_ring.timer);
+       ssk->tx_ring.timer.function = sdp_poll_tx_timeout;
+       ssk->tx_ring.timer.data = (unsigned long) ssk;
+       ssk->tx_ring.poll_cnt = 0;
+
+       return 0;
+
+err_cq:
+       kfree(ssk->tx_ring.buffer);
+       ssk->tx_ring.buffer = NULL;
+out:
+       return rc;
+}
+
+void sdp_tx_ring_destroy(struct sdp_sock *ssk)
+{
+       if (ssk->tx_ring.buffer) {
+               sdp_tx_ring_purge(ssk);
+
+               kfree(ssk->tx_ring.buffer);
+               ssk->tx_ring.buffer = NULL;
+       }
+
+       if (ssk->tx_ring.cq) {
+               ib_destroy_cq(ssk->tx_ring.cq);
+               ssk->tx_ring.cq = NULL;
+       }
+
+       WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
+}