]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
sdp: no tx interrupts
authorAmir Vadai <amirv@mellanox.co.il>
Tue, 21 Apr 2009 14:18:37 +0000 (17:18 +0300)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 6 Oct 2015 12:04:27 +0000 (05:04 -0700)
poll tx cq with timer instead of interrupts

Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_cma.c
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_proc.c

index cbef0eb03736a7a1d58872d273c8ae16fae832df..81280655d3acd99a81272e35bb4fd50c0fddaf42 100644 (file)
@@ -83,12 +83,14 @@ struct sdpstats {
        u32 sendmsg_seglen[25];
        u32 send_size[25];
        u32 post_recv;
-       u32 int_count;
+       u32 rx_int_count;
+       u32 tx_int_count;
        u32 bzcopy_poll_miss;
        u32 send_wait_for_mem;
        u32 send_miss_no_credits;
        u32 rx_poll_miss;
        u32 tx_poll_miss;
+       u32 tx_poll_hit;
        u32 memcpy_count;
        u32 credits_before_update[64];
        u32 send_interval[25];
@@ -132,6 +134,11 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
 #define sock_put(sk, msg)  sock_ref(sk, msg, sock_put)
 #define __sock_put(sk, msg)  sock_ref(sk, msg, __sock_put)
 
+/* Interval between sucessive polls in the Tx routine when polling is used
+   instead of interrupts (in per-core Tx rings) - should be power of 2 */
+#define SDP_TX_POLL_MODER      16
+#define SDP_TX_POLL_TIMEOUT    (HZ / 4)
+
 #define SDP_RESOLVE_TIMEOUT 1000
 #define SDP_ROUTE_TIMEOUT 1000
 #define SDP_RETRY_COUNT 5
@@ -242,7 +249,10 @@ struct sdp_tx_ring {
 
        int               una_seq;
        unsigned          credits;
+
+       struct timer_list timer;
        u16               poll_cnt;
+       struct ib_cq     *cq;
 };
 
 static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
@@ -258,7 +268,7 @@ struct sdp_sock {
        struct list_head backlog_queue;
        struct sock *parent;
 
-       struct work_struct work;
+       struct work_struct rx_comp_work;
        wait_queue_head_t wq;
 
        struct delayed_work dreq_wait_work;
@@ -308,7 +318,7 @@ struct sdp_sock {
 
        /* rdma specific */
        struct ib_qp *qp;
-       struct ib_cq *cq;
+       struct ib_cq *rx_cq;
        struct ib_mr *mr;
 
        struct sdp_buf *rx_ring;
@@ -349,7 +359,7 @@ struct bzcopy_state {
 extern int rcvbuf_initial_size;
 
 extern struct proto sdp_proto;
-extern struct workqueue_struct *comp_wq;
+extern struct workqueue_struct *rx_comp_wq;
 
 extern atomic_t sdp_current_mem_usage;
 extern spinlock_t sdp_large_sockets_lock;
@@ -440,11 +450,11 @@ static inline void sdp_set_error(struct sock *sk, int err)
        sk->sk_error_report(sk);
 }
 
-static inline void sdp_arm_cq(struct sock *sk)
+static inline void sdp_arm_rx_cq(struct sock *sk)
 {
-       sdp_dbg_data(sk, "ib_req_notify_cq on cq\n");
+       sdp_dbg_data(sk, "ib_req_notify_cq on RX cq\n");
        
-       ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP);
+       ib_req_notify_cq(sdp_sk(sk)->rx_cq, IB_CQ_NEXT_COMP);
 }
 
 #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
@@ -453,12 +463,15 @@ void dump_packet(struct sock *sk, char *str, struct sk_buff *skb, const struct s
 int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
 void sdp_reset(struct sock *sk);
 void sdp_reset_sk(struct sock *sk, int rc);
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context);
-void sdp_work(struct work_struct *work);
+void sdp_rx_irq(struct ib_cq *cq, void *cq_context);
+void sdp_tx_irq(struct ib_cq *cq, void *cq_context);
+void sdp_poll_tx_cq(unsigned long data);
+void sdp_rx_comp_work(struct work_struct *work);
+void sdp_process_tx_wc_work(struct work_struct *work);
 int sdp_post_credits(struct sdp_sock *ssk);
 void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
 void sdp_post_recvs(struct sdp_sock *ssk);
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq);
+int sdp_poll_rx_cq(struct sdp_sock *ssk);
 void sdp_post_sends(struct sdp_sock *ssk, int nonagle);
 void sdp_destroy_work(struct work_struct *work);
 void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
@@ -477,6 +490,7 @@ void sdp_bzcopy_write_space(struct sdp_sock *ssk);
 int sdp_init_sock(struct sock *sk);
 int __init sdp_proc_init(void);
 void sdp_proc_unregister(void);
+int sdp_xmit_poll(struct sdp_sock *ssk, int force);
 
 static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
 {
index 9f7f4a07f8913a0a31acd5057bafb66f1231247a..29c9761ec96b198ffda2d9e0ef7c38a692ce6352 100644 (file)
@@ -244,6 +244,24 @@ void dump_packet(struct sock *sk, char *str, struct sk_buff *skb, const struct s
 }
 #endif
 
+static int sdp_process_tx_cq(struct sdp_sock *ssk);
+
+int sdp_xmit_poll(struct sdp_sock *ssk, int force)
+{
+       int wc_processed = 0;
+
+       /* If we don't have a pending timer, set one up to catch our recent
+          post in case the interface becomes idle */
+       if (!timer_pending(&ssk->tx_ring.timer))
+               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+
+       /* Poll the CQ every SDP_TX_POLL_MODER packets */
+       if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
+               wc_processed = sdp_process_tx_cq(ssk);
+
+       return wc_processed;    
+}
+
 void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 {
        struct sdp_buf *tx_req;
@@ -573,6 +591,7 @@ int sdp_post_credits(struct sdp_sock *ssk)
                if (!skb)
                        return -ENOMEM;
                sdp_post_send(ssk, skb, SDP_MID_DATA);
+               sdp_xmit_poll(ssk, 0);
        }
        return 0;
 }
@@ -583,6 +602,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
        struct sk_buff *skb;
        int c;
        gfp_t gfp_page;
+       int post_count = 0;
 
        if (unlikely(!ssk->id)) {
                if (ssk->isk.sk.sk_send_head) {
@@ -602,7 +622,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
        if (ssk->recv_request &&
            ssk->rx_tail >= ssk->recv_request_head &&
            ssk->tx_ring.credits >= SDP_MIN_TX_CREDITS &&
-           ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) {
+           sdp_tx_ring_slots_left(&ssk->tx_ring)) {
                struct sdp_chrecvbuf *resp_size;
                ssk->recv_request = 0;
                skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -614,6 +634,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
                resp_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *resp_size);
                resp_size->size = htonl(ssk->recv_frags * PAGE_SIZE);
                sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK);
+               post_count++;
        }
 
        if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS &&
@@ -634,12 +655,13 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
                update_send_head(&ssk->isk.sk, skb);
                __skb_dequeue(&ssk->isk.sk.sk_write_queue);
                sdp_post_send(ssk, skb, SDP_MID_DATA);
+               post_count++;
        }
 
        if (ssk->tx_ring.credits == SDP_MIN_TX_CREDITS &&
            !ssk->sent_request &&
            ssk->tx_ring.head > ssk->sent_request_head + SDP_RESIZE_WAIT &&
-           ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) {
+           sdp_tx_ring_slots_left(&ssk->tx_ring)) {
                struct sdp_chrecvbuf *req_size;
                skb = sdp_stream_alloc_skb(&ssk->isk.sk,
                                          sizeof(struct sdp_bsdh) +
@@ -652,6 +674,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
                req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size);
                req_size->size = htonl(ssk->sent_request);
                sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF);
