www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
sdp: created sdp_rx and sdp_tx
author    Amir Vadai <amirv@mellanox.co.il>    Sun, 26 Apr 2009 12:31:27 +0000 (15:31 +0300)
committer Mukesh Kacker <mukesh.kacker@oracle.com>    Tue, 6 Oct 2015 12:04:28 +0000 (05:04 -0700)
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/Makefile
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_rx.c [new file with mode: 0644]
drivers/infiniband/ulp/sdp/sdp_tx.c [new file with mode: 0644]

diff --git a/drivers/infiniband/ulp/sdp/Makefile b/drivers/infiniband/ulp/sdp/Makefile
index 5da4b7bbcd4b0d6ffdef8f86a95b276decfe7000..b14a16a4407a772c19d23d1b59aeb8aaca7455b1 100644 (file)
@@ -3,4 +3,4 @@ EXTRA_CFLAGS += -ggdb
 
 obj-$(CONFIG_INFINIBAND_SDP) += ib_sdp.o
 
-ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o
+ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o
diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 8da329fc96046591d4bc86b1716e33550cbc9183..5e782820099c7da9032d9f2488cd6a1d78f49f81 100644 (file)
@@ -222,6 +222,7 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define MIN(a, b) (a < b ? a : b)
 #endif
 
+extern struct workqueue_struct *sdp_wq;
 extern struct list_head sock_list;
 extern spinlock_t sock_list_lock;
 
@@ -318,6 +319,10 @@ static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
        return SDP_TX_SIZE - (tx_ring->head - tx_ring->tail);
 }
 
+struct sdp_chrecvbuf {
+       u32 size;
+};
+
 struct sdp_sock {
        /* sk has to be the first member of inet_sock */
        struct inet_sock isk;
@@ -509,36 +514,38 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
 int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
 void sdp_reset(struct sock *sk);
 void sdp_reset_sk(struct sock *sk, int rc);
-void sdp_rx_irq(struct ib_cq *cq, void *cq_context);
-void sdp_tx_irq(struct ib_cq *cq, void *cq_context);
-void sdp_poll_tx_cq(unsigned long data);
-void _sdp_poll_tx_cq(unsigned long data);
-void sdp_rx_comp_work(struct work_struct *work);
-void sdp_process_tx_wc_work(struct work_struct *work);
 int sdp_post_credits(struct sdp_sock *ssk);
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
-void sdp_post_recvs(struct sdp_sock *ssk);
-int sdp_poll_rx_cq(struct sdp_sock *ssk);
-void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonagle);
-#define sdp_post_sends(ssk, nonagle) _sdp_post_sends(__func__, __LINE__, ssk, nonagle)
 void sdp_destroy_work(struct work_struct *work);
 void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
 void sdp_dreq_wait_timeout_work(struct work_struct *work);
-struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id);
-struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq);
 void sdp_urg(struct sdp_sock *ssk, struct sk_buff *skb);
 void sdp_add_sock(struct sdp_sock *ssk);
 void sdp_remove_sock(struct sdp_sock *ssk);
 void sdp_remove_large_sock(struct sdp_sock *ssk);
-int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
-int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
 void sdp_post_keepalive(struct sdp_sock *ssk);
 void sdp_start_keepalive_timer(struct sock *sk);
 void sdp_bzcopy_write_space(struct sdp_sock *ssk);
 int sdp_init_sock(struct sock *sk);
 int __init sdp_proc_init(void);
 void sdp_proc_unregister(void);
+
 int sdp_xmit_poll(struct sdp_sock *ssk, int force);
+void sdp_tx_ring_purge(struct sdp_sock *ssk);
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
+void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonagle);
+#define sdp_post_sends(ssk, nonagle) _sdp_post_sends(__func__, __LINE__, ssk, nonagle)
+void sdp_process_tx_wc_work(struct work_struct *work);
+void sdp_poll_tx_cq(unsigned long data);
+void _sdp_poll_tx_cq(unsigned long data);
+void sdp_tx_irq(struct ib_cq *cq, void *cq_context);
+
+int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
+int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
+void sdp_rx_ring_purge(struct sdp_sock *ssk);
+void sdp_post_recvs(struct sdp_sock *ssk);
+void sdp_rx_comp_work(struct work_struct *work);
+int sdp_poll_rx_cq(struct sdp_sock *ssk);
+void sdp_rx_irq(struct ib_cq *cq, void *cq_context);
 
 static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
 {
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 6d432378aeb4dd13cb1bb7215196f4cfd6a4ff48..5d4441ae8ecc764d658d934830b08303ea4628a0 100644 (file)
 
 #define SDP_RESIZE_WAIT 16
 
-struct sdp_chrecvbuf {
-       u32 size;
-};
-
-static int rcvbuf_scale = 0x10;
-
-int rcvbuf_initial_size = SDP_HEAD_SIZE;
-module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644);
-MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes.");
-
-module_param_named(rcvbuf_scale, rcvbuf_scale, int, 0644);
-MODULE_PARM_DESC(rcvbuf_scale, "Receive buffer size scale factor.");
-
-static int top_mem_usage = 0;
-module_param_named(top_mem_usage, top_mem_usage, int, 0644);
-MODULE_PARM_DESC(top_mem_usage, "Top system wide sdp memory usage for recv (in MB).");
-
-#ifdef CONFIG_PPC
-static int max_large_sockets = 100;
-#else
-static int max_large_sockets = 1000;
-#endif
-module_param_named(max_large_sockets, max_large_sockets, int, 0644);
-MODULE_PARM_DESC(max_large_sockets, "Max number of large sockets (32k buffers).");
-
-#define sdp_cnt(var) do { (var)++; } while (0)
-static unsigned sdp_keepalive_probes_sent = 0;
-
-module_param_named(sdp_keepalive_probes_sent, sdp_keepalive_probes_sent, uint, 0644);
-MODULE_PARM_DESC(sdp_keepalive_probes_sent, "Total number of keepalive probes sent.");
-
-static int curr_large_sockets = 0;
-atomic_t sdp_current_mem_usage;
-spinlock_t sdp_large_sockets_lock;
-
-static int sdp_get_large_socket(struct sdp_sock *ssk)
-{
-       int count, ret;
-
-       if (ssk->recv_request)
-               return 1;
-
-       spin_lock_irq(&sdp_large_sockets_lock);
-       count = curr_large_sockets;
-       ret = curr_large_sockets < max_large_sockets;
-       if (ret)
-               curr_large_sockets++;
-       spin_unlock_irq(&sdp_large_sockets_lock);
-
-       return ret;
-}
-
-void sdp_remove_large_sock(struct sdp_sock *ssk)
-{
-       if (ssk->recv_frags) {
-               spin_lock_irq(&sdp_large_sockets_lock);
-               curr_large_sockets--;
-               spin_unlock_irq(&sdp_large_sockets_lock);
-       }
-}
-
-/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
-static void sdp_fin(struct sock *sk)
-{
-       sdp_dbg(sk, "%s\n", __func__);
-
-       sk->sk_shutdown |= RCV_SHUTDOWN;
-       sock_set_flag(sk, SOCK_DONE);
-
-       switch (sk->sk_state) {
-       case TCP_SYN_RECV:
-       case TCP_ESTABLISHED:
-               sdp_exch_state(sk, TCPF_SYN_RECV | TCPF_ESTABLISHED,
-                               TCP_CLOSE_WAIT);
-               break;
-
-       case TCP_FIN_WAIT1:
-               /* Received a reply FIN - start Infiniband tear down */
-               sdp_dbg(sk, "%s: Starting Infiniband tear down sending DREQ\n",
-                               __func__);
-
-               sdp_cancel_dreq_wait_timeout(sdp_sk(sk));
-
-               sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT);
-
-               if (sdp_sk(sk)->id) {
-                       rdma_disconnect(sdp_sk(sk)->id);
-               } else {
-                       sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__);
-                       return;
-               }
-               break;
-       case TCP_TIME_WAIT:
-               /* This is a mutual close situation and we've got the DREQ from
-                  the peer before the SDP_MID_DISCONNECT */
-               break;
-       case TCP_CLOSE:
-               /* FIN arrived after IB teardown started - do nothing */
-               sdp_dbg(sk, "%s: fin in state %s\n",
-                               __func__, sdp_state_str(sk->sk_state));
-               return;
-       default:
-               sdp_warn(sk, "%s: FIN in unexpected state. sk->sk_state=%d\n",
-                               __func__, sk->sk_state);
-               break;
-       }
-
-
-       sk_mem_reclaim(sk);
-
-       if (!sock_flag(sk, SOCK_DEAD)) {
-               sk->sk_state_change(sk);
-
-               /* Do not send POLL_HUP for half duplex close. */
-               if (sk->sk_shutdown == SHUTDOWN_MASK ||
-                   sk->sk_state == TCP_CLOSE)
-                       sk_wake_async(sk, 1, POLL_HUP);
-               else
-                       sk_wake_async(sk, 1, POLL_IN);
-       }
-}
-
-void sdp_post_keepalive(struct sdp_sock *ssk)
-{
-       int rc;
-       struct ib_send_wr wr, *bad_wr;
-
-       sdp_dbg(&ssk->isk.sk, "%s\n", __func__);
-
-       memset(&wr, 0, sizeof(wr));
-
-       wr.next    = NULL;
-       wr.wr_id   = 0;
-       wr.sg_list = NULL;
-       wr.num_sge = 0;
-       wr.opcode  = IB_WR_RDMA_WRITE;
-
-       rc = ib_post_send(ssk->qp, &wr, &bad_wr);
-       if (rc) {
-               sdp_dbg(&ssk->isk.sk, "ib_post_keepalive failed with status %d.\n", rc);
-               sdp_set_error(&ssk->isk.sk, -ECONNRESET);
-               wake_up(&ssk->wq);
-       }
-
-       sdp_cnt(sdp_keepalive_probes_sent);
-}
-
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
 void _dump_packet(const char *func, int line, struct sock *sk, char *str,
                struct sk_buff *skb, const struct sdp_bsdh *h)
@@ -245,336 +98,6 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
 }
 #endif
 
-static int sdp_process_tx_cq(struct sdp_sock *ssk);
-
-int sdp_xmit_poll(struct sdp_sock *ssk, int force)
-{
-       int wc_processed = 0;
-//     sdp_prf(&ssk->isk.sk, NULL, "xmit_poll force = %d", force);
-
-       /* If we don't have a pending timer, set one up to catch our recent
-          post in case the interface becomes idle */
-       if (!timer_pending(&ssk->tx_ring.timer))
-               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
-
-       /* Poll the CQ every SDP_TX_POLL_MODER packets */
-       if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
-               wc_processed = sdp_process_tx_cq(ssk);
-
-       return wc_processed;    
-}
-
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
-{
-       struct sdp_buf *tx_req;
-       struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
-       unsigned mseq = ssk->tx_ring.head;
-       int i, rc, frags;
-       u64 addr;
-       struct ib_device *dev;
-       struct ib_sge *sge;
-       struct ib_send_wr *bad_wr;
-
-#define ENUM2STR(e) [e] = #e
-       static char *mid2str[] = {
-               ENUM2STR(SDP_MID_HELLO),
-               ENUM2STR(SDP_MID_HELLO_ACK),
-               ENUM2STR(SDP_MID_DISCONN),
-               ENUM2STR(SDP_MID_CHRCVBUF),
-               ENUM2STR(SDP_MID_CHRCVBUF_ACK),
-               ENUM2STR(SDP_MID_DATA),
-       };
-       sdp_prf(&ssk->isk.sk, skb, "post_send mid = %s, bufs = %d",
-                       mid2str[mid], ssk->rx_head - ssk->rx_tail);
-
-       SDPSTATS_COUNTER_MID_INC(post_send, mid);
-       SDPSTATS_HIST(send_size, skb->len);
-
-       h->mid = mid;
-       if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
-               h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
-       else
-               h->flags = 0;
-
-       h->bufs = htons(ssk->rx_head - ssk->rx_tail);
-       h->len = htonl(skb->len);
-       h->mseq = htonl(mseq);
-       h->mseq_ack = htonl(ssk->mseq_ack);
-
-       SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
-       tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
-       tx_req->skb = skb;
-       dev = ssk->ib_device;
-       sge = ssk->ibsge;
-       addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len,
-                                DMA_TO_DEVICE);
-       tx_req->mapping[0] = addr;
-
-       /* TODO: proper error handling */
-       BUG_ON(ib_dma_mapping_error(dev, addr));
-
-       sge->addr = addr;
-       sge->length = skb->len - skb->data_len;
-       sge->lkey = ssk->mr->lkey;
-       frags = skb_shinfo(skb)->nr_frags;
-       for (i = 0; i < frags; ++i) {
-               ++sge;
-               addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
-                                      skb_shinfo(skb)->frags[i].page_offset,
-                                      skb_shinfo(skb)->frags[i].size,
-                                      DMA_TO_DEVICE);
-               BUG_ON(ib_dma_mapping_error(dev, addr));
-               tx_req->mapping[i + 1] = addr;
-               sge->addr = addr;
-               sge->length = skb_shinfo(skb)->frags[i].size;
-               sge->lkey = ssk->mr->lkey;
-       }
-
-       ssk->tx_wr.next = NULL;
-       ssk->tx_wr.wr_id = ssk->tx_ring.head | SDP_OP_SEND;
-       ssk->tx_wr.sg_list = ssk->ibsge;
-       ssk->tx_wr.num_sge = frags + 1;
-       ssk->tx_wr.opcode = IB_WR_SEND;
-       ssk->tx_wr.send_flags = IB_SEND_SIGNALED;
-       if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
-               ssk->tx_wr.send_flags |= IB_SEND_SOLICITED;
-       
-       {
-               static unsigned long last_send = 0;
-               int delta = jiffies - last_send;
-               
-               if (likely(last_send)) 
-                       SDPSTATS_HIST(send_interval, delta);
-
-               last_send = jiffies;
-       }
-       rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr);
-       ++ssk->tx_ring.head;
-       --ssk->tx_ring.credits;
-       ssk->remote_credits = ssk->rx_head - ssk->rx_tail;
-       if (unlikely(rc)) {
-               sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc);
-               sdp_set_error(&ssk->isk.sk, -ECONNRESET);
-               wake_up(&ssk->wq);
-       }
-
-       if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS) {
-               sdp_poll_rx_cq(ssk);
-       }
-}
-
-struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
-{
-       struct ib_device *dev;
-       struct sdp_buf *tx_req;
-       struct sk_buff *skb = NULL;
-       struct bzcopy_state *bz;
-       int i, frags;
-       struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
-       if (unlikely(mseq != tx_ring->tail)) {
-               printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
-                       mseq, tx_ring->tail);
-               goto out;
-       }
-
-       dev = ssk->ib_device;
-        tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
-       skb = tx_req->skb;
-       ib_dma_unmap_single(dev, tx_req->mapping[0], skb->len - skb->data_len,
-                           DMA_TO_DEVICE);
-       frags = skb_shinfo(skb)->nr_frags;
-       for (i = 0; i < frags; ++i) {
-               ib_dma_unmap_page(dev, tx_req->mapping[i + 1],
-                                 skb_shinfo(skb)->frags[i].size,
-                                 DMA_TO_DEVICE);
-       }
-
-       tx_ring->una_seq += TCP_SKB_CB(skb)->end_seq;
-
-       /* TODO: AIO and real zcopy cdoe; add their context support here */
-       bz = BZCOPY_STATE(skb);
-       if (bz)
-               bz->busy--;
-
-       ++tx_ring->tail;
-
-out:
-       return skb;
-}
-
-static void sdp_post_recv(struct sdp_sock *ssk)
-{
-       struct sdp_buf *rx_req;
-       int i, rc, frags;
-       u64 addr;
-       struct ib_device *dev;
-       struct ib_sge *sge;
-       struct ib_recv_wr *bad_wr;
-       struct sk_buff *skb;
-       struct page *page;
-       skb_frag_t *frag;
-       struct sdp_bsdh *h;
-       int id = ssk->rx_head;
-       gfp_t gfp_page;
-
-       /* Now, allocate and repost recv */
-       /* TODO: allocate from cache */
-
-       if (unlikely(ssk->isk.sk.sk_allocation)) {
-               skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
-                                         ssk->isk.sk.sk_allocation);
-               gfp_page = ssk->isk.sk.sk_allocation | __GFP_HIGHMEM;
-       } else {
-               skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
-                                         GFP_KERNEL);
-               gfp_page = GFP_HIGHUSER;
-       }
-
-       /* FIXME */
-       BUG_ON(!skb);
-       h = (struct sdp_bsdh *)skb->head;
-       for (i = 0; i < ssk->recv_frags; ++i) {
-               page = alloc_pages(gfp_page, 0);
-               BUG_ON(!page);
-               frag = &skb_shinfo(skb)->frags[i];
-               frag->page                = page;
-               frag->page_offset         = 0;
-
-               /* Bugzilla 1311 */
-               if ( sizeof(frag->size) < 4 )
-                       frag->size = min(PAGE_SIZE, SDP_MAX_PAYLOAD);
-               else
-                       frag->size = PAGE_SIZE;
-
-               ++skb_shinfo(skb)->nr_frags;
-               skb->len += frag->size;
-               skb->data_len += frag->size;
-               skb->truesize += frag->size;
-       }
-
-        rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1));
-       rx_req->skb = skb;
-       dev = ssk->ib_device;
-       sge = ssk->ibsge;
-       addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE);
-       BUG_ON(ib_dma_mapping_error(dev, addr));
-
-       rx_req->mapping[0] = addr;
-
-       /* TODO: proper error handling */
-       sge->addr = (u64)addr;
-       sge->length = SDP_HEAD_SIZE;
-       sge->lkey = ssk->mr->lkey;
-       frags = skb_shinfo(skb)->nr_frags;
-       for (i = 0; i < frags; ++i) {
-               ++sge;
-               addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
-                                      skb_shinfo(skb)->frags[i].page_offset,
-                                      skb_shinfo(skb)->frags[i].size,
-                                      DMA_FROM_DEVICE);
-               BUG_ON(ib_dma_mapping_error(dev, addr));
-               rx_req->mapping[i + 1] = addr;
-               sge->addr = addr;
-               sge->length = skb_shinfo(skb)->frags[i].size;
-               sge->lkey = ssk->mr->lkey;
-       }
-
-       ssk->rx_wr.next = NULL;
-       ssk->rx_wr.wr_id = id | SDP_OP_RECV;
-       ssk->rx_wr.sg_list = ssk->ibsge;
-       ssk->rx_wr.num_sge = frags + 1;
-       rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr);
-       sdp_prf(&ssk->isk.sk, skb, "rx skb was posted");
-       SDPSTATS_COUNTER_INC(post_recv);
-       ++ssk->rx_head;
-       if (unlikely(rc)) {
-               sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
-               sdp_reset(&ssk->isk.sk);
-       }
-
-       atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-}
-
-void sdp_post_recvs(struct sdp_sock *ssk)
-{
-       struct sock *sk = &ssk->isk.sk;
-       int scale = ssk->rcvbuf_scale;
-
-       if (unlikely(!ssk->id || ((1 << sk->sk_state) & 
-               (TCPF_CLOSE | TCPF_TIME_WAIT)))) {
-               return;
-       }
-
-       if (top_mem_usage &&
-           (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
-               scale = 1;
-
-       while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
-               (ssk->rx_head - ssk->rx_tail - SDP_MIN_TX_CREDITS) *
-               (SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
-               ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
-              unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_TX_CREDITS))
-               sdp_post_recv(ssk);
-}
-
-struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
-{
-       struct sdp_buf *rx_req;
-       struct ib_device *dev;
-       struct sk_buff *skb;
-       int i, frags;
-
-       if (unlikely(id != ssk->rx_tail)) {
-               printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
-                       id, ssk->rx_tail);
-               return NULL;
-       }
-
-       dev = ssk->ib_device;
-        rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)];
-       skb = rx_req->skb;
-       ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE,
-                           DMA_FROM_DEVICE);
-       frags = skb_shinfo(skb)->nr_frags;
-       for (i = 0; i < frags; ++i)
-               ib_dma_unmap_page(dev, rx_req->mapping[i + 1],
-                                 skb_shinfo(skb)->frags[i].size,
-                                 DMA_FROM_DEVICE);
-       ++ssk->rx_tail;
-       --ssk->remote_credits;
-       return skb;
-}
-
-/* Here because I do not want queue to fail. */
-static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
-                                                    struct sk_buff *skb)
-{
-       int skb_len;
-       struct sdp_sock *ssk = sdp_sk(sk);
-       struct sk_buff *tail = NULL;
-
-       /* not needed since sk_rmem_alloc is not currently used
-        * TODO - remove this?
-       skb_set_owner_r(skb, sk); */
-
-       skb_len = skb->len;
-
-       TCP_SKB_CB(skb)->seq = ssk->rcv_nxt;
-       ssk->rcv_nxt += skb_len;
-
-       if (likely(skb_len && (tail = skb_peek_tail(&sk->sk_receive_queue))) &&
-           unlikely(skb_tailroom(tail) >= skb_len)) {
-               skb_copy_bits(skb, 0, skb_put(tail, skb_len), skb_len);
-               __kfree_skb(skb);
-               skb = tail;
-       } else
-               skb_queue_tail(&sk->sk_receive_queue, skb);
-
-       if (!sock_flag(sk, SOCK_DEAD))
-               sk->sk_data_ready(sk, skb_len);
-       return skb;
-}
-
 static inline void update_send_head(struct sock *sk, struct sk_buff *skb)
 {
        struct page *page;
@@ -771,456 +294,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
        }
 }
 
-int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
-{
-       ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
-       if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
-               ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
-       ssk->rcvbuf_scale = rcvbuf_scale;
-
-       sdp_post_recvs(ssk);
-
-       return 0;
-}
-
-int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
-{
-       skb_frag_t skb_frag;
-       u32 curr_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
-       u32 max_size = SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
-
-       /* Bugzilla 1311, Kernels using smaller fragments must reject
-        * re-size requests larger than 32k to prevent being sent
-        * fragment larger than the receive buffer fragment.
-        */
-       if ( (sizeof(skb_frag.size) < 4) && (max_size > 0x8000))
-               max_size = 0x8000;
-
-       if (new_size > curr_size && new_size <= max_size &&
-           sdp_get_large_socket(ssk)) {
-               ssk->rcvbuf_scale = rcvbuf_scale;
-               ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
-               if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
-                       ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
-               return 0;
-       } else
-               return -1;
-}
-
-static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
-{
-       if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
-               ssk->recv_request_head = ssk->rx_head + 1;
-       else
-               ssk->recv_request_head = ssk->rx_tail;
-       ssk->recv_request = 1;
-}
-
-static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
-{
-       u32 new_size = ntohl(buf->size);
-
-       if (new_size > ssk->xmit_size_goal) {
-               ssk->sent_request = -1;
-               ssk->xmit_size_goal = new_size;
-               ssk->send_frags =
-                       PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE;
-       } else
-               ssk->sent_request = 0;
-}
-
-static int sdp_handle_recv_comp(struct sdp_sock *ssk, struct ib_wc *wc)
-{
-       struct sock *sk = &ssk->isk.sk;
-       int frags;
-       struct sk_buff *skb;
-       struct sdp_bsdh *h;
-       int pagesz, i;
-
-       skb = sdp_recv_completion(ssk, wc->wr_id);
-       if (unlikely(!skb))
-               return -1;
-
-       sdp_prf(sk, skb, "recv completion");    
-
-       atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-
-       if (unlikely(wc->status)) {
-               if (wc->status != IB_WC_WR_FLUSH_ERR) {
-                       sdp_dbg(sk, "Recv completion with error. Status %d\n",
-                               wc->status);
-                       sdp_reset(sk);
-               }
-               __kfree_skb(skb);
-               return 0;
-       }
-
-       sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
-                       (int)wc->wr_id, wc->byte_len);
-       if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
-               printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
-                               wc->byte_len, sizeof(struct sdp_bsdh));
-               __kfree_skb(skb);
-               return -1;
-       }
-       skb->len = wc->byte_len;
-       if (likely(wc->byte_len > SDP_HEAD_SIZE))
-               skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
-       else
-               skb->data_len = 0;
-       skb->data = skb->head;
-#ifdef NET_SKBUFF_DATA_USES_OFFSET
-       skb->tail = skb_headlen(skb);
-#else
-       skb->tail = skb->head + skb_headlen(skb);
-#endif
-       h = (struct sdp_bsdh *)skb->data;
-       SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
-       skb_reset_transport_header(skb);
-       ssk->mseq_ack = ntohl(h->mseq);
-       if (ssk->mseq_ack != (int)wc->wr_id)
-               printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
-                               ssk->mseq_ack, (int)wc->wr_id);
-
-       SDPSTATS_HIST_LINEAR(credits_before_update, ssk->tx_ring.credits);
-       ssk->tx_ring.credits = ntohl(h->mseq_ack) - ssk->tx_ring.head + 1 +
-               ntohs(h->bufs);
-
-       frags = skb_shinfo(skb)->nr_frags;
-       pagesz = PAGE_ALIGN(skb->data_len);
-       skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
-
-       for (i = skb_shinfo(skb)->nr_frags;
-                       i < frags; ++i) {
-               put_page(skb_shinfo(skb)->frags[i].page);
-               skb->truesize -= PAGE_SIZE;
-       }
-
-       if (unlikely(h->flags & SDP_OOB_PEND))
-               sk_send_sigurg(sk);
-
-       skb_pull(skb, sizeof(struct sdp_bsdh));
-
-       switch (h->mid) {
-       case SDP_MID_DATA:
-               if (unlikely(skb->len <= 0)) {
-                       __kfree_skb(skb);
-                       break;
-               }
-
-               if (unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
-                       /* got data in RCV_SHUTDOWN */
-                       if (sk->sk_state == TCP_FIN_WAIT1) {
-                               /* go into abortive close */
-                               sdp_exch_state(sk, TCPF_FIN_WAIT1,
-                                              TCP_TIME_WAIT);
-
-                               sk->sk_prot->disconnect(sk, 0);
-                       }
-
-                       __kfree_skb(skb);
-                       break;
-               }
-               skb = sdp_sock_queue_rcv_skb(sk, skb);
-               if (unlikely(h->flags & SDP_OOB_PRES))
-                       sdp_urg(ssk, skb);
-               break;
-       case SDP_MID_DISCONN:
-               __kfree_skb(skb);
-               sdp_fin(sk);
-               break;
-       case SDP_MID_CHRCVBUF:
-               sdp_handle_resize_request(ssk,
-                       (struct sdp_chrecvbuf *)skb->data);
-               __kfree_skb(skb);
-               break;
-       case SDP_MID_CHRCVBUF_ACK:
-               sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
-               __kfree_skb(skb);
-               break;
-       default:
-               /* TODO: Handle other messages */
-               printk(KERN_WARNING "SDP: FIXME MID %d\n", h->mid);
-               __kfree_skb(skb);
-       }
-
-       return 0;
-}
-
-static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
-{
-       struct sk_buff *skb = NULL;
-
-       skb = sdp_send_completion(ssk, wc->wr_id);
-       if (unlikely(!skb))
-               return -1;
-
-       if (unlikely(wc->status)) {
-               if (wc->status != IB_WC_WR_FLUSH_ERR) {
-                       struct sock *sk = &ssk->isk.sk;
-                       sdp_warn(sk, "Send completion with error. "
-                               "Status %d\n", wc->status);
-                       sdp_set_error(sk, -ECONNRESET);
-                       wake_up(&ssk->wq);
-
-                       queue_work(rx_comp_wq, &ssk->destroy_work);
-               }
-       }
-
-       sdp_prf(&ssk->isk.sk, skb, "tx completion");
-       sk_stream_free_skb(&ssk->isk.sk, skb);
-
-       return 0;
-}
-
-void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
-{
-       struct sock *sk = cq_context;
-       struct sdp_sock *ssk = sdp_sk(sk);
-
-       WARN_ON(ssk->rx_cq && cq != ssk->rx_cq);
-
-       if (!ssk->rx_cq)
-               sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
-
-       SDPSTATS_COUNTER_INC(rx_int_count);
-
-       sdp_prf(sk, NULL, "rx completion");
-
-       /* issue sdp_rx_comp_work() */
-       queue_work(rx_comp_wq, &ssk->rx_comp_work);
-}
-
-static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
-{
-       if (likely(wc->wr_id & SDP_OP_SEND)) {
-               sdp_handle_send_comp(ssk, wc);
-               return;
-       }
-
-       sk_wmem_free_skb(&ssk->isk.sk, skb);
-
-       /* Keepalive probe sent cleanup */
-       sdp_cnt(sdp_keepalive_probes_sent);
-
-       if (likely(!wc->status))
-               return;
-
-       sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
-                       __func__, wc->status);
-
-       if (wc->status == IB_WC_WR_FLUSH_ERR)
-               return;
-
-       sdp_set_error(&ssk->isk.sk, -ECONNRESET);
-       wake_up(&ssk->wq);
-}
-
-static int sdp_process_tx_cq(struct sdp_sock *ssk)
-{
-       struct ib_wc ibwc[SDP_NUM_WC];
-       int n, i;
-       int wc_processed = 0;
-
-       if (!ssk->tx_ring.cq) {
-               sdp_warn(&ssk->isk.sk, "WARNING: tx irq when tx_cq is destroyed\n");
-               return 0;
-       }
-       
-       do {
-               n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
-               for (i = 0; i < n; ++i) {
-                       sdp_process_tx_wc(ssk, ibwc + i);
-                       wc_processed++;
-               }
-       } while (n == SDP_NUM_WC);
-
-       sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
-
-       if (wc_processed) {
-               struct sock *sk = &ssk->isk.sk;
-               sdp_post_sends(ssk, 0);
-
-               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-                       sk_stream_write_space(&ssk->isk.sk);
-
-       }
-
-       return wc_processed;    
-}
-
-void sdp_poll_tx_cq(unsigned long data)
-{
-       struct sdp_sock *ssk = (struct sdp_sock *)data;
-       struct sock *sk = &ssk->isk.sk;
-       u32 inflight, wc_processed;
-
-
-       sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
-               (u32) ssk->tx_ring.head - ssk->tx_ring.tail);
-
-       /* Only process if the socket is not in use */
-       bh_lock_sock(sk);
-       if (sock_owned_by_user(sk)) {
-               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
-               sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
-               goto out;
-       }
-
-       if (sk->sk_state == TCP_CLOSE)
-               goto out;
-
-       wc_processed = sdp_process_tx_cq(ssk);
-       if (!wc_processed)
-               SDPSTATS_COUNTER_INC(tx_poll_miss);
-       else
-               SDPSTATS_COUNTER_INC(tx_poll_hit);
-
-       inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail;
-
-       /* If there are still packets in flight and the timer has not already
-        * been scheduled by the Tx routine then schedule it here to guarantee
-        * completion processing of these packets */
-       if (inflight) { /* TODO: make sure socket is not closed */
-               sdp_dbg_data(sk, "arming timer for more polling\n");
-               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
-       }
-
-out:
-       bh_unlock_sock(sk);
-}
-
-void _sdp_poll_tx_cq(unsigned long data)
-{
-       struct sdp_sock *ssk = (struct sdp_sock *)data;
-       struct sock *sk = &ssk->isk.sk;
-
-       sdp_prf(sk, NULL, "sdp poll tx timeout expired");
-
-       sdp_poll_tx_cq(data);
-}
-
-
-void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
-{
-       struct sock *sk = cq_context;
-       struct sdp_sock *ssk = sdp_sk(sk);
-
-       sdp_warn(sk, "Got tx comp interrupt\n");
-
-       mod_timer(&ssk->tx_ring.timer, jiffies + 1);
-}
-
-static inline int credit_update_needed(struct sdp_sock *ssk, int wc_processed)
-{
-       int c;
-
-       c = ssk->remote_credits;
-       if (likely(c > SDP_MIN_TX_CREDITS))
-               c += c/2;
-
-/*     sdp_warn(&ssk->isk.sk, "credits: %d remote credits: %d "
-                       "tx ring slots left: %d send_head: %p\n",
-               ssk->tx_ring.credits, ssk->remote_credits,
-               sdp_tx_ring_slots_left(&ssk->tx_ring),
-               ssk->isk.sk.sk_send_head);
-*/
-       return (unlikely(c < ssk->rx_head - ssk->rx_tail + wc_processed) &&
-           likely(ssk->tx_ring.credits > 1) &&
-           likely(sdp_tx_ring_slots_left(&ssk->tx_ring)));
-}
-
-
-int sdp_poll_rx_cq(struct sdp_sock *ssk)
-{
-       struct ib_cq *cq = ssk->rx_cq;
-       struct ib_wc ibwc[SDP_NUM_WC];
-       int n, i;
-       int ret = -EAGAIN;
-       int updated_credits = 0;
-       int wc_processed = 0;
-
-       do {
-               n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
-               for (i = 0; i < n; ++i) {
-                       struct ib_wc *wc = &ibwc[i];
-
-                       BUG_ON(!(wc->wr_id & SDP_OP_RECV));
-                       sdp_handle_recv_comp(ssk, wc);
-                       wc_processed++;
-
-/*                     if (!updated_credits) {
-                               sdp_post_recvs(ssk);
-                               sdp_post_sends(ssk, 0);
-                               updated_credits = 1;
-                       }*/
-//sdp_warn(&ssk->isk.sk, "i = %d, wc_processed = %d wr_id = 0x%llx\n", i, wc_processed, wc->wr_id);
-                       if (credit_update_needed(ssk, wc_processed)) {
-                               sdp_prf(&ssk->isk.sk, NULL, "credit update. remote_credits: %d, avail now: %d processed: %d",
-                                               ssk->remote_credits,
-                                               ssk->rx_head - ssk->rx_tail,
-                                               wc_processed);
-                               sdp_post_recvs(ssk);
-                               if (sdp_post_credits(ssk))
-                                       wc_processed = 0;
-                       }
-
-                       ret = 0;
-               }
-       } while (n == SDP_NUM_WC);
-
-       if (!ret) {
-               struct sock *sk = &ssk->isk.sk;
-
-               sdp_post_recvs(ssk);
-
-               /* update credits */
-               sdp_post_sends(ssk, 0);
-
-               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-                       sk_stream_write_space(&ssk->isk.sk);
-       } else {
-               SDPSTATS_COUNTER_INC(rx_poll_miss);
-       }
-
-       return ret;
-}
-
 static inline int sdp_tx_qp_empty(struct sdp_sock *ssk)
 {
        return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0;
 }
-
-void sdp_rx_comp_work(struct work_struct *work)
-{
-       struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
-       struct sock *sk = &ssk->isk.sk;
-       struct ib_cq *rx_cq;
-
-       lock_sock(sk);
-       rx_cq = ssk->rx_cq;
-       if (unlikely(!rx_cq))
-               goto out;
-
-       if (unlikely(!ssk->poll_cq)) {
-               struct rdma_cm_id *id = ssk->id;
-               if (id && id->qp)
-                       rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
-               goto out;
-       }
-
-       sdp_poll_rx_cq(ssk);
-       sdp_xmit_poll(ssk,  1); /* if has pending tx because run out of tx_credits - xmit it */
-       release_sock(sk);
-       sk_mem_reclaim(sk);
-       lock_sock(sk);
-       rx_cq = ssk->rx_cq;
-       if (unlikely(!rx_cq))
-               goto out;
-       
-       sdp_arm_rx_cq(sk);
-       sdp_poll_rx_cq(ssk);
-       sdp_xmit_poll(ssk,  1);
-out:
-       release_sock(sk);
-}
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 8cd739a05f37cd8ee5a04911c81b8e0a9dcf3c8b..b5322da653220666744bc7510897817436749976 100644 (file)
@@ -220,21 +220,8 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
                ib_destroy_qp(ssk->qp);
                ssk->qp = NULL;
 
-               while (ssk->rx_head != ssk->rx_tail) {
-                       struct sk_buff *skb;
-                       skb = sdp_recv_completion(ssk, ssk->rx_tail);
-                       if (!skb)
-                               break;
-                       atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-                       __kfree_skb(skb);
-               }
-               while (ssk->tx_ring.head != ssk->tx_ring.tail) {
-                       struct sk_buff *skb;
-                       skb = sdp_send_completion(ssk, ssk->tx_ring.tail);
-                       if (!skb)
-                               break;
-                       __kfree_skb(skb);
-               }
+               sdp_rx_ring_purge(ssk);
+               sdp_tx_ring_purge(ssk);
        }
 
        if (tx_cq) {
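
The two teardown loops removed from sdp_destroy_qp() above are consolidated into the new sdp_rx_ring_purge()/sdp_tx_ring_purge() helpers declared in sdp.h. sdp_rx_ring_purge() appears at the end of the new sdp_rx.c below, but the body of the new sdp_tx.c is truncated on this page, so the following is only an illustrative sketch of what sdp_tx_ring_purge() presumably looks like, mirroring the removed TX loop and the existing sdp_send_completion() path; it is not taken from the patch itself.

void sdp_tx_ring_purge(struct sdp_sock *ssk)
{
	/* Drain any sends still outstanding on the tx ring: reap each
	 * completion in order (sdp_send_completion() unmaps the DMA
	 * buffers and advances tx_ring.tail) and free the skb, exactly
	 * as the loop removed from sdp_destroy_qp() above did. */
	while (ssk->tx_ring.head != ssk->tx_ring.tail) {
		struct sk_buff *skb;

		skb = sdp_send_completion(ssk, ssk->tx_ring.tail);
		if (!skb)
			break;
		__kfree_skb(skb);
	}
}
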
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
new file mode 100644 (file)
index 0000000..ba8e0fe
--- /dev/null
@@ -0,0 +1,628 @@
+/*
+ * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * $Id$
+ */
+#include <linux/interrupt.h>
+#include <linux/dma-mapping.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include "sdp.h"
+
+static int rcvbuf_scale = 0x10;
+
+int rcvbuf_initial_size = SDP_HEAD_SIZE;
+module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644);
+MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes.");
+
+module_param_named(rcvbuf_scale, rcvbuf_scale, int, 0644);
+MODULE_PARM_DESC(rcvbuf_scale, "Receive buffer size scale factor.");
+
+static int top_mem_usage = 0;
+module_param_named(top_mem_usage, top_mem_usage, int, 0644);
+MODULE_PARM_DESC(top_mem_usage, "Top system wide sdp memory usage for recv (in MB).");
+
+#ifdef CONFIG_PPC
+static int max_large_sockets = 100;
+#else
+static int max_large_sockets = 1000;
+#endif
+module_param_named(max_large_sockets, max_large_sockets, int, 0644);
+MODULE_PARM_DESC(max_large_sockets, "Max number of large sockets (32k buffers).");
+
+static int curr_large_sockets = 0;
+atomic_t sdp_current_mem_usage;
+spinlock_t sdp_large_sockets_lock;
+
+static int sdp_get_large_socket(struct sdp_sock *ssk)
+{
+       int count, ret;
+
+       if (ssk->recv_request)
+               return 1;
+
+       spin_lock_irq(&sdp_large_sockets_lock);
+       count = curr_large_sockets;
+       ret = curr_large_sockets < max_large_sockets;
+       if (ret)
+               curr_large_sockets++;
+       spin_unlock_irq(&sdp_large_sockets_lock);
+
+       return ret;
+}
+
+void sdp_remove_large_sock(struct sdp_sock *ssk)
+{
+       if (ssk->recv_frags) {
+               spin_lock_irq(&sdp_large_sockets_lock);
+               curr_large_sockets--;
+               spin_unlock_irq(&sdp_large_sockets_lock);
+       }
+}
+
+/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
+static void sdp_fin(struct sock *sk)
+{
+       sdp_dbg(sk, "%s\n", __func__);
+
+       sk->sk_shutdown |= RCV_SHUTDOWN;
+       sock_set_flag(sk, SOCK_DONE);
+
+       switch (sk->sk_state) {
+       case TCP_SYN_RECV:
+       case TCP_ESTABLISHED:
+               sdp_exch_state(sk, TCPF_SYN_RECV | TCPF_ESTABLISHED,
+                               TCP_CLOSE_WAIT);
+               break;
+
+       case TCP_FIN_WAIT1:
+               /* Received a reply FIN - start Infiniband tear down */
+               sdp_dbg(sk, "%s: Starting Infiniband tear down sending DREQ\n",
+                               __func__);
+
+               sdp_cancel_dreq_wait_timeout(sdp_sk(sk));
+
+               sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT);
+
+               if (sdp_sk(sk)->id) {
+                       rdma_disconnect(sdp_sk(sk)->id);
+               } else {
+                       sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__);
+                       return;
+               }
+               break;
+       case TCP_TIME_WAIT:
+               /* This is a mutual close situation and we've got the DREQ from
+                  the peer before the SDP_MID_DISCONNECT */
+               break;
+       case TCP_CLOSE:
+               /* FIN arrived after IB teardown started - do nothing */
+               sdp_dbg(sk, "%s: fin in state %s\n",
+                               __func__, sdp_state_str(sk->sk_state));
+               return;
+       default:
+               sdp_warn(sk, "%s: FIN in unexpected state. sk->sk_state=%d\n",
+                               __func__, sk->sk_state);
+               break;
+       }
+
+
+       sk_stream_mem_reclaim(sk);
+
+       if (!sock_flag(sk, SOCK_DEAD)) {
+               sk->sk_state_change(sk);
+
+               /* Do not send POLL_HUP for half duplex close. */
+               if (sk->sk_shutdown == SHUTDOWN_MASK ||
+                   sk->sk_state == TCP_CLOSE)
+                       sk_wake_async(sk, 1, POLL_HUP);
+               else
+                       sk_wake_async(sk, 1, POLL_IN);
+       }
+}
+
+
+static void sdp_post_recv(struct sdp_sock *ssk)
+{
+       struct sdp_buf *rx_req;
+       int i, rc, frags;
+       u64 addr;
+       struct ib_device *dev;
+       struct ib_sge *sge;
+       struct ib_recv_wr *bad_wr;
+       struct sk_buff *skb;
+       struct page *page;
+       skb_frag_t *frag;
+       struct sdp_bsdh *h;
+       int id = ssk->rx_head;
+       gfp_t gfp_page;
+
+       /* Now, allocate and repost recv */
+       /* TODO: allocate from cache */
+
+       if (unlikely(ssk->isk.sk.sk_allocation)) {
+               skb = sk_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
+                                         ssk->isk.sk.sk_allocation);
+               gfp_page = ssk->isk.sk.sk_allocation | __GFP_HIGHMEM;
+       } else {
+               skb = sk_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
+                                         GFP_KERNEL);
+               gfp_page = GFP_HIGHUSER;
+       }
+
+       /* FIXME */
+       BUG_ON(!skb);
+       h = (struct sdp_bsdh *)skb->head;
+       for (i = 0; i < ssk->recv_frags; ++i) {
+               page = alloc_pages(gfp_page, 0);
+               BUG_ON(!page);
+               frag = &skb_shinfo(skb)->frags[i];
+               frag->page                = page;
+               frag->page_offset         = 0;
+               frag->size                =  min(PAGE_SIZE, SDP_MAX_PAYLOAD);
+               ++skb_shinfo(skb)->nr_frags;
+               skb->len += frag->size;
+               skb->data_len += frag->size;
+               skb->truesize += frag->size;
+       }
+
+        rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1));
+       rx_req->skb = skb;
+       dev = ssk->ib_device;
+       sge = ssk->ibsge;
+       addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE);
+       BUG_ON(ib_dma_mapping_error(dev, addr));
+
+       rx_req->mapping[0] = addr;
+
+       /* TODO: proper error handling */
+       sge->addr = (u64)addr;
+       sge->length = SDP_HEAD_SIZE;
+       sge->lkey = ssk->mr->lkey;
+       frags = skb_shinfo(skb)->nr_frags;
+       for (i = 0; i < frags; ++i) {
+               ++sge;
+               addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
+                                      skb_shinfo(skb)->frags[i].page_offset,
+                                      skb_shinfo(skb)->frags[i].size,
+                                      DMA_FROM_DEVICE);
+               BUG_ON(ib_dma_mapping_error(dev, addr));
+               rx_req->mapping[i + 1] = addr;
+               sge->addr = addr;
+               sge->length = skb_shinfo(skb)->frags[i].size;
+               sge->lkey = ssk->mr->lkey;
+       }
+
+       ssk->rx_wr.next = NULL;
+       ssk->rx_wr.wr_id = id | SDP_OP_RECV;
+       ssk->rx_wr.sg_list = ssk->ibsge;
+       ssk->rx_wr.num_sge = frags + 1;
+       rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr);
+       sdp_prf(&ssk->isk.sk, skb, "rx skb was posted");
+       SDPSTATS_COUNTER_INC(post_recv);
+       ++ssk->rx_head;
+       if (unlikely(rc)) {
+               sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
+               sdp_reset(&ssk->isk.sk);
+       }
+
+       atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+}
+
+void sdp_post_recvs(struct sdp_sock *ssk)
+{
+       struct sock *sk = &ssk->isk.sk;
+       int scale = ssk->rcvbuf_scale;
+
+       if (unlikely(!ssk->id || ((1 << sk->sk_state) & 
+               (TCPF_CLOSE | TCPF_TIME_WAIT)))) {
+               return;
+       }
+
+       if (top_mem_usage &&
+           (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
+               scale = 1;
+
+       while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
+               (ssk->rx_head - ssk->rx_tail - SDP_MIN_TX_CREDITS) *
+               (SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
+               ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
+              unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_TX_CREDITS))
+               sdp_post_recv(ssk);
+}
+
+static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
+                                                    struct sk_buff *skb)
+{
+       int skb_len;
+       struct sdp_sock *ssk = sdp_sk(sk);
+       struct sk_buff *tail = NULL;
+
+       /* not needed since sk_rmem_alloc is not currently used
+        * TODO - remove this?
+       skb_set_owner_r(skb, sk); */
+
+       skb_len = skb->len;
+
+       TCP_SKB_CB(skb)->seq = ssk->rcv_nxt;
+       ssk->rcv_nxt += skb_len;
+
+       if (likely(skb_len && (tail = skb_peek_tail(&sk->sk_receive_queue))) &&
+           unlikely(skb_tailroom(tail) >= skb_len)) {
+               skb_copy_bits(skb, 0, skb_put(tail, skb_len), skb_len);
+               __kfree_skb(skb);
+               skb = tail;
+       } else
+               skb_queue_tail(&sk->sk_receive_queue, skb);
+
+       if (!sock_flag(sk, SOCK_DEAD))
+               sk->sk_data_ready(sk, skb_len);
+       return skb;
+}
+
+int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
+{
+       ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
+       if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
+               ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
+       ssk->rcvbuf_scale = rcvbuf_scale;
+
+       sdp_post_recvs(ssk);
+
+       return 0;
+}
+
+int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
+{
+       u32 curr_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
+#if defined(__ia64__)
+       /* for huge PAGE_SIZE systems, aka IA64, limit buffers size
+          [re-]negotiation to a known+working size that will not
+          trigger a HW error/rc to be interpreted as a IB_WC_LOC_LEN_ERR */
+       u32 max_size = (SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE) <=
+               32784 ?
+               (SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE): 32784;
+#else 
+       u32 max_size = SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
+#endif
+
+       if (new_size > curr_size && new_size <= max_size &&
+           sdp_get_large_socket(ssk)) {
+               ssk->rcvbuf_scale = rcvbuf_scale;
+               ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
+               if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
+                       ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
+               return 0;
+       } else
+               return -1;
+}
+
+static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
+{
+       if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
+               ssk->recv_request_head = ssk->rx_head + 1;
+       else
+               ssk->recv_request_head = ssk->rx_tail;
+       ssk->recv_request = 1;
+}
+
+static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
+{
+       u32 new_size = ntohl(buf->size);
+
+       if (new_size > ssk->xmit_size_goal) {
+               ssk->sent_request = -1;
+               ssk->xmit_size_goal = new_size;
+               ssk->send_frags =
+                       PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE;
+       } else
+               ssk->sent_request = 0;
+}
+
+static inline int credit_update_needed(struct sdp_sock *ssk, int wc_processed)
+{
+       int c;
+
+       c = ssk->remote_credits;
+       if (likely(c > SDP_MIN_TX_CREDITS))
+               c += c/2;
+
+/*     sdp_warn(&ssk->isk.sk, "credits: %d remote credits: %d "
+                       "tx ring slots left: %d send_head: %p\n",
+               ssk->tx_ring.credits, ssk->remote_credits,
+               sdp_tx_ring_slots_left(&ssk->tx_ring),
+               ssk->isk.sk.sk_send_head);
+*/
+       return (unlikely(c < ssk->rx_head - ssk->rx_tail + wc_processed) &&
+           likely(ssk->tx_ring.credits > 1) &&
+           likely(sdp_tx_ring_slots_left(&ssk->tx_ring)));
+}
+
+
+static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
+{
+       struct sdp_buf *rx_req;
+       struct ib_device *dev;
+       struct sk_buff *skb;
+       int i, frags;
+
+       if (unlikely(id != ssk->rx_tail)) {
+               printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
+                       id, ssk->rx_tail);
+               return NULL;
+       }
+
+       dev = ssk->ib_device;
+        rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)];
+       skb = rx_req->skb;
+       ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE,
+                           DMA_FROM_DEVICE);
+       frags = skb_shinfo(skb)->nr_frags;
+       for (i = 0; i < frags; ++i)
+               ib_dma_unmap_page(dev, rx_req->mapping[i + 1],
+                                 skb_shinfo(skb)->frags[i].size,
+                                 DMA_FROM_DEVICE);
+       ++ssk->rx_tail;
+       --ssk->remote_credits;
+       return skb;
+}
+
+static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+       struct sock *sk = &ssk->isk.sk;
+       int frags;
+       struct sk_buff *skb;
+       struct sdp_bsdh *h;
+       int pagesz, i;
+
+       skb = sdp_recv_completion(ssk, wc->wr_id);
+       if (unlikely(!skb))
+               return -1;
+
+       sdp_prf(sk, skb, "recv completion");    
+
+       atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+
+       if (unlikely(wc->status)) {
+               if (wc->status != IB_WC_WR_FLUSH_ERR) {
+                       sdp_dbg(sk, "Recv completion with error. Status %d\n",
+                               wc->status);
+                       sdp_reset(sk);
+               }
+               __kfree_skb(skb);
+               return 0;
+       }
+
+       sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
+                       (int)wc->wr_id, wc->byte_len);
+       if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
+               printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
+                               wc->byte_len, sizeof(struct sdp_bsdh));
+               __kfree_skb(skb);
+               return -1;
+       }
+       skb->len = wc->byte_len;
+       if (likely(wc->byte_len > SDP_HEAD_SIZE))
+               skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
+       else
+               skb->data_len = 0;
+       skb->data = skb->head;
+#ifdef NET_SKBUFF_DATA_USES_OFFSET
+       skb->tail = skb_headlen(skb);
+#else
+       skb->tail = skb->head + skb_headlen(skb);
+#endif
+       h = (struct sdp_bsdh *)skb->data;
+       SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
+       skb_reset_transport_header(skb);
+       ssk->mseq_ack = ntohl(h->mseq);
+       if (ssk->mseq_ack != (int)wc->wr_id)
+               printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
+                               ssk->mseq_ack, (int)wc->wr_id);
+
+       SDPSTATS_HIST_LINEAR(credits_before_update, ssk->tx_ring.credits);
+       ssk->tx_ring.credits = ntohl(h->mseq_ack) - ssk->tx_ring.head + 1 +
+               ntohs(h->bufs);
+
+       frags = skb_shinfo(skb)->nr_frags;
+       pagesz = PAGE_ALIGN(skb->data_len);
+       skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
+
+       for (i = skb_shinfo(skb)->nr_frags;
+                       i < frags; ++i) {
+               put_page(skb_shinfo(skb)->frags[i].page);
+               skb->truesize -= PAGE_SIZE;
+       }
+
+       if (unlikely(h->flags & SDP_OOB_PEND))
+               sk_send_sigurg(sk);
+
+       skb_pull(skb, sizeof(struct sdp_bsdh));
+
+       switch (h->mid) {
+       case SDP_MID_DATA:
+               if (unlikely(skb->len <= 0)) {
+                       __kfree_skb(skb);
+                       break;
+               }
+
+               if (unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
+                       /* got data in RCV_SHUTDOWN */
+                       if (sk->sk_state == TCP_FIN_WAIT1) {
+                               /* go into abortive close */
+                               sdp_exch_state(sk, TCPF_FIN_WAIT1,
+                                              TCP_TIME_WAIT);
+
+                               sk->sk_prot->disconnect(sk, 0);
+                       }
+
+                       __kfree_skb(skb);
+                       break;
+               }
+               skb = sdp_sock_queue_rcv_skb(sk, skb);
+               if (unlikely(h->flags & SDP_OOB_PRES))
+                       sdp_urg(ssk, skb);
+               break;
+       case SDP_MID_DISCONN:
+               __kfree_skb(skb);
+               sdp_fin(sk);
+               break;
+       case SDP_MID_CHRCVBUF:
+               sdp_handle_resize_request(ssk,
+                       (struct sdp_chrecvbuf *)skb->data);
+               __kfree_skb(skb);
+               break;
+       case SDP_MID_CHRCVBUF_ACK:
+               sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
+               __kfree_skb(skb);
+               break;
+       default:
+               /* TODO: Handle other messages */
+               printk(KERN_WARNING "SDP: FIXME MID %d\n", h->mid);
+               __kfree_skb(skb);
+       }
+
+       return 0;
+}
+
+int sdp_poll_rx_cq(struct sdp_sock *ssk)
+{
+       struct ib_cq *cq = ssk->rx_cq;
+       struct ib_wc ibwc[SDP_NUM_WC];
+       int n, i;
+       int ret = -EAGAIN;
+       int wc_processed = 0;
+
+       do {
+               n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
+               for (i = 0; i < n; ++i) {
+                       struct ib_wc *wc = &ibwc[i];
+
+                       BUG_ON(!(wc->wr_id & SDP_OP_RECV));
+                       sdp_process_rx_wc(ssk, wc);
+                       wc_processed++;
+
+                       if (credit_update_needed(ssk, wc_processed)) {
+                               sdp_prf(&ssk->isk.sk, NULL, "credit update. remote_credits: %d, avail now: %d processed: %d",
+                                               ssk->remote_credits,
+                                               ssk->rx_head - ssk->rx_tail,
+                                               wc_processed);
+                               sdp_post_recvs(ssk);
+                               if (sdp_post_credits(ssk))
+                                       wc_processed = 0;
+                       }
+
+                       ret = 0;
+               }
+       } while (n == SDP_NUM_WC);
+
+       if (!ret) {
+               struct sock *sk = &ssk->isk.sk;
+
+               sdp_post_recvs(ssk);
+
+               /* update credits */
+               sdp_post_sends(ssk, 0);
+
+               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+                       sk_stream_write_space(&ssk->isk.sk);
+       } else {
+               SDPSTATS_COUNTER_INC(rx_poll_miss);
+       }
+
+       return ret;
+}
+
+void sdp_rx_comp_work(struct work_struct *work)
+{
+       struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
+       struct sock *sk = &ssk->isk.sk;
+       struct ib_cq *rx_cq;
+
+       lock_sock(sk);
+       rx_cq = ssk->rx_cq;
+       if (unlikely(!rx_cq))
+               goto out;
+
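+       /* A completion before poll_cq is enabled presumably raced with
+        * connection establishment; nudge the CM instead of polling.
+        */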
+       if (unlikely(!ssk->poll_cq)) {
+               struct rdma_cm_id *id = ssk->id;
+               if (id && id->qp)
+                       rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
+               goto out;
+       }
+
+       sdp_poll_rx_cq(ssk);
+       sdp_xmit_poll(ssk, 1); /* xmit any tx left pending because we ran out of tx credits */
+       release_sock(sk);
+       sk_stream_mem_reclaim(sk);
+       lock_sock(sk);
+       rx_cq = ssk->rx_cq;
+       if (unlikely(!rx_cq))
+               goto out;
+
+       sdp_arm_rx_cq(sk);
+       sdp_poll_rx_cq(ssk);
+       sdp_xmit_poll(ssk, 1);
+out:
+       release_sock(sk);
+}
+
+void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
+{
+       struct sock *sk = cq_context;
+       struct sdp_sock *ssk = sdp_sk(sk);
+
+       WARN_ON(ssk->rx_cq && cq != ssk->rx_cq);
+
+       if (!ssk->rx_cq)
+               sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+
+       SDPSTATS_COUNTER_INC(rx_int_count);
+
+       sdp_prf(sk, NULL, "rx completion");
+
+       /* defer the actual CQ processing to sdp_rx_comp_work() in workqueue context */
+       queue_work(rx_comp_wq, &ssk->rx_comp_work);
+}
+
+void sdp_rx_ring_purge(struct sdp_sock *ssk)
+{
+       struct sk_buff *skb;
+
+       while (ssk->rx_head != ssk->rx_tail) {
+               skb = sdp_recv_completion(ssk, ssk->rx_tail);
+               if (!skb)
+                       break;
+               atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+               __kfree_skb(skb);
+       }
+}
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
new file mode 100644 (file)
index 0000000..5e6a2dc
--- /dev/null
@@ -0,0 +1,382 @@
+/*
+ * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ * $Id$
+ */
+#include <linux/interrupt.h>
+#include <linux/dma-mapping.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include "sdp.h"
+
+#define sdp_cnt(var) do { (var)++; } while (0)
+static unsigned sdp_keepalive_probes_sent;
+
+module_param_named(sdp_keepalive_probes_sent, sdp_keepalive_probes_sent, uint, 0644);
+MODULE_PARM_DESC(sdp_keepalive_probes_sent, "Total number of keepalive probes sent.");
+
+static int sdp_process_tx_cq(struct sdp_sock *ssk);
+
+int sdp_xmit_poll(struct sdp_sock *ssk, int force)
+{
+       int wc_processed = 0;
+
+       /* If we don't have a pending timer, set one up to catch our recent
+          post in case the interface becomes idle */
+       if (!timer_pending(&ssk->tx_ring.timer))
+               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+
+       /* Poll the CQ every SDP_TX_POLL_MODER packets */
+       if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
+               wc_processed = sdp_process_tx_cq(ssk);
+
+       return wc_processed;
+}
+
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
+{
+       struct sdp_buf *tx_req;
+       struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+       unsigned mseq = ssk->tx_ring.head;
+       int i, rc, frags;
+       u64 addr;
+       struct ib_device *dev;
+       struct ib_sge *sge;
+       struct ib_send_wr *bad_wr;
+
+#define ENUM2STR(e) [e] = #e
+       static char *mid2str[] = {
+               ENUM2STR(SDP_MID_HELLO),
+               ENUM2STR(SDP_MID_HELLO_ACK),
+               ENUM2STR(SDP_MID_DISCONN),
+               ENUM2STR(SDP_MID_CHRCVBUF),
+               ENUM2STR(SDP_MID_CHRCVBUF_ACK),
+               ENUM2STR(SDP_MID_DATA),
+       };
+       sdp_prf(&ssk->isk.sk, skb, "post_send mid = %s, bufs = %d",
+                       mid2str[mid], ssk->rx_head - ssk->rx_tail);
+
+       SDPSTATS_COUNTER_MID_INC(post_send, mid);
+       SDPSTATS_HIST(send_size, skb->len);
+
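+       /* Fill in the BSDH just pushed onto the skb: message id, OOB flags,
+        * advertised receive buffers, payload length and sequence numbers.
+        */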
+       h->mid = mid;
+       if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_URG))
+               h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
+       else
+               h->flags = 0;
+
+       h->bufs = htons(ssk->rx_head - ssk->rx_tail);
+       h->len = htonl(skb->len);
+       h->mseq = htonl(mseq);
+       h->mseq_ack = htonl(ssk->mseq_ack);
+
+       SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
+       tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
+       tx_req->skb = skb;
+       dev = ssk->ib_device;
+       sge = ssk->ibsge;
+       addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len,
+                                DMA_TO_DEVICE);
+       tx_req->mapping[0] = addr;
+
+       /* TODO: proper error handling */
+       BUG_ON(ib_dma_mapping_error(dev, addr));
+
+       sge->addr = addr;
+       sge->length = skb->len - skb->data_len;
+       sge->lkey = ssk->mr->lkey;
+       frags = skb_shinfo(skb)->nr_frags;
+       for (i = 0; i < frags; ++i) {
+               ++sge;
+               addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
+                                      skb_shinfo(skb)->frags[i].page_offset,
+                                      skb_shinfo(skb)->frags[i].size,
+                                      DMA_TO_DEVICE);
+               BUG_ON(ib_dma_mapping_error(dev, addr));
+               tx_req->mapping[i + 1] = addr;
+               sge->addr = addr;
+               sge->length = skb_shinfo(skb)->frags[i].size;
+               sge->lkey = ssk->mr->lkey;
+       }
+
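+       /* Tag the work request id with SDP_OP_SEND so the tx completion
+        * handler can tell data sends apart from keepalive probes.
+        */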
+       ssk->tx_wr.next = NULL;
+       ssk->tx_wr.wr_id = ssk->tx_ring.head | SDP_OP_SEND;
+       ssk->tx_wr.sg_list = ssk->ibsge;
+       ssk->tx_wr.num_sge = frags + 1;
+       ssk->tx_wr.opcode = IB_WR_SEND;
+       ssk->tx_wr.send_flags = IB_SEND_SIGNALED;
+       if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_URG))
+               ssk->tx_wr.send_flags |= IB_SEND_SOLICITED;
+
+       /* Record the interval between consecutive sends for statistics. */
+       {
+               static unsigned long last_send;
+               int delta = jiffies - last_send;
+
+               if (likely(last_send))
+                       SDPSTATS_HIST(send_interval, delta);
+
+               last_send = jiffies;
+       }
+       rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr);
+       ++ssk->tx_ring.head;
+       --ssk->tx_ring.credits;
+       ssk->remote_credits = ssk->rx_head - ssk->rx_tail;
+       if (unlikely(rc)) {
+               sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc);
+               sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+               wake_up(&ssk->wq);
+       }
+
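+       /* Running low on tx credits: poll the rx CQ, since credit updates
+        * arrive piggybacked on received packets.
+        */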
+       if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS) {
+               sdp_poll_rx_cq(ssk);
+       }
+}
+
+static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
+{
+       struct ib_device *dev;
+       struct sdp_buf *tx_req;
+       struct sk_buff *skb = NULL;
+       struct bzcopy_state *bz;
+       int i, frags;
+       struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
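+
+       /* Send completions must arrive in ring order; a mismatched mseq is
+        * reported and the skb is left unreclaimed.
+        */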
+       if (unlikely(mseq != tx_ring->tail)) {
+               printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
+                       mseq, tx_ring->tail);
+               goto out;
+       }
+
+       dev = ssk->ib_device;
+       tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
+       skb = tx_req->skb;
+       ib_dma_unmap_single(dev, tx_req->mapping[0], skb->len - skb->data_len,
+                           DMA_TO_DEVICE);
+       frags = skb_shinfo(skb)->nr_frags;
+       for (i = 0; i < frags; ++i) {
+               ib_dma_unmap_page(dev, tx_req->mapping[i + 1],
+                                 skb_shinfo(skb)->frags[i].size,
+                                 DMA_TO_DEVICE);
+       }
+
+       tx_ring->una_seq += TCP_SKB_CB(skb)->end_seq;
+
+       /* TODO: AIO and real zcopy code; add their context support here */
+       bz = BZCOPY_STATE(skb);
+       if (bz)
+               bz->busy--;
+
+       ++tx_ring->tail;
+
+out:
+       return skb;
+}
+
+static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+       struct sk_buff *skb = NULL;
+
+       skb = sdp_send_completion(ssk, wc->wr_id);
+       if (unlikely(!skb))
+               return -1;
+
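+       /* Any completion error other than a flush (expected during teardown)
+        * resets the connection and schedules socket destruction.
+        */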
+       if (unlikely(wc->status)) {
+               if (wc->status != IB_WC_WR_FLUSH_ERR) {
+                       struct sock *sk = &ssk->isk.sk;
+                       sdp_warn(sk, "Send completion with error. "
+                               "Status %d\n", wc->status);
+                       sdp_set_error(sk, -ECONNRESET);
+                       wake_up(&ssk->wq);
+
+                       queue_work(sdp_wq, &ssk->destroy_work);
+               }
+       }
+
+       sdp_prf(&ssk->isk.sk, skb, "tx completion");
+       sk_stream_free_skb(&ssk->isk.sk, skb);
+
+       return 0;
+}
+
+static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+       if (likely(wc->wr_id & SDP_OP_SEND)) {
+               sdp_handle_send_comp(ssk, wc);
+               return;
+       }
+
+       /* Not a data send: this is the completion of a keepalive probe. */
+       sdp_cnt(sdp_keepalive_probes_sent);
+
+       if (likely(!wc->status))
+               return;
+
+       sdp_dbg(&ssk->isk.sk, "%s: keepalive completed with status %d\n",
+                       __func__, wc->status);
+
+       if (wc->status == IB_WC_WR_FLUSH_ERR)
+               return;
+
+       sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+       wake_up(&ssk->wq);
+}
+
+static int sdp_process_tx_cq(struct sdp_sock *ssk)
+{
+       struct ib_wc ibwc[SDP_NUM_WC];
+       int n, i;
+       int wc_processed = 0;
+
+       if (!ssk->tx_ring.cq) {
+               sdp_warn(&ssk->isk.sk, "WARNING: tx cq polled after it was destroyed\n");
+               return 0;
+       }
+
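+       /* Reap completed sends in batches of SDP_NUM_WC until the CQ is
+        * empty.
+        */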
+       do {
+               n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
+               for (i = 0; i < n; ++i) {
+                       sdp_process_tx_wc(ssk, ibwc + i);
+                       wc_processed++;
+               }
+       } while (n == SDP_NUM_WC);
+
+       sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
+
+       if (wc_processed) {
+               struct sock *sk = &ssk->isk.sk;
+               sdp_post_sends(ssk, 0);
+
+               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+                       sk_stream_write_space(&ssk->isk.sk);
+
+       }
+
+       return wc_processed;
+}
+
+void sdp_poll_tx_cq(unsigned long data)
+{
+       struct sdp_sock *ssk = (struct sdp_sock *)data;
+       struct sock *sk = &ssk->isk.sk;
+       u32 inflight, wc_processed;
+
+       sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
+               (u32) ssk->tx_ring.head - ssk->tx_ring.tail);
+
+       /* Only process if the socket is not in use */
+       bh_lock_sock(sk);
+       if (sock_owned_by_user(sk)) {
+               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+               sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+               goto out;
+       }
+
+       if (sk->sk_state == TCP_CLOSE)
+               goto out;
+
+       wc_processed = sdp_process_tx_cq(ssk);
+       if (!wc_processed)
+               SDPSTATS_COUNTER_INC(tx_poll_miss);
+       else
+               SDPSTATS_COUNTER_INC(tx_poll_hit);
+
+       inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail;
+
+       /* If there are still packets in flight and the timer has not already
+        * been scheduled by the Tx routine then schedule it here to guarantee
+        * completion processing of these packets */
+       if (inflight) { /* TODO: make sure socket is not closed */
+               sdp_dbg_data(sk, "arming timer for more polling\n");
+               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+       }
+
+out:
+       bh_unlock_sock(sk);
+}
+
+void _sdp_poll_tx_cq(unsigned long data)
+{
+       struct sdp_sock *ssk = (struct sdp_sock *)data;
+       struct sock *sk = &ssk->isk.sk;
+
+       sdp_prf(sk, NULL, "sdp poll tx timeout expired");
+
+       sdp_poll_tx_cq(data);
+}
+
+void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
+{
+       struct sock *sk = cq_context;
+       struct sdp_sock *ssk = sdp_sk(sk);
+
+       sdp_warn(sk, "Got tx comp interrupt\n");
+
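+       /* Completion handling is done from the polling timer in bh context;
+        * arm it to fire on the next jiffy instead of polling here.
+        */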
+       mod_timer(&ssk->tx_ring.timer, jiffies + 1);
+}
+
+void sdp_tx_ring_purge(struct sdp_sock *ssk)
+{
+       struct sk_buff *skb;
+
+       while (ssk->tx_ring.head != ssk->tx_ring.tail) {
+               skb = sdp_send_completion(ssk, ssk->tx_ring.tail);
+               if (!skb)
+                       break;
+               __kfree_skb(skb);
+       }
+}
+
+void sdp_post_keepalive(struct sdp_sock *ssk)
+{
+       int rc;
+       struct ib_send_wr wr, *bad_wr;
+
+       sdp_dbg(&ssk->isk.sk, "%s\n", __func__);
+
+       memset(&wr, 0, sizeof(wr));
+
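+       /* The keepalive probe is a zero-length RDMA write with no sg
+        * entries; it generates a local completion but consumes no receive
+        * buffer at the peer.
+        */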
+       wr.next    = NULL;
+       wr.wr_id   = 0;
+       wr.sg_list = NULL;
+       wr.num_sge = 0;
+       wr.opcode  = IB_WR_RDMA_WRITE;
+
+       rc = ib_post_send(ssk->qp, &wr, &bad_wr);
+       if (rc) {
+               sdp_dbg(&ssk->isk.sk, "ib_post_send (keepalive) failed with status %d.\n", rc);
+               sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+               wake_up(&ssk->wq);
+       }
+
+       sdp_cnt(sdp_keepalive_probes_sent);
+}
+