sdp: Process tx completions from sendmsg context. arm tx cq when needed
author     Amir Vadai <amirv@mellanox.co.il>
           Sun, 23 Aug 2009 15:28:39 +0000 (18:28 +0300)
committer  Mukesh Kacker <mukesh.kacker@oracle.com>
           Tue, 6 Oct 2015 12:04:38 +0000 (05:04 -0700)
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_tx.c
drivers/infiniband/ulp/sdp/sdp_zcopy.c

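In outline, the change moves TX completion handling out of the timer/tasklet path and into sendmsg (process) context whenever a sender is available to do the work: the interrupt handler only records that a completion is pending and then picks a handler. The sketch below condenses that hand-off using the names this patch introduces (tx_compl_pending, sdp_tx_handler_select()); it is a reading aid, not a literal excerpt.

    /* Sketch: deciding who polls the TX CQ after a completion event. */
    static void tx_completion_event(struct sdp_sock *ssk)
    {
            ssk->tx_compl_pending = 1;      /* remember the event */

            if (!sdp_tx_handler_select(ssk))
                    /* a sleeping sendmsg was woken, or the current
                     * posts-handler holder will poll + post for us */
                    return;

            /* nobody else will do it: fall back to timer + tasklet */
            mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
            tasklet_schedule(&ssk->tx_ring.tasklet);
    }
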
index 72c7620186b5d905b4696a23b0053fde4905ab80..b7403970f6694418da9ffc0e8c0d4f7efcef806d 100644 (file)
@@ -13,7 +13,7 @@
 /* Interval between successive polls in the Tx routine when polling is used
    instead of interrupts (in per-core Tx rings) - should be power of 2 */
 #define SDP_TX_POLL_MODER      16
-#define SDP_TX_POLL_TIMEOUT    (HZ / 4)
+#define SDP_TX_POLL_TIMEOUT    (HZ / 20)
 #define SDP_NAGLE_TIMEOUT (HZ / 10)
 
 #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
@@ -24,7 +24,7 @@
 #define SDP_ROUTE_TIMEOUT 1000
 #define SDP_RETRY_COUNT 5
 #define SDP_KEEPALIVE_TIME (120 * 60 * HZ)
-#define SDP_FIN_WAIT_TIMEOUT (60 * HZ)
+#define SDP_FIN_WAIT_TIMEOUT (10 * HZ)
 
 #define SDP_TX_SIZE 0x40
 #define SDP_RX_SIZE 0x40
@@ -330,6 +330,7 @@ struct sdp_sock {
        struct delayed_work dreq_wait_work;
        struct work_struct destroy_work;
 
+       int tx_compl_pending;
        atomic_t somebody_is_doing_posts;
 
        /* Like tcp_sock */
@@ -518,13 +519,6 @@ static inline int tx_slots_free(struct sdp_sock *ssk)
        return min_free - SDP_MIN_TX_CREDITS;
 };
 
-/* like sk_stream_memory_free - except measures remote credits */
-static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
-                                        struct bzcopy_state *bz)
-{
-       return tx_slots_free(ssk) > bz->busy;
-}
-
 /* utilities */
 static inline char *mid2str(int mid)
 {
@@ -732,8 +726,7 @@ void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
 void sdp_destroy_work(struct work_struct *work);
 void sdp_reset_sk(struct sock *sk, int rc);
 void sdp_reset(struct sock *sk);
-int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
-                                 struct bzcopy_state *bz);
+int sdp_tx_wait_memory(struct sdp_sock *ssk, long *timeo_p, int *credits_needed);
 void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb);
 
 /* sdp_proc.c */
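
With sdp_bzcopy_slots_avail() removed, callers open-code the credit test against tx_slots_free(). A minimal sketch of the replacement check, matching the sdp_main.c hunk below (bz->busy is, as before, the number of outstanding bzcopy fragments):

    /* was: if (!sdp_bzcopy_slots_avail(ssk, bz)) */
    if (tx_slots_free(ssk) < bz->busy)      /* not enough remote credits */
            goto wait_for_sndbuf;
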
index 006baaf4f31cfeca8ef2046987801836244d9bcf..5498cc4ab170145c0b59ec0fb033fe81fe751790 100644 (file)
@@ -94,7 +94,7 @@ SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
        "Default idle time in seconds before keepalive probe sent.");
 SDP_MODPARAM_SINT(sdp_bzcopy_thresh, 65536,
        "Zero copy send using SEND threshold; 0=0ff.");
-SDP_MODPARAM_SINT(sdp_zcopy_thresh, 128*1024,
+SDP_MODPARAM_SINT(sdp_zcopy_thresh, 0, //128*1024,
        "Zero copy using RDMA threshold; 0=0ff.");
 #define SDP_RX_COAL_TIME_HIGH 128
 SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000,
@@ -1064,6 +1064,7 @@ int sdp_init_sock(struct sock *sk)
        ssk->destruct_in_process = 0;
        spin_lock_init(&ssk->lock);
        spin_lock_init(&ssk->tx_sa_lock);
+       ssk->tx_compl_pending = 0;
 
        atomic_set(&ssk->somebody_is_doing_posts, 0);
 
@@ -1613,9 +1614,12 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
        return copy;
 }
 
-/* like sk_stream_wait_memory - except waits on remote credits */
-int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
-                                 struct bzcopy_state *bz)
+/* like sk_stream_wait_memory - except:
+ * - if credits_needed provided - wait for enough credits
+ * - TX irq will use this (in sendmsg context) to do the actual tx
+ *   comp poll and post
+ */
+int sdp_tx_wait_memory(struct sdp_sock *ssk, long *timeo_p, int *credits_needed)
 {
        struct sock *sk = &ssk->isk.sk;
        int err = 0;
@@ -1623,9 +1627,7 @@ int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
        long current_timeo = *timeo_p;
        DEFINE_WAIT(wait);
 
-       BUG_ON(!bz);
-
-       if (sdp_bzcopy_slots_avail(ssk, bz))
+       if (sk_stream_memory_free(sk))
                current_timeo = vm_wait = (net_random() % (HZ / 5)) + 2;
 
        while (1) {
@@ -1633,41 +1635,57 @@ int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 
                prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
 
-               if (unlikely(sk->sk_err | (sk->sk_shutdown & SEND_SHUTDOWN))) {
-                       err = -EPIPE;
-                       break;
-               }
+               if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
+                       goto do_error;
+               if (!*timeo_p)
+                       goto do_nonblock;
+               if (signal_pending(current))
+                       goto do_interrupted;
+               clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
 
-               if (unlikely(!*timeo_p)) {
-                       err = -EAGAIN;
-                       break;
-               }
+               sdp_do_posts(ssk);
 
-               if (unlikely(signal_pending(current))) {
-                       err = sock_intr_errno(*timeo_p);
-                       break;
+               if (credits_needed) {
+                       if (tx_slots_free(ssk) >= *credits_needed)
+                               break;
+               } else {
+                       if (sk_stream_memory_free(sk) && !vm_wait)
+                               break;
                }
 
-               clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
-
                posts_handler_put(ssk);
 
-               if (sdp_bzcopy_slots_avail(ssk, bz))
-                       break;
-
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                sk->sk_write_pending++;
 
+               sdp_prf1(sk, NULL, "Going to sleep");
+
                if (tx_credits(ssk) > SDP_MIN_TX_CREDITS)
                        sdp_arm_tx_cq(sk);
 
-               sk_wait_event(sk, &current_timeo,
-                       sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
+               if (credits_needed) {
+                       sk_wait_event(sk, &current_timeo,
+                                       !sk->sk_err && 
+                                       !(sk->sk_shutdown & SEND_SHUTDOWN) &&
+                                       !ssk->tx_compl_pending &&
+                                       tx_slots_free(ssk) >= *credits_needed &&
+                                       vm_wait);
+               } else {
+                       sk_wait_event(sk, &current_timeo,
+                                       !sk->sk_err && 
+                                       !(sk->sk_shutdown & SEND_SHUTDOWN) &&
+                                       !ssk->tx_compl_pending &&
+                                       sk_stream_memory_free(sk) &&
+                                       vm_wait);
+               }
+
+               sdp_prf1(sk, NULL, "Woke up");
                sk->sk_write_pending--;
-               sdp_prf1(sk, NULL, "finished wait for mem");
 
                posts_handler_get(ssk);
-               sdp_do_posts(ssk);
+
+               if (!ssk->qp_active)
+                       goto do_error;
 
                if (vm_wait) {
                        vm_wait -= current_timeo;
@@ -1679,9 +1697,19 @@ int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
                }
                *timeo_p = current_timeo;
        }
-
+out:
        finish_wait(sk->sk_sleep, &wait);
        return err;
+
+do_error:
+       err = -EPIPE;
+       goto out;
+do_nonblock:
+       err = -EAGAIN;
+       goto out;
+do_interrupted:
+       err = sock_intr_errno(*timeo_p);
+       goto out;
 }
 
 /* Like tcp_sendmsg */
@@ -1772,7 +1800,7 @@ new_segment:
                                 * receive credits.
                                 */
                                if (bz) {
-                                       if (!sdp_bzcopy_slots_avail(ssk, bz))
+                                       if (tx_slots_free(ssk) < bz->busy)
                                                goto wait_for_sndbuf;
                                } else {
                                        if (!sk_stream_memory_free(sk))
@@ -1850,21 +1878,8 @@ wait_for_memory:
                        if (copied)
                                sdp_push(sk, ssk, flags & ~MSG_MORE);
 
-                       sdp_xmit_poll(ssk, 1);
-
-                       if (bz) {
-                               err = sdp_bzcopy_wait_memory(ssk, &timeo, bz);
-                       } else {
-                               posts_handler_put(ssk);
-
-                               sdp_arm_tx_cq(sk);
-
-                               err = sk_stream_wait_memory(sk, &timeo);
-
-                               posts_handler_get(ssk);
-                               sdp_do_posts(ssk);
-                       }
-
+                       err = sdp_tx_wait_memory(ssk, &timeo,
+                                       bz ? &bz->busy : NULL);
                        if (err)
                                goto do_error;
 
index bf0a290c99f643685d73daa2367b8afa9f01a27a..18d89ebfc31c7305dfbee690814dadc6ad28abb7 100644 (file)
@@ -53,6 +53,8 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force)
        if (!timer_pending(&ssk->tx_ring.timer))
                mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
 
+       ssk->tx_compl_pending = 0;
+
        /* Poll the CQ every SDP_TX_POLL_MODER packets */
        if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
                wc_processed = sdp_process_tx_cq(ssk);
@@ -297,28 +299,74 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk)
        if (wc_processed) {
                struct sock *sk = &ssk->isk.sk;
                sdp_post_sends(ssk, 0);
+               sdp_prf1(sk, NULL, "Waking sendmsg. inflight=%d", 
+                               (u32) tx_ring_posted(ssk));
+               sk_stream_write_space(&ssk->isk.sk);
+               if (sk->sk_write_pending &&
+                               test_bit(SOCK_NOSPACE, &sk->sk_socket->flags) &&
+                               tx_ring_posted(ssk)) {
+                       /* a write is pending and still no room in tx queue,
+                        * arm tx cq
+                        */
+                       sdp_prf(&ssk->isk.sk, NULL, "pending tx - rearming");
+                       sdp_arm_tx_cq(sk);
+               }
 
-               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-                       sk_stream_write_space(&ssk->isk.sk);
        }
 
        return wc_processed;
 }
 
+/* Select who will handle tx completion:
+ * - a write is pending - wake it up and let it do the poll + post
+ * - post handler is taken - taker will do the poll + post
+ * else return 1 and let the caller do it
+ */  
+static int sdp_tx_handler_select(struct sdp_sock *ssk)
+{
+       struct sock *sk = &ssk->isk.sk;
+
+       if (sk->sk_write_pending) {
+               /* Do the TX posts from sender context */
+               if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+                       sdp_prf1(sk, NULL, "Waking up pending sendmsg");
+                       wake_up_interruptible(sk->sk_sleep);
+                       return 0;
+               } else
+                       sdp_prf1(sk, NULL, "Unexpected: sk_sleep=%p, "
+                               "waitqueue_active: %d",
+                               sk->sk_sleep, waitqueue_active(sk->sk_sleep));
+       }
+
+       if (posts_handler(ssk)) {
+               /* Somebody else available to check for completion */
+               sdp_prf1(sk, NULL, "Somebody else will call do_posts");
+               return 0;
+       } 
+
+       return 1;
+}
+
 static void sdp_poll_tx_timeout(unsigned long data)
 {
        struct sdp_sock *ssk = (struct sdp_sock *)data;
        struct sock *sk = &ssk->isk.sk;
        u32 inflight, wc_processed;
 
-       sdp_prf1(&ssk->isk.sk, NULL, "TX timeout: inflight=%d", 
-               (u32) tx_ring_posted(ssk));
+       sdp_prf1(&ssk->isk.sk, NULL, "TX timeout: inflight=%d, head=%d tail=%d", 
+               (u32) tx_ring_posted(ssk),
+               ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
 
        /* Only process if the socket is not in use */
        bh_lock_sock(sk);
        if (sock_owned_by_user(sk)) {
-               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
-               sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy\n");
+               sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy");
+
+               if (sdp_tx_handler_select(ssk)) {
+                       sdp_prf1(sk, NULL, "schedule a timer");
+                       mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+               }
+
                SDPSTATS_COUNTER_INC(tx_poll_busy);
                goto out;
        }
@@ -333,6 +381,8 @@ static void sdp_poll_tx_timeout(unsigned long data)
                SDPSTATS_COUNTER_INC(tx_poll_hit);
 
        inflight = (u32) rx_ring_posted(ssk);
+       sdp_prf1(&ssk->isk.sk, NULL, "finished tx processing. inflight = %d",
+                       tx_ring_posted(ssk));
 
        /* If there are still packets in flight and the timer has not already
         * been scheduled by the Tx routine then schedule it here to guarantee
@@ -360,8 +410,13 @@ static void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
 
        SDPSTATS_COUNTER_INC(tx_int_count);
 
-       mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
-       tasklet_schedule(&ssk->tx_ring.tasklet);
+       ssk->tx_compl_pending = 1;
+
+       if (sdp_tx_handler_select(ssk)) {
+               sdp_prf1(sk, NULL, "poll and post from tasklet");
+               mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+               tasklet_schedule(&ssk->tx_ring.tasklet);
+       }
 }
 
 void sdp_tx_ring_purge(struct sdp_sock *ssk)
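
Pulling the sdp_tx.c pieces together, the life cycle of the new tx_compl_pending flag (a reading aid condensed from the hunks above):

    /*
     * sdp_tx_irq():     ssk->tx_compl_pending = 1;
     *                     a completion arrived; sdp_tx_handler_select()
     *                     picks who processes it
     * sk_wait_event():  ... && !ssk->tx_compl_pending && ...
     *                     a pending completion ends the sender's sleep so
     *                     the poll + post run from sendmsg context
     * sdp_xmit_poll():  ssk->tx_compl_pending = 0;
     *                     the poll consumes the pending event
     */
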
index 3ce1d79581ef85bf2b2cad149c35f38217b3dae5..c8b91ea2d833580256a2af70e2d65b696a76f68e 100644 (file)
 #include <linux/delay.h>
 #include "sdp.h"
 
-static struct bzcopy_state dummy_bz = {
-busy: 1,
-};
-
 static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa,
                int page_idx, int off, size_t len)
 {
@@ -773,6 +769,7 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
 {
        struct sdp_sock *ssk = sdp_sk(sk);
        int ret = 0;
+       int credits_needed = 1;
 
        sdp_dbg_data(sk, "Wait for mem\n");
 
@@ -784,7 +781,7 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p)
 
        sdp_xmit_poll(ssk, 1);
 
-       ret = sdp_bzcopy_wait_memory(ssk, timeo_p, &dummy_bz);
+       ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);
 
        return ret;
 }
@@ -801,7 +798,7 @@ static int sdp_rdma_adv_single(struct sock *sk,
        sdp_dbg_data(sk, "off: 0x%x len: 0x%x page_cnt: 0x%x\n",
                offset, len, page_cnt);
 
-       if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) {
+       if (tx_slots_free(ssk) == 0) {
                rc = wait_for_sndbuf(sk, &timeo);
                if (rc) {
                        sdp_warn(sk, "Couldn't get send buffer\n");
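
One subtle semantic shift in this file: with dummy_bz (busy: 1) gone, sdp_rdma_adv_single() blocks only when no TX slot is free at all, and the wait then asks for exactly one credit. Side by side (a reading aid, not part of the patch):

    /* before: wait while at most one slot is free (dummy_bz.busy == 1) */
    if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz))    /* tx_slots_free(ssk) <= 1 */
            rc = wait_for_sndbuf(sk, &timeo);

    /* after: wait only when no slot is free, then request one credit */
    if (tx_slots_free(ssk) == 0)
            rc = wait_for_sndbuf(sk, &timeo);   /* -> sdp_tx_wait_memory(ssk, timeo_p, &credits_needed) */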