+               post_count++;
        }
 
        c = ssk->remote_credits;
@@ -660,7 +683,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
 
        if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
            likely(ssk->tx_ring.credits > 1) &&
-           likely(ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) &&
+           likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
            likely((1 << ssk->isk.sk.sk_state) &
                    (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
                skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -670,6 +693,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
                BUG_ON(!skb);
                SDPSTATS_COUNTER_INC(post_send_credits);
                sdp_post_send(ssk, skb, SDP_MID_DATA);
+               post_count++;
        }
 
        /* send DisConn if needed
@@ -687,7 +711,10 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
                /* FIXME */
                BUG_ON(!skb);
                sdp_post_send(ssk, skb, SDP_MID_DISCONN);
+               post_count++;
        }
+       if (post_count)
+               sdp_xmit_poll(ssk, 0);
 }
 
 int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
@@ -867,7 +894,6 @@ static int sdp_handle_recv_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 {
        struct sk_buff *skb = NULL;
-       struct sdp_bsdh *h;
 
        skb = sdp_send_completion(ssk, wc->wr_id);
        if (unlikely(!skb))
@@ -881,72 +907,162 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
                        sdp_set_error(sk, -ECONNRESET);
                        wake_up(&ssk->wq);
 
-                       queue_work(comp_wq, &ssk->destroy_work);
+                       queue_work(rx_comp_wq, &ssk->destroy_work);
                }
-               goto out;
        }
 
-       h = (struct sdp_bsdh *)skb->data;
+       sk_stream_free_skb(&ssk->isk.sk, skb);
 
-       if (likely(h->mid != SDP_MID_DISCONN))
-               goto out;
+       return 0;
+}
+
+void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
+{
+       struct sock *sk = cq_context;
+       struct sdp_sock *ssk = sdp_sk(sk);
+
+       WARN_ON(ssk->rx_cq && cq != ssk->rx_cq);
+
+       if (!ssk->rx_cq)
+               sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+
+       SDPSTATS_COUNTER_INC(rx_int_count);
+
+       queue_work(rx_comp_wq, &ssk->rx_comp_work);
+}
 
-       if ((1 << ssk->isk.sk.sk_state) & ~(TCPF_FIN_WAIT1 | TCPF_LAST_ACK)) {
-               sdp_dbg(&ssk->isk.sk,
-                       "%s: sent DISCONNECT from unexpected state %d\n",
-                       __func__, ssk->isk.sk.sk_state);
+static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+       if (likely(wc->wr_id & SDP_OP_SEND)) {
+               sdp_handle_send_comp(ssk, wc);
+               return;
        }
 
-out:
        sk_wmem_free_skb(&ssk->isk.sk, skb);
 
-       return 0;
+       /* Keepalive probe sent cleanup */
+       sdp_cnt(sdp_keepalive_probes_sent);
+
+       if (likely(!wc->status))
+               return;
+
+       sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
+                       __func__, wc->status);
+
+       if (wc->status == IB_WC_WR_FLUSH_ERR)
+               return;
+
+       sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+       wake_up(&ssk->wq);
 }
 
-static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+static int sdp_process_tx_cq(struct sdp_sock *ssk)
 {
-       if (wc->wr_id & SDP_OP_RECV) {
-               if (sdp_handle_recv_comp(ssk, wc))
-                       return;
-       } else if (likely(wc->wr_id & SDP_OP_SEND)) {
-               if (sdp_handle_send_comp(ssk, wc))
-                       return;
-       } else {
-               sdp_cnt(sdp_keepalive_probes_sent);
+       struct ib_wc ibwc[SDP_NUM_WC];
+       int n, i;
+       int wc_processed = 0;
 
-               if (likely(!wc->status))
-                       return;
+       if (!ssk->tx_ring.cq) {
+               sdp_warn(&ssk->isk.sk, "WARNING: tx irq when tx_cq is destroyed\n");
+               return 0;
+       }
+       
+       do {
+               n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
+               for (i = 0; i < n; ++i) {
+                       sdp_process_tx_wc(ssk, ibwc + i);
+                       wc_processed++;
+               }
+       } while (n == SDP_NUM_WC);
 
-               sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
-                       __func__, wc->status);
+       sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
 
-               if (wc->status == IB_WC_WR_FLUSH_ERR)
-                       return;
+       if (wc_processed) {
+               struct sock *sk = &ssk->isk.sk;
+               sdp_post_sends(ssk, 0);
 
-               sdp_set_error(&ssk->isk.sk, -ECONNRESET);
-               wake_up(&ssk->wq);
+               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+                       sk_stream_write_space(&ssk->isk.sk);
 
-               return;
        }
+
+       return wc_processed;    
 }
 
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context)
+void sdp_poll_tx_cq(unsigned long data)
+{
+       struct sdp_sock *ssk = (struct sdp_sock *)data;
+       struct sock *sk = &ssk->isk.sk;
+       u32 inflight, wc_processed;
+
+       sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
+               (u32) ssk->tx_ring.head - ssk->tx_ring.tail);
+
+       /* Only process if the socket is not in use */
+       bh_lock_sock(sk);
+       if (sock_owned_by_user(sk)) {
+               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+               sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+               goto out;
+       }
+
+       if (sk->sk_state == TCP_CLOSE)
+               goto out;
+
+       wc_processed = sdp_process_tx_cq(ssk);
+       if (!wc_processed)
+               SDPSTATS_COUNTER_INC(tx_poll_miss);
+       else
+               SDPSTATS_COUNTER_INC(tx_poll_hit);
+
+       inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail;
+
+       /* If there are still packets in flight and the timer has not already
+        * been scheduled by the Tx routine then schedule it here to guarantee
+        * completion processing of these packets */
+       if (inflight) { /* TODO: make sure socket is not closed */
+               sdp_dbg_data(sk, "arming timer for more polling\n");
+               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+       }
+
+out:
+       bh_unlock_sock(sk);
+}
+
+
+void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
 {
        struct sock *sk = cq_context;
        struct sdp_sock *ssk = sdp_sk(sk);
-       schedule_work(&ssk->work);
-       SDPSTATS_COUNTER_INC(int_count);
+
+       sdp_warn(sk, "Got tx comp interrupt\n");
+
+       mod_timer(&ssk->tx_ring.timer, jiffies + 1);
 }
 
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq)
+
+int sdp_poll_rx_cq(struct sdp_sock *ssk)
 {
+       struct ib_cq *cq = ssk->rx_cq;
        struct ib_wc ibwc[SDP_NUM_WC];
        int n, i;
        int ret = -EAGAIN;
+       int updated_credits = 0;
+
        do {
                n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
                for (i = 0; i < n; ++i) {
-                       sdp_handle_wc(ssk, &ibwc[i]);
+                       struct ib_wc *wc = &ibwc[i];
+
+                       BUG_ON(!(wc->wr_id & SDP_OP_RECV));
+                       sdp_handle_recv_comp(ssk, wc);
+
+                       if (!updated_credits) {
+                               sdp_post_recvs(ssk);
+                               sdp_post_sends(ssk, 0);
+                               updated_credits = 1;
+                       }
+
                        ret = 0;
                }
        } while (n == SDP_NUM_WC);
@@ -968,17 +1084,20 @@ int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq)
        return ret;
 }
 
-void sdp_work(struct work_struct *work)
+static inline int sdp_tx_qp_empty(struct sdp_sock *ssk)
 {
-       struct sdp_sock *ssk = container_of(work, struct sdp_sock, work);
-       struct sock *sk = &ssk->isk.sk;
-       struct ib_cq *cq;
+       return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0;
+}
 
-       sdp_dbg_data(sk, "%s\n", __func__);
+void sdp_rx_comp_work(struct work_struct *work)
+{
+       struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
+       struct sock *sk = &ssk->isk.sk;
+       struct ib_cq *rx_cq;
 
        lock_sock(sk);
-       cq = ssk->cq;
-       if (unlikely(!cq))
+       rx_cq = ssk->rx_cq;
+       if (unlikely(!rx_cq))
                goto out;
 
        if (unlikely(!ssk->poll_cq)) {
@@ -988,15 +1107,18 @@ void sdp_work(struct work_struct *work)
                goto out;
        }
 
-       sdp_poll_cq(ssk, cq);
+       sdp_poll_rx_cq(ssk);
+       sdp_xmit_poll(ssk,  1); /* if has pending tx because run out of tx_credits - xmit it */
        release_sock(sk);
        sk_mem_reclaim(sk);
        lock_sock(sk);
-       cq = ssk->cq;
-       if (unlikely(!cq))
+       rx_cq = ssk->rx_cq;
+       if (unlikely(!rx_cq))
                goto out;
-       sdp_arm_cq(sk);
-       sdp_poll_cq(ssk, cq);
+       
+       sdp_arm_rx_cq(sk);
+       sdp_poll_rx_cq(ssk);
+       sdp_xmit_poll(ssk,  1);
 out:
        release_sock(sk);
 }
index 1e1ff9dde5623130f68609841046d5eb5064b250..7c34637fd9a266898bdb12c530809a8234c84fc1 100644 (file)
@@ -73,7 +73,7 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
                .qp_type = IB_QPT_RC,
        };
        struct ib_device *device = id->device;
-       struct ib_cq *cq;
+       struct ib_cq *rx_cq, *tx_cq;
        struct ib_mr *mr;
        struct ib_pd *pd;
        int rc;
@@ -118,32 +118,49 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
         }
 
        sdp_sk(sk)->mr = mr;
-       INIT_WORK(&sdp_sk(sk)->work, sdp_work);
+       INIT_WORK(&sdp_sk(sk)->rx_comp_work, sdp_rx_comp_work);
 
-       cq = ib_create_cq(device, sdp_completion_handler, sdp_cq_event_handler,
-                         sk, SDP_TX_SIZE + SDP_RX_SIZE, 0);
+       rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_cq_event_handler,
+                         sk, SDP_RX_SIZE, 0);
 
-       if (IS_ERR(cq)) {
-               rc = PTR_ERR(cq);
-               sdp_warn(sk, "Unable to allocate CQ: %d.\n", rc);
-               goto err_cq;
+       if (IS_ERR(rx_cq)) {
+               rc = PTR_ERR(rx_cq);
+               sdp_warn(sk, "Unable to allocate RX CQ: %d.\n", rc);
+               goto err_rx_cq;
        }
 
-       rc = ib_modify_cq(cq, 10, 200);
+       rc = ib_modify_cq(rx_cq, 10, 200);
        if (rc) {
                sdp_warn(sk, "Unable to modify RX CQ: %d.\n", rc);
-               goto err_qp;
+               goto err_tx_cq;
        }
        sdp_warn(sk, "Initialized CQ moderation\n");
+       sdp_sk(sk)->rx_cq = rx_cq;
+       sdp_arm_rx_cq(sk);
+       qp_init_attr.recv_cq = rx_cq;
+
+       tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_cq_event_handler,
+                         sk, SDP_TX_SIZE, 0);
+
+       if (IS_ERR(tx_cq)) {
+               rc = PTR_ERR(tx_cq);
+               sdp_warn(sk, "Unable to allocate TX CQ: %d.\n", rc);
+               goto err_tx_cq;
+       }
 
-        qp_init_attr.send_cq = qp_init_attr.recv_cq = cq;
+       init_timer(&sdp_sk(sk)->tx_ring.timer);
+       sdp_sk(sk)->tx_ring.timer.function = sdp_poll_tx_cq;
+       sdp_sk(sk)->tx_ring.timer.data = (unsigned long) sdp_sk(sk);
+       sdp_sk(sk)->tx_ring.poll_cnt = 0;
+
+       sdp_sk(sk)->tx_ring.cq = tx_cq;
+        qp_init_attr.send_cq = tx_cq;
 
        rc = rdma_create_qp(id, pd, &qp_init_attr);
        if (rc) {
                sdp_warn(sk, "Unable to create QP: %d.\n", rc);
                goto err_qp;
        }
-       sdp_sk(sk)->cq = cq;
        sdp_sk(sk)->qp = id->qp;
        sdp_sk(sk)->ib_device = device;
 
@@ -153,8 +170,10 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
        return 0;
 
 err_qp:
-       ib_destroy_cq(cq);
-err_cq:
+       ib_destroy_cq(tx_cq);
+err_tx_cq:
+       ib_destroy_cq(rx_cq);
+err_rx_cq:
        ib_dereg_mr(sdp_sk(sk)->mr);
 err_mr:
        ib_dealloc_pd(pd);
@@ -162,7 +181,6 @@ err_pd:
        kfree(sdp_sk(sk)->rx_ring);
        sdp_sk(sk)->rx_ring = NULL;
 err_rx:
-       WARN_ON(sdp_sk(sk)->tx_ring.head != sdp_sk(sk)->tx_ring.tail);
        kfree(sdp_sk(sk)->tx_ring.buffer);
        sdp_sk(sk)->tx_ring.buffer = NULL;
 err_tx:
@@ -258,21 +276,20 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
        sdp_sk(sk)->min_bufs = sdp_sk(sk)->tx_ring.credits / 4;
        sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
                sizeof(struct sdp_bsdh);
-       sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
-               PAGE_SIZE, SDP_MAX_SEND_SKB_FRAGS);
-       sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal, 
-               sdp_sk(sk)->send_frags * PAGE_SIZE);
+       sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
+               PAGE_SIZE, SDP_MAX_SEND_SKB_FRAGS);
+       sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal, 
+               sdp_sk(sk)->send_frags * PAGE_SIZE);
 
-       sdp_dbg(sk, "%s bufs %d xmit_size_goal %d send_frags: %d send trigger %d\n",
-               __func__,
+       sdp_dbg(sk, "tx credits %d xmit_size_goal %d send_frags: %d credits update trigger %d\n",
                sdp_sk(sk)->tx_ring.credits,
                sdp_sk(sk)->xmit_size_goal,
-               sdp_sk(sk)->send_frags,
+               sdp_sk(sk)->send_frags,
                sdp_sk(sk)->min_bufs);
 
        sdp_sk(sk)->poll_cq = 1;
-       sdp_arm_cq(sk);
-       sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq);
+       sdp_arm_rx_cq(sk);
+       sdp_poll_rx_cq(sdp_sk(sk));
 
        sk->sk_state_change(sk);
        sk_wake_async(sk, 0, POLL_OUT);
@@ -332,8 +349,11 @@ static int sdp_disconnected_handler(struct sock *sk)
 
        sdp_dbg(sk, "%s\n", __func__);
 
-       if (ssk->cq)
-               sdp_poll_cq(ssk, ssk->cq);
+       if (ssk->rx_cq)
+               sdp_poll_rx_cq(ssk);
+
+       if (ssk->tx_ring.cq)
+               sdp_xmit_poll(ssk, 1);
 
        if (sk->sk_state == TCP_SYN_RECV) {
                sdp_connected_handler(sk, NULL);
index 6b7494f5ff50e27128629a66cd7445d674f987bb..454abeb2872ab49bd9ed0ed40b20ac8992c80cc0 100644 (file)
@@ -135,7 +135,8 @@ static int sdp_zcopy_thresh = 65536;
 module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644);
 MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=0ff.");
 
-struct workqueue_struct *comp_wq;
+struct workqueue_struct *sdp_wq;
+struct workqueue_struct *rx_comp_wq;
 
 struct list_head sock_list;
 spinlock_t sock_list_lock;
@@ -205,12 +206,17 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
 static void sdp_destroy_qp(struct sdp_sock *ssk)
 {
        struct ib_pd *pd = NULL;
-       struct ib_cq *cq = NULL;
+       struct ib_cq *rx_cq = NULL;
+       struct ib_cq *tx_cq = NULL;
+
+       del_timer(&ssk->tx_ring.timer);
 
        if (ssk->qp) {
                pd = ssk->qp->pd;
-               cq = ssk->cq;
-               ssk->cq = NULL;
+               rx_cq = ssk->rx_cq;
+               ssk->rx_cq = NULL;
+               tx_cq = ssk->tx_ring.cq;
+               ssk->tx_ring.cq = NULL;
                ib_destroy_qp(ssk->qp);
                ssk->qp = NULL;
 
@@ -231,8 +237,12 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
                }
        }
 
-       if (cq)
-               ib_destroy_cq(cq);
+       if (tx_cq) {
+               ib_destroy_cq(tx_cq);
+       }
+
+       if (rx_cq)
+               ib_destroy_cq(rx_cq);
 
        if (ssk->mr) {
                ib_dereg_mr(ssk->mr);
@@ -341,8 +351,11 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
        read_lock(&device_removal_lock);
 
-       if (ssk->cq)
-               sdp_poll_cq(ssk, ssk->cq);
+       if (ssk->rx_cq)
+               sdp_poll_rx_cq(ssk);
+
+       if (ssk->tx_ring.cq)
+               sdp_xmit_poll(ssk, 1);
 
        if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk))
                sdp_set_error(sk, rc);
@@ -355,7 +368,7 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
        /* Don't destroy socket before destroy work does its job */
        sock_hold(sk, SOCK_REF_RESET);
-       queue_work(comp_wq, &ssk->destroy_work);
+       queue_work(sdp_wq, &ssk->destroy_work);
 
        read_unlock(&device_removal_lock);
 }
@@ -773,11 +786,10 @@ out:
        release_sock(sk);
        if (newsk) {
                lock_sock(newsk);
-               if (newssk->cq) {
-                       sdp_dbg(newsk, "%s: ib_req_notify_cq\n", __func__);
+               if (newssk->rx_cq) {
                        newssk->poll_cq = 1;
-                       sdp_arm_cq(&newssk->isk.sk);
-                       sdp_poll_cq(newssk, newssk->cq);
+                       sdp_arm_rx_cq(&newssk->isk.sk);
+                       sdp_poll_rx_cq(newssk);
                }
                release_sock(newsk);
        }
@@ -847,7 +859,7 @@ static inline void sdp_start_dreq_wait_timeout(struct sdp_sock *ssk, int timeo)
 {
        sdp_dbg(&ssk->isk.sk, "Starting dreq wait timeout\n");
 
-       queue_delayed_work(comp_wq, &ssk->dreq_wait_work, timeo);
+       queue_delayed_work(sdp_wq, &ssk->dreq_wait_work, timeo);
        ssk->dreq_wait_timeout = 1;
 }
 
@@ -1143,9 +1155,9 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
 static inline int poll_recv_cq(struct sock *sk)
 {
        int i;
-       if (sdp_sk(sk)->cq) {
+       if (sdp_sk(sk)->rx_cq) {
                for (i = 0; i < recv_poll; ++i)
-                       if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
+                       if (!sdp_poll_rx_cq(sdp_sk(sk))) {
                                ++recv_poll_hit;
                                return 0;
                        }
@@ -1157,9 +1169,9 @@ static inline int poll_recv_cq(struct sock *sk)
 static inline void poll_send_cq(struct sock *sk)
 {
        int i;
-       if (sdp_sk(sk)->cq) {
+       if (sdp_sk(sk)->tx_ring.cq) {
                for (i = 0; i < send_poll; ++i)
-                       if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
+                       if (sdp_xmit_poll(sdp_sk(sk), 1)) {
                                ++send_poll_hit;
                                return;
                        }
@@ -1421,12 +1433,9 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
                        /* We can extend the last page
                         * fragment. */
                        merge = 1;
-               } else if (i == ssk->send_frags ||
-                          (!i &&
-                          !(sk->sk_route_caps & NETIF_F_SG))) {
+               } else if (i == ssk->send_frags) {
                        /* Need to add new fragment and cannot
-                        * do this because interface is non-SG,
-                        * or because all the page slots are
+                        * do this because all the page slots are
                         * busy. */
                        sdp_mark_push(ssk, skb);
                        return SDP_NEW_SEG;
@@ -1649,7 +1658,6 @@ void sdp_bzcopy_write_space(struct sdp_sock *ssk)
        }
 }
 
-
 /* Like tcp_sendmsg */
 /* TODO: check locking */
 static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -1812,7 +1820,9 @@ wait_for_sndbuf:
 wait_for_memory:
                        SDPSTATS_COUNTER_INC(send_wait_for_mem);
                        if (copied)
-                               sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
+                               sdp_push(sk, ssk, flags & ~MSG_MORE, PAGE_SIZE, TCP_NAGLE_PUSH);
+
+                       sdp_xmit_poll(ssk, 1);
 
                        err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo, bz) :
                                     sk_stream_wait_memory(sk, &timeo);
@@ -2386,17 +2396,25 @@ static int __init sdp_init(void)
        sdp_proto.sockets_allocated = sockets_allocated;
        sdp_proto.orphan_count = orphan_count;
 
-       comp_wq = create_singlethread_workqueue("comp_wq");
-       if (!comp_wq)
+       rx_comp_wq = create_singlethread_workqueue("rx_comp_wq");
+       if (!rx_comp_wq)
                goto no_mem_rx_wq;
 
+       sdp_wq = create_singlethread_workqueue("sdp_wq");
+       if (!sdp_wq)
+               goto no_mem_sdp_wq;
+
        rc = proto_register(&sdp_proto, 1);
-       if (rc)
+       if (rc) {
+               printk(KERN_WARNING "%s: proto_register failed: %d\n", __func__, rc);
                goto error_proto_reg;
+       }
 
        rc = sock_register(&sdp_net_proto);
-       if (rc)
+       if (rc) {
+               printk(KERN_WARNING "%s: sock_register failed: %d\n", __func__, rc);
                goto error_sock_reg;
+       }
 
        sdp_proc_init();
 
@@ -2409,7 +2427,9 @@ static int __init sdp_init(void)
 error_sock_reg:
        proto_unregister(&sdp_proto);
 error_proto_reg:
-       destroy_workqueue(comp_wq);
+       destroy_workqueue(sdp_wq);
+no_mem_sdp_wq:
+       destroy_workqueue(rx_comp_wq);
 no_mem_rx_wq:
        kfree(orphan_count);
 no_mem_orphan_count:
@@ -2427,7 +2447,8 @@ static void __exit sdp_exit(void)
                printk(KERN_WARNING "%s: orphan_count %lld\n", __func__,
                       percpu_counter_read_positive(orphan_count));
 
-       destroy_workqueue(comp_wq);
+       destroy_workqueue(rx_comp_wq);
+       destroy_workqueue(sdp_wq);
 
        flush_scheduled_work();
 
index e759864e95c4cebd8d095d264b9c87c1d504c819..0971a49ebc9abf98502fc51fe32ff7efded177c8 100644 (file)
@@ -260,9 +260,11 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
 
        seq_printf(seq, "rx_poll_miss      \t\t: %d\n", sdpstats.rx_poll_miss);
        seq_printf(seq, "tx_poll_miss      \t\t: %d\n", sdpstats.tx_poll_miss);
+       seq_printf(seq, "tx_poll_hit       \t\t: %d\n", sdpstats.tx_poll_hit);
 
        seq_printf(seq, "CQ stats:\n");
-       seq_printf(seq, "- interrupts\t\t: %d\n", sdpstats.int_count);
+       seq_printf(seq, "- RX interrupts\t\t: %d\n", sdpstats.rx_int_count);
+       seq_printf(seq, "- TX interrupts\t\t: %d\n", sdpstats.tx_int_count);
        return 0;
 }