sdp: Some improvements to multistream BW
author		Amir Vadai <amirv@mellanox.co.il>	Sun, 12 Sep 2010 16:24:35 +0000 (18:24 +0200)
committer	Mukesh Kacker <mukesh.kacker@oracle.com>	Tue, 6 Oct 2015 12:05:21 +0000 (05:05 -0700)
- Removed almost all locks in the data path - using lock_sock only
- CPU affinity for SKB handling
- Don't do any RX processing from IRQ/timer context
- recv_poll value is now in usec instead of msec
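
In outline, every syscall entry point (sendmsg, recvmsg, poll, ioctl,
close) now brackets its work between posts_handler_get() and
posts_handler_put() while holding the socket lock, so RX completion
processing runs in process context and the RX CQ stays unarmed while
anybody is posting. A simplified sketch of the bracket (not verbatim
from the patch):

	lock_sock(sk);
	posts_handler_get(ssk);		/* push the cq-arm timer far out */

	/* rx/tx work: sdp_poll_rx_cq() and friends run here, in
	 * process context, with no extra rx_ring locking */

	posts_handler_put(ssk, SDP_RX_ARMING_DELAY);
	release_sock(sk);		/* last handler out re-arms the CQ,
					 * now or via the cq_arm_timer */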

Signed-off-by: Eldad Zinger <eldadz@mellanox.co.il>
Signed-off-by: Amir Vadai <amirv@mellanox.co.il>
drivers/infiniband/ulp/sdp/sdp.h
drivers/infiniband/ulp/sdp/sdp_bcopy.c
drivers/infiniband/ulp/sdp/sdp_dbg.h
drivers/infiniband/ulp/sdp/sdp_main.c
drivers/infiniband/ulp/sdp/sdp_proc.c
drivers/infiniband/ulp/sdp/sdp_rx.c
drivers/infiniband/ulp/sdp/sdp_tx.c
drivers/infiniband/ulp/sdp/sdp_zcopy.c

index 4ecf9a0c9e10d9f7cbc5b588ec01d9d03dfd4763..547a162829895193657ae0fb3fe52e6af77b6ddc 100644 (file)
@@ -18,7 +18,7 @@
 #define SDP_TX_POLL_TIMEOUT    (HZ / 20)
 #define SDP_NAGLE_TIMEOUT (HZ / 10)
 
-#define SDP_RX_POLL_TIMEOUT    (1 + HZ / 1000)
+#define SDP_RX_ARMING_DELAY    (msecs_to_jiffies(10))
 #define SDP_RDMA_READ_TIMEOUT  (5 * HZ)
 
 #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
@@ -97,25 +97,15 @@ struct sdp_skb_cb {
 #define posts_handler_get(ssk)                                         \
        do {                                                            \
                atomic_inc(&ssk->somebody_is_doing_posts);              \
-               /* postpone the rx_ring.timer, there is no need to enable
-                * interrupts because there will be cq-polling. */      \
-               if (likely(ssk->qp_active))                             \
-                       mod_timer(&ssk->rx_ring.timer, MAX_JIFFY_OFFSET); \
+               sdp_postpone_rx_timer(ssk);                             \
        } while (0)
 
 #define posts_handler_put(ssk, intr_delay)                             \
        do {                                                            \
                sdp_do_posts(ssk);                                      \
                if (atomic_dec_and_test(&ssk->somebody_is_doing_posts) && \
-                       likely(ssk->qp_active)) {                       \
-                       if (intr_delay)                                 \
-                               mod_timer(&ssk->rx_ring.timer, intr_delay); \
-                       else                                            \
-                               /* There is no point of setting up a timer
-                                * for an immediate cq-arming, better arm it
-                                * now. */                              \
-                               sdp_arm_rx_cq(&ssk->isk.sk);            \
-               }                                                       \
+                       likely(ssk->qp_active))                         \
+                               sdp_schedule_arm_rx_cq(ssk, intr_delay);\
        } while (0)
 
 #define sdp_common_release(sk) do { \
@@ -315,12 +305,7 @@ struct sdp_rx_ring {
        atomic_t          tail;
        struct ib_cq     *cq;
 
-       int              destroyed;
-       rwlock_t         destroyed_lock;
-       spinlock_t       lock;
-
-       struct timer_list       timer;
-       struct tasklet_struct   tasklet;
+       struct timer_list       cq_arm_timer;
 };
 
 struct sdp_device {
@@ -359,6 +344,7 @@ struct sdp_sock {
        struct sk_buff_head rx_ctl_q;
        struct sock *parent;
        struct sdp_device *sdp_dev;
+       int cpu;
 
        int qp_active;
        spinlock_t tx_sa_lock;
@@ -462,38 +448,10 @@ static inline void tx_sa_reset(struct tx_srcavail_state *tx_sa)
                        sizeof(*tx_sa) - offsetof(typeof(*tx_sa), busy));
 }
 
-static inline void rx_ring_unlock(struct sdp_rx_ring *rx_ring)
-{
-       read_unlock_bh(&rx_ring->destroyed_lock);
-}
-
-static inline int rx_ring_trylock(struct sdp_rx_ring *rx_ring)
-{
-       read_lock_bh(&rx_ring->destroyed_lock);
-       if (rx_ring->destroyed) {
-               rx_ring_unlock(rx_ring);
-               return 0;
-       }
-       return 1;
-}
-
-static inline void rx_ring_destroy_lock(struct sdp_rx_ring *rx_ring)
-{
-       write_lock_bh(&rx_ring->destroyed_lock);
-       rx_ring->destroyed = 1;
-       write_unlock_bh(&rx_ring->destroyed_lock);
-}
-
 static inline int sdp_chk_sa_cancel(struct sdp_sock *ssk, struct rx_srcavail_state *rx_sa)
 {
-       int res;
-
-       spin_lock_irq(&ssk->rx_ring.lock);
-       res = ssk->sa_cancel_arrived &&
+       return ssk->sa_cancel_arrived &&
                before(rx_sa->mseq, ssk->sa_cancel_mseq);
-       spin_unlock_irq(&ssk->rx_ring.lock);
-
-       return res;
 }
 
 static inline struct sdp_sock *sdp_sk(const struct sock *sk)
@@ -546,29 +504,6 @@ static inline void sdp_set_error(struct sock *sk, int err)
        sk->sk_error_report(sk);
 }
 
-static inline void sdp_arm_rx_cq(struct sock *sk)
-{
-       if (unlikely(!sdp_sk(sk)->rx_ring.cq))
-               return;
-
-       sdp_prf(sk, NULL, "Arming RX cq");
-       sdp_dbg_data(sk, "Arming RX cq\n");
-
-       ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
-}
-
-static inline void sdp_arm_tx_cq(struct sock *sk)
-{
-       if (unlikely(!sdp_sk(sk)->tx_ring.cq))
-               return;
-
-       sdp_prf(sk, NULL, "Arming TX cq");
-       sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n",
-               tx_credits(sdp_sk(sk)), tx_ring_posted(sdp_sk(sk)));
-
-       ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, IB_CQ_NEXT_COMP);
-}
-
 /* return the min of:
  * - tx credits
  * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS
@@ -771,21 +706,29 @@ struct sdpstats {
        u32 sendmsg;
        u32 recvmsg;
        u32 post_send_credits;
-       u32 sendmsg_nagle_skip;
        u32 sendmsg_seglen[25];
        u32 send_size[25];
        u32 post_recv;
+       u32 rx_int_arm;
+       u32 tx_int_arm;
        u32 rx_int_count;
        u32 tx_int_count;
+       u32 rx_int_wake_up;
+       u32 rx_int_queue;
+       u32 rx_int_no_op;
+       u32 rx_cq_modified;
+       u32 rx_cq_arm_timer;
        u32 rx_wq;
        u32 bzcopy_poll_miss;
        u32 send_wait_for_mem;
        u32 send_miss_no_credits;
        u32 rx_poll_miss;
        u32 rx_poll_hit;
+       u32 poll_hit_usec[16];
        u32 tx_poll_miss;
        u32 tx_poll_hit;
        u32 tx_poll_busy;
+       u32 tx_poll_no_op;
        u32 memcpy_count;
        u32 credits_before_update[64];
        u32 zcopy_tx_timeout;
@@ -793,6 +736,8 @@ struct sdpstats {
        u32 zcopy_tx_aborted;
        u32 zcopy_tx_error;
        u32 fmr_alloc_error;
+       u32 keepalive_timer;
+       u32 nagle_timer;
 };
 
 static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
@@ -842,6 +787,60 @@ static inline void sdp_cleanup_sdp_buf(struct sdp_sock *ssk, struct sdp_buf *sbu
        }
 }
 
+static inline void sdp_arm_rx_cq(struct sock *sk)
+{
+       if (unlikely(!sdp_sk(sk)->rx_ring.cq))
+               return;
+
+       SDPSTATS_COUNTER_INC(rx_int_arm);
+       sdp_dbg_data(sk, "Arming RX cq\n");
+
+       if (unlikely(0 > ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq,
+                                       IB_CQ_NEXT_COMP)))
+               sdp_warn(sk, "error arming rx cq\n");
+}
+
+static inline void sdp_arm_tx_cq(struct sock *sk)
+{
+       if (unlikely(!sdp_sk(sk)->tx_ring.cq))
+               return;
+
+       SDPSTATS_COUNTER_INC(tx_int_arm);
+       sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n",
+               tx_credits(sdp_sk(sk)), tx_ring_posted(sdp_sk(sk)));
+
+       if (unlikely(0 > ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq,
+                                       IB_CQ_NEXT_COMP)))
+               sdp_warn(sk, "error arming tx cq\n");
+}
+
+static inline void sdp_postpone_rx_timer(struct sdp_sock *ssk)
+{
+       if (timer_pending(&ssk->rx_ring.cq_arm_timer) && ssk->qp_active)
+               mod_timer(&ssk->rx_ring.cq_arm_timer, MAX_JIFFY_OFFSET);
+}
+
+static inline void sdp_schedule_arm_rx_cq(struct sdp_sock *ssk,
+               unsigned long delay)
+{
+       if (unlikely(!ssk->rx_ring.cq))
+               return;
+
+       if (delay && ssk->qp_active)
+               mod_timer(&ssk->rx_ring.cq_arm_timer, jiffies + delay);
+       else {
+               /* There is no point in setting up a timer for an immediate
+                * cq-arming, better arm it now. */
+               sdp_arm_rx_cq(&ssk->isk.sk);
+       }
+}
+
+static inline int somebody_is_waiting(struct sock *sk)
+{
+       return sk->sk_socket &&
+               test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
+}
+
 /* sdp_main.c */
 void sdp_set_default_moderation(struct sdp_sock *ssk);
 int sdp_init_sock(struct sock *sk);
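
The re-arm decision that posts_handler_put() encodes, restated as a
plain function for readability (hypothetical helper - names are taken
from the hunk above; the real macro also runs sdp_do_posts() first):

	static inline void rx_rearm_after_posts(struct sdp_sock *ssk,
						unsigned long delay)
	{
		if (!atomic_dec_and_test(&ssk->somebody_is_doing_posts))
			return;		/* another posts handler is active */
		if (unlikely(!ssk->qp_active))
			return;		/* QP is going down - stay unarmed */
		/* delay == 0 arms the CQ immediately; otherwise the
		 * cq_arm_timer fires at jiffies + delay and arms it */
		sdp_schedule_arm_rx_cq(ssk, delay);
	}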
@@ -873,7 +872,6 @@ void sdp_nagle_timeout(unsigned long data);
 void sdp_post_keepalive(struct sdp_sock *ssk);
 
 /* sdp_rx.c */
-void sdp_rx_ring_init(struct sdp_sock *ssk);
 int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
 void sdp_rx_ring_destroy(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
@@ -882,7 +880,7 @@ void sdp_do_posts(struct sdp_sock *ssk);
 void sdp_rx_comp_full(struct sdp_sock *ssk);
 void sdp_remove_large_sock(const struct sdp_sock *ssk);
 void sdp_handle_disconn(struct sock *sk);
-int sdp_process_rx(struct sdp_sock *ssk);
+int sdp_poll_rx_cq(struct sdp_sock *ssk);
 
 /* sdp_zcopy.c */
 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct iovec *iov);
index 6e91c426e20b03edb2fd53a87ac008658fe1be2f..166dae322964f0f7374212bb4ed192c6057744c4 100644 (file)
@@ -85,7 +85,7 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
                srcah = (struct sdp_srcah *)(h+1);
 
                len += snprintf(buf + len, 255-len, " | payload: 0x%zx, "
-                               "len: 0x%zx, rkey: 0x%x, vaddr: 0x%llx |",
+                               "len: 0x%x, rkey: 0x%x, vaddr: 0x%llx |",
                                ntohl(h->len) - sizeof(struct sdp_bsdh) -
                                sizeof(struct sdp_srcah),
                                ntohl(srcah->len), ntohl(srcah->rkey),
@@ -148,6 +148,7 @@ void sdp_nagle_timeout(unsigned long data)
        struct sdp_sock *ssk = (struct sdp_sock *)data;
        struct sock *sk = &ssk->isk.sk;
 
+       SDPSTATS_COUNTER_INC(nagle_timer);
        sdp_dbg_data(sk, "last_unacked = %ld\n", ssk->nagle_last_unacked);
 
        if (!ssk->nagle_last_unacked)
@@ -179,6 +180,12 @@ out2:
        }
 }
 
+static inline int sdp_should_rearm(struct sock *sk)
+{
+       return sk->sk_state != TCP_ESTABLISHED || sdp_sk(sk)->tx_sa ||
+               somebody_is_waiting(sk);
+}
+
 void sdp_post_sends(struct sdp_sock *ssk, gfp_t gfp)
 {
        /* TODO: nonagle? */
@@ -200,8 +207,12 @@ void sdp_post_sends(struct sdp_sock *ssk, gfp_t gfp)
                sdp_xmit_poll(ssk,  1);
 
        /* Run out of credits, check if got a credit update */
-       if (unlikely(tx_credits(ssk) <= SDP_MIN_TX_CREDITS))
-               sdp_process_rx(ssk);
+       if (unlikely(tx_credits(ssk) <= SDP_MIN_TX_CREDITS)) {
+               sdp_poll_rx_cq(ssk);
+
+               if (unlikely(sdp_should_rearm(sk) || !posts_handler(ssk)))
+                       sdp_arm_rx_cq(sk);
+       }
 
        if (ssk->recv_request &&
            ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
index 6d7d8e6db33db30acd02c25bc48585c8aff6ec62..da9bdc9f57ae286d3d0609c9e41d7695139f7133 100644 (file)
@@ -23,7 +23,7 @@
        _sdp_printk(__func__, __LINE__, level, sk, format, ## arg)
 #define sdp_warn(sk, format, arg...)                                   \
        do {                                                            \
-               sdp_printk(KERN_WARNING, sk, "\t%lx: " format , jiffies, ## arg); \
+               sdp_printk(KERN_WARNING, sk, format, ## arg); \
                sdp_prf(sk, NULL, format , ## arg);                     \
        } while (0)
 
index 4f2127c044c438509d274c694956c979e587a426..f1a7d757df34902062a2afa1cdbb461d37c8c012 100644 (file)
@@ -90,7 +90,7 @@ SDP_MODPARAM_INT(sdp_data_debug_level, 0,
 SDP_MODPARAM_SINT(sdp_fmr_pool_size, 20, "Number of FMRs to allocate for pool");
 SDP_MODPARAM_SINT(sdp_fmr_dirty_wm, 5, "Watermark to flush fmr pool");
 
-SDP_MODPARAM_SINT(recv_poll, 10, "How many msec to poll recv.");
+SDP_MODPARAM_SINT(recv_poll, 700, "usecs to poll recv before arming interrupt.");
 SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
        "Default idle time in seconds before keepalive probe sent.");
 static int sdp_bzcopy_thresh = 0;
@@ -189,8 +189,6 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
 
        ssk->qp_active = 0;
 
-       del_timer(&ssk->tx_ring.timer);
-
        if (ssk->qp) {
                ib_destroy_qp(ssk->qp);
                ssk->qp = NULL;
@@ -232,6 +230,7 @@ static void sdp_keepalive_timer(unsigned long data)
        struct sdp_sock *ssk = sdp_sk(sk);
 
        sdp_dbg(sk, "%s\n", __func__);
+       SDPSTATS_COUNTER_INC(keepalive_timer);
 
        /* Only process if the socket is not in use */
        bh_lock_sock(sk);
@@ -255,12 +254,6 @@ out:
        sock_put(sk, SOCK_REF_KEEPALIVE);
 }
 
-static void sdp_init_keepalive_timer(struct sock *sk)
-{
-       sk->sk_timer.function = sdp_keepalive_timer;
-       sk->sk_timer.data = (unsigned long)sk;
-}
-
 static void sdp_set_keepalive(struct sock *sk, int val)
 {
        sdp_dbg(sk, "%s %d\n", __func__, val);
@@ -293,12 +286,13 @@ void sdp_set_default_moderation(struct sdp_sock *ssk)
                if (hw_int_mod_count > 0 && hw_int_mod_usec > 0) {
                        err = ib_modify_cq(ssk->rx_ring.cq, hw_int_mod_count,
                                        hw_int_mod_usec);
-                       if (err)
+                       if (unlikely(err))
                                sdp_warn(sk,
-                                       "Failed modifying moderation for cq");
+                                       "Failed modifying moderation for cq\n");
                        else
                                sdp_dbg(sk,
                                        "Using fixed interrupt moderation\n");
+                       SDPSTATS_COUNTER_INC(rx_cq_modified);
                }
                return;
        }
@@ -413,10 +407,11 @@ static void sdp_auto_moderation(struct sdp_sock *ssk)
        if (moder_time != mod->last_moder_time) {
                mod->last_moder_time = moder_time;
                err = ib_modify_cq(ssk->rx_ring.cq, mod->moder_cnt, moder_time);
-               if (err) {
+               if (unlikely(err)) {
                        sdp_dbg_data(&ssk->isk.sk,
                                        "Failed modifying moderation for cq");
                }
+               SDPSTATS_COUNTER_INC(rx_cq_modified);
        }
 
 out:
@@ -663,6 +658,7 @@ static void sdp_close(struct sock *sk, long timeout)
        sdp_dbg(sk, "%s\n", __func__);
        sdp_prf(sk, NULL, __func__);
 
+       sdp_sk(sk)->cpu = smp_processor_id();
        sdp_delete_keepalive_timer(sk);
 
        sk->sk_shutdown = SHUTDOWN_MASK;
@@ -756,6 +752,8 @@ static int sdp_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len)
                .sin_addr.s_addr = inet_sk(sk)->saddr,
        };
        int rc;
+
+       ssk->cpu = smp_processor_id();
        release_sock(sk);
        flush_workqueue(sdp_wq);
        lock_sock(sk);
@@ -809,6 +807,7 @@ static int sdp_disconnect(struct sock *sk, int flags)
 
        sdp_dbg(sk, "%s\n", __func__);
 
+       ssk->cpu = smp_processor_id();
        if (sk->sk_state != TCP_LISTEN) {
                if (ssk->id) {
                        sdp_sk(sk)->qp_active = 0;
@@ -899,6 +898,7 @@ static struct sock *sdp_accept(struct sock *sk, int flags, int *err)
 
        ssk = sdp_sk(sk);
        lock_sock(sk);
+       ssk->cpu = smp_processor_id();
 
        /* We need to make sure that this socket is listening,
         * and that it has something pending.
@@ -961,6 +961,7 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
                        return -EINVAL;
 
                lock_sock(sk);
+               ssk->cpu = smp_processor_id();
                if ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV))
                        answ = 0;
                else if (sock_flag(sk, SOCK_URGINLINE) ||
@@ -1122,7 +1123,7 @@ int sdp_init_sock(struct sock *sk)
 
        atomic_set(&ssk->mseq_ack, 0);
 
-       sdp_rx_ring_init(ssk);
+       ssk->rx_ring.buffer = NULL;
        ssk->tx_ring.buffer = NULL;
        ssk->sdp_disconnect = 0;
        ssk->destructed_already = 0;
@@ -1132,10 +1133,10 @@ int sdp_init_sock(struct sock *sk)
        ssk->tx_compl_pending = 0;
 
        atomic_set(&ssk->somebody_is_doing_posts, 0);
-
+       ssk->cpu = smp_processor_id();
        ssk->tx_ring.rdma_inflight = NULL;
 
-       init_timer(&ssk->rx_ring.timer);
+       init_timer(&ssk->rx_ring.cq_arm_timer);
        init_timer(&ssk->tx_ring.timer);
        init_timer(&ssk->nagle_timer);
        init_timer(&sk->sk_timer);
@@ -1206,6 +1207,7 @@ static int sdp_setsockopt(struct sock *sk, int level, int optname,
                return -EFAULT;
 
        lock_sock(sk);
+       ssk->cpu = smp_processor_id();
 
        /* SOCK_KEEPALIVE is really a SOL_SOCKET level option but there
         * is a problem handling it at that level.  In order to start
@@ -1348,9 +1350,20 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
        return 0;
 }
 
+static inline int cycles_before(cycles_t a, cycles_t b)
+{
+       /* cycles_t is unsigned, but may be int/long/long long. */
+
+       if (sizeof(cycles_t) == 4)
+               return before(a, b);
+       else
+               return (s64)(a - b) < 0;
+}
+
 static inline int poll_recv_cq(struct sock *sk)
 {
-       unsigned long jiffies_end = jiffies + recv_poll * HZ / 1000;
+       cycles_t start = get_cycles();
+       cycles_t end = start + recv_poll * cpu_khz / 1000;
 
        sdp_prf(sk, NULL, "polling recv");
 
@@ -1358,11 +1371,15 @@ static inline int poll_recv_cq(struct sock *sk)
                return 0;
 
        do {
-               if (sdp_process_rx(sdp_sk(sk))) {
+               if (sdp_poll_rx_cq(sdp_sk(sk))) {
                        SDPSTATS_COUNTER_INC(rx_poll_hit);
+                       SDPSTATS_HIST(poll_hit_usec,
+                                       (get_cycles() - start) *
+                                       1000 / cpu_khz);
                        return 0;
                }
-       } while (jiffies < jiffies_end);
+       } while (cycles_before(get_cycles(), end));
+
        SDPSTATS_COUNTER_INC(rx_poll_miss);
        return 1;
 }
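
The poll deadline above is measured in raw CPU cycles, so the
comparison has to survive counter wraparound; cycles_before() uses the
usual signed-difference trick. A self-contained userspace illustration
of the same idiom (example values only, not kernel code):

	#include <stdio.h>
	#include <stdint.h>

	/* Wraparound-safe "a before b" for an unsigned 64-bit counter,
	 * the same trick cycles_before() applies to a wide cycles_t. */
	static int before64(uint64_t a, uint64_t b)
	{
		return (int64_t)(a - b) < 0;
	}

	int main(void)
	{
		uint64_t a = UINT64_MAX - 5;	/* just before wrapping */
		uint64_t b = 10;		/* just after wrapping  */

		printf("naive a < b  : %d\n", a < b);		/* 0 */
		printf("before64(a,b): %d\n", before64(a, b));	/* 1 */
		return 0;
	}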
@@ -1881,6 +1898,7 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        SDPSTATS_COUNTER_INC(sendmsg);
 
        lock_sock(sk);
+       ssk->cpu = smp_processor_id();
        sdp_dbg_data(sk, "%s size = 0x%zx\n", __func__, size);
 
        posts_handler_get(ssk);
@@ -1962,13 +1980,10 @@ new_segment:
        ( __bz && tx_slots_free(ssk) < __bz->busy) || \
                (!__bz && !sk_stream_memory_free(sk)))
                                if (unlikely(can_not_tx(bz))) {
-                                       if (!poll_recv_cq(sk)) {
+                                       if (!poll_recv_cq(sk))
                                                sdp_do_posts(ssk);
-                                       }
-                                       if ((can_not_tx(bz))) {
-                                               sdp_arm_rx_cq(sk);
+                                       if ((can_not_tx(bz)))
                                                goto wait_for_sndbuf;
-                                       }
                                }
 
                                skb = sdp_alloc_skb_data(sk, min(seglen, size_goal), 0);
@@ -2082,7 +2097,7 @@ out_err:
        sdp_dbg_data(sk, "err: %d\n", err);
 
 fin:
-       posts_handler_put(ssk, jiffies + SDP_RX_POLL_TIMEOUT);
+       posts_handler_put(ssk, SDP_RX_ARMING_DELAY);
 
        if (!err && !ssk->qp_active) {
                err = -EPIPE;
@@ -2105,14 +2120,10 @@ static inline int sdp_abort_rx_srcavail(struct sock *sk)
 
        h->mid = SDP_MID_DATA;
 
-       spin_lock_irq(&ssk->rx_ring.lock);
-
        RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
        kfree(ssk->rx_sa);
        ssk->rx_sa = NULL;
 
-       spin_unlock_irq(&ssk->rx_ring.lock);
-
        return 0;
 }
 
@@ -2138,6 +2149,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
        SDPSTATS_COUNTER_INC(recvmsg);
 
        lock_sock(sk);
+       ssk->cpu = smp_processor_id();
        sdp_dbg_data(sk, "iovlen: %zd iov_len: 0x%zx flags: 0x%x peek: 0x%x\n",
                        msg->msg_iovlen, msg->msg_iov[0].iov_len, flags,
                        MSG_PEEK);
@@ -2204,17 +2216,14 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
                                goto found_fin_ok;
 
                        case SDP_MID_SRCAVAIL:
-                               spin_lock_irq(&ssk->rx_ring.lock);
                                rx_sa = RX_SRCAVAIL_STATE(skb);
                                if (unlikely(!rx_sa)) {
                                        /* SrcAvailCancel arrived and handled */
                                        h->mid = SDP_MID_DATA;
-                                       spin_unlock_irq(&ssk->rx_ring.lock);
                                        goto sdp_mid_data;
                                }
 
                                rx_sa->is_treated = 1;
-                               spin_unlock_irq(&ssk->rx_ring.lock);
 
                                if (sdp_chk_sa_cancel(ssk, rx_sa) ||
                                                !ssk->sdp_dev ||
@@ -2488,7 +2497,7 @@ got_disconn_in_peek:
        err = copied;
 out:
 
-       posts_handler_put(ssk, jiffies + SDP_RX_POLL_TIMEOUT);
+       posts_handler_put(ssk, SDP_RX_ARMING_DELAY);
 
        sdp_auto_moderation(ssk);
        
@@ -2540,6 +2549,7 @@ static int sdp_inet_listen(struct socket *sock, int backlog)
        int err;
 
        lock_sock(sk);
+       sdp_sk(sk)->cpu = smp_processor_id();
 
        err = -EINVAL;
        if (sock->state != SS_UNCONNECTED)
@@ -2585,11 +2595,13 @@ static unsigned int sdp_poll(struct file *file, struct socket *socket,
        sdp_dbg_data(sk, "%s\n", __func__);
 
        lock_sock(sk);
+       sdp_sk(sk)->cpu = smp_processor_id();
 
        if (sk->sk_state == TCP_ESTABLISHED) {
                sdp_prf(sk, NULL, "polling");
-               if (poll_recv_cq(sk))
-                       sdp_arm_rx_cq(sk);
+               posts_handler_get(sdp_sk(sk));
+               poll_recv_cq(sk);
+               posts_handler_put(sdp_sk(sk), 0);
        }
        mask = datagram_poll(file, socket, wait);
 
@@ -2728,7 +2740,7 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
 
        sk->sk_destruct = sdp_destruct;
 
-       sdp_init_keepalive_timer(sk);
+       setup_timer(&sk->sk_timer, sdp_keepalive_timer, (unsigned long)sk);
 
        sock->ops = &sdp_proto_ops;
        sock->state = SS_UNCONNECTED;
@@ -2899,7 +2911,7 @@ static int __init sdp_init(void)
        sdp_proto.sockets_allocated = sockets_allocated;
        sdp_proto.orphan_count = orphan_count;
 
-       rx_comp_wq = create_singlethread_workqueue("rx_comp_wq");
+       rx_comp_wq = create_workqueue("rx_comp_wq");
        if (!rx_comp_wq)
                goto no_mem_rx_wq;
 
index 3faa8271344d2224a4941bbb3481c7fc1ad1b4a3..9f94c2b4390fef2f9e569078f6361a00d676e14a 100644 (file)
@@ -236,7 +236,7 @@ static void sdpstats_seq_hist(struct seq_file *seq, char *str, u32 *h, int n,
                memset(s, '*', j);
                s[j] = '\0';
 
-               seq_printf(seq, "%10d | %-50s - %d\n", val, s, h[i]);
+               seq_printf(seq, "%10d | %-50s - %u\n", val, s, h[i]);
        }
 }
 
@@ -310,6 +310,8 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
         }
 
        seq_printf(seq, "\n");
+       seq_printf(seq, "sdp_recvmsg() calls\t\t: %d\n",
+               SDPSTATS_COUNTER_GET(recvmsg));
        seq_printf(seq, "post_recv         \t\t: %d\n",
                SDPSTATS_COUNTER_GET(post_recv));
        seq_printf(seq, "BZCopy poll miss  \t\t: %d\n",
@@ -321,12 +323,26 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
 
        seq_printf(seq, "rx_poll_miss      \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_poll_miss));
        seq_printf(seq, "rx_poll_hit       \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_poll_hit));
+       __sdpstats_seq_hist(seq, "poll_hit_usec", poll_hit_usec, 1);
+       seq_printf(seq, "rx_cq_arm_timer      \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_cq_arm_timer));
+
        seq_printf(seq, "tx_poll_miss      \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_miss));
        seq_printf(seq, "tx_poll_busy      \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_busy));
        seq_printf(seq, "tx_poll_hit       \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_hit));
+       seq_printf(seq, "tx_poll_no_op     \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_no_op));
+
+       seq_printf(seq, "keepalive timer   \t\t: %d\n", SDPSTATS_COUNTER_GET(keepalive_timer));
+       seq_printf(seq, "nagle timer       \t\t: %d\n", SDPSTATS_COUNTER_GET(nagle_timer));
 
        seq_printf(seq, "CQ stats:\n");
-       seq_printf(seq, "- RX interrupts\t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_count));
+       seq_printf(seq, "- RX irq armed  \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_arm));
+       seq_printf(seq, "- RX interrupts \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_count));
+       seq_printf(seq, "- RX int wake up\t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_wake_up));
+       seq_printf(seq, "- RX int queue  \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_queue));
+       seq_printf(seq, "- RX int no op  \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_no_op));
+       seq_printf(seq, "- RX cq modified\t\t: %d\n", SDPSTATS_COUNTER_GET(rx_cq_modified));
+
+       seq_printf(seq, "- TX irq armed\t\t: %d\n", SDPSTATS_COUNTER_GET(tx_int_arm));
        seq_printf(seq, "- TX interrupts\t\t: %d\n", SDPSTATS_COUNTER_GET(tx_int_count));
 
        seq_printf(seq, "ZCopy stats:\n");
index 9aa6acacc8822250f3bb40e7f974d04a5ed2df0a..a65cb4ba722a1fb97529d8438d5fa9df7aaa21f4 100644 (file)
@@ -510,19 +510,14 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
                sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
                break;
        case SDP_MID_SRCAVAIL_CANCEL:
-               spin_lock_irq(&ssk->rx_ring.lock);
                if (ssk->rx_sa && !ssk->rx_sa->is_treated &&
                                after(ntohl(h->mseq), ssk->rx_sa->mseq)) {
                        sdp_dbg(sk, "Handling SrcAvailCancel - post SendSM\n");
                        RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL;
                        kfree(ssk->rx_sa);
                        ssk->rx_sa = NULL;
-                       spin_unlock_irq(&ssk->rx_ring.lock);
                        sdp_post_sendsm(sk);
-                       break;
                }
-
-               spin_unlock_irq(&ssk->rx_ring.lock);
                break;
        case SDP_MID_SINKAVAIL:
        case SDP_MID_ABORT:
@@ -708,16 +703,14 @@ static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
                sock_wake_async(sock, 2, POLL_OUT);
 }
 
-static int sdp_poll_rx_cq(struct sdp_sock *ssk)
+int sdp_poll_rx_cq(struct sdp_sock *ssk)
 {
        struct ib_cq *cq = ssk->rx_ring.cq;
        struct ib_wc ibwc[SDP_NUM_WC];
        int n, i;
        int wc_processed = 0;
        struct sk_buff *skb;
-       unsigned long flags;
 
-       spin_lock_irqsave(&ssk->rx_ring.lock, flags);
        do {
                n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
                for (i = 0; i < n; ++i) {
@@ -732,10 +725,11 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk)
                        wc_processed++;
                }
        } while (n == SDP_NUM_WC);
-       spin_unlock_irqrestore(&ssk->rx_ring.lock, flags);
 
-       if (wc_processed)
+       if (wc_processed) {
+               sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
                sdp_bzcopy_write_space(ssk);
+       }
 
        return wc_processed;
 }
@@ -768,8 +762,9 @@ static void sdp_rx_comp_work(struct work_struct *work)
 
        lock_sock(sk);
 
+       posts_handler_get(ssk);
        sdp_do_posts(ssk);
-
+       posts_handler_put(ssk, SDP_RX_ARMING_DELAY);
        release_sock(sk);
 }
 
@@ -785,7 +780,7 @@ void sdp_do_posts(struct sdp_sock *ssk)
        }
 
        if (likely(ssk->rx_ring.cq))
-               sdp_process_rx(sdp_sk(sk));
+               sdp_poll_rx_cq(ssk);
 
        while ((skb = skb_dequeue(&ssk->rx_ctl_q)))
                sdp_process_rx_ctl_skb(ssk, skb);
@@ -818,13 +813,19 @@ void sdp_do_posts(struct sdp_sock *ssk)
 
 }
 
+static inline int should_wake_up(struct sock *sk)
+{
+       return sk->sk_sleep && waitqueue_active(sk->sk_sleep) &&
+               (posts_handler(sdp_sk(sk)) || somebody_is_waiting(sk));
+}
+
 static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 {
        struct sock *sk = cq_context;
        struct sdp_sock *ssk = sdp_sk(sk);
 
-       if (cq != ssk->rx_ring.cq) {
-               sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
+       if (unlikely(cq != ssk->rx_ring.cq)) {
+               sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
                return;
        }
 
@@ -832,65 +833,15 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 
        sdp_prf(sk, NULL, "rx irq");
 
-       /* We could use rx_ring.timer instead, but mod_timer(..., 0)
-        * measured to add 4ms delay.
-        */
-       tasklet_hi_schedule(&ssk->rx_ring.tasklet);
-}
-
-static inline int sdp_should_rearm(struct sock *sk)
-{
-       return sk->sk_state != TCP_ESTABLISHED ||
-               sdp_sk(sk)->tx_sa ||
-               (sk->sk_socket && test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags));
-}
-
-int sdp_process_rx(struct sdp_sock *ssk)
-{
-       struct sock *sk = &ssk->isk.sk;
-       int wc_processed;
-       int credits_before;
-
-       if (!rx_ring_trylock(&ssk->rx_ring)) {
-               sdp_dbg(&ssk->isk.sk, "ring destroyed. not polling it\n");
-               return 0;
-       }
-
-       credits_before = tx_credits(ssk);
-
-       wc_processed = sdp_poll_rx_cq(ssk);
-
-       if (wc_processed) {
-               sdp_prf(sk, NULL, "processed %d", wc_processed);
-               sdp_prf(sk, NULL, "credits:  %d -> %d",
-                               credits_before, tx_credits(ssk));
-
-               if (posts_handler(ssk) || (sk->sk_socket &&
-                       test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags))) {
-
-                       sdp_prf(&ssk->isk.sk, NULL,
-                               "Somebody is doing the post work for me. %d",
-                               posts_handler(ssk));
-
-               } else {
-                       sdp_prf(&ssk->isk.sk, NULL, "Queuing work. ctl_q: %d",
-                                       !skb_queue_empty(&ssk->rx_ctl_q));
-                       queue_work(rx_comp_wq, &ssk->rx_comp_work);
-               }
+       if (should_wake_up(sk)) {
+               wake_up_interruptible(sk->sk_sleep);
+               SDPSTATS_COUNTER_INC(rx_int_wake_up);
+       } else {
+               if (queue_work_on(ssk->cpu, rx_comp_wq, &ssk->rx_comp_work))
+                       SDPSTATS_COUNTER_INC(rx_int_queue);
+               else
+                       SDPSTATS_COUNTER_INC(rx_int_no_op);
        }
-
-       if (unlikely(sdp_should_rearm(sk) || !posts_handler(ssk)))
-               sdp_arm_rx_cq(sk);
-
-       rx_ring_unlock(&ssk->rx_ring);
-
-       return wc_processed;
-}
-
-static void sdp_process_rx_timer(unsigned long data)
-{
-       struct sdp_sock *ssk = (struct sdp_sock *)data;
-       sdp_process_rx(ssk);
 }
 
 static void sdp_rx_ring_purge(struct sdp_sock *ssk)
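
With the tasklet and the rx polling timer gone, the interrupt handler
above is reduced to a dispatch decision: wake a sleeping reader, which
then polls the CQ itself from process context, or queue the completion
work on the CPU recorded in ssk->cpu on the last syscall so that SKB
handling stays cache-warm. Condensed sketch (stats counters and the
stale-cq check omitted):

	static void sdp_rx_irq_sketch(struct sock *sk)	/* hypothetical */
	{
		struct sdp_sock *ssk = sdp_sk(sk);

		if (should_wake_up(sk))
			wake_up_interruptible(sk->sk_sleep);	/* reader polls */
		else
			queue_work_on(ssk->cpu, rx_comp_wq,
					&ssk->rx_comp_work);	/* CPU-affine */
	}

This is also why sdp_init() switches rx_comp_wq from
create_singlethread_workqueue() to create_workqueue(): queue_work_on()
can only honor ssk->cpu if the workqueue has per-CPU workers.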
@@ -924,15 +875,16 @@ static void sdp_rx_ring_purge(struct sdp_sock *ssk)
        }
 }
 
-void sdp_rx_ring_init(struct sdp_sock *ssk)
+static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
 {
-       ssk->rx_ring.buffer = NULL;
-       ssk->rx_ring.destroyed = 0;
-       rwlock_init(&ssk->rx_ring.destroyed_lock);
 }
 
-static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
+static void sdp_arm_cq_timer(unsigned long data)
 {
+       struct sdp_sock *ssk = (struct sdp_sock *)data;
+
+       SDPSTATS_COUNTER_INC(rx_cq_arm_timer);
+       sdp_arm_rx_cq(&ssk->isk.sk);
 }
 
 int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
@@ -966,13 +918,9 @@ int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
 
        sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq;
 
-       spin_lock_init(&ssk->rx_ring.lock);
-
        INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
-       ssk->rx_ring.timer.function = sdp_process_rx_timer;
-       ssk->rx_ring.timer.data = (unsigned long) ssk;
-       tasklet_init(&ssk->rx_ring.tasklet, sdp_process_rx_timer,
-                       (unsigned long) ssk);
+       setup_timer(&ssk->rx_ring.cq_arm_timer, sdp_arm_cq_timer,
+                       (unsigned long)ssk);
        sdp_arm_rx_cq(&ssk->isk.sk);
 
        return 0;
@@ -985,7 +933,7 @@ err_cq:
 
 void sdp_rx_ring_destroy(struct sdp_sock *ssk)
 {
-       rx_ring_destroy_lock(&ssk->rx_ring);
+       del_timer_sync(&ssk->rx_ring.cq_arm_timer);
 
        if (ssk->rx_ring.buffer) {
                sdp_rx_ring_purge(ssk);
@@ -1003,11 +951,5 @@ void sdp_rx_ring_destroy(struct sdp_sock *ssk)
                }
        }
 
-       /* the tasklet should be killed only after the rx_cq is destroyed,
-        * so there won't be rx_irq any more, meaning the tasklet will never be
-        * enabled. */
-       del_timer_sync(&ssk->rx_ring.timer);
-       tasklet_kill(&ssk->rx_ring.tasklet);
-
        SDP_WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
 }
index 5b4568ef207db475ebcc4fe1280a873fa2469339..6e269ce40062eafc80618e64eca8eddb8f0e9a49 100644 (file)
@@ -370,7 +370,8 @@ static void sdp_poll_tx_timeout(unsigned long data)
        if (sock_owned_by_user(sk)) {
                sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy");
 
-               if (sdp_tx_handler_select(ssk) && sk->sk_state != TCP_CLOSE) {
+               if (sdp_tx_handler_select(ssk) && sk->sk_state != TCP_CLOSE &&
+                               likely(ssk->qp_active)) {
                        sdp_prf1(sk, NULL, "schedule a timer");
                        mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
                }
@@ -379,8 +380,10 @@ static void sdp_poll_tx_timeout(unsigned long data)
                goto out;
        }
 
-       if (unlikely(sk->sk_state == TCP_CLOSE))
+       if (unlikely(!ssk->qp || sk->sk_state == TCP_CLOSE)) {
+               SDPSTATS_COUNTER_INC(tx_poll_no_op);
                goto out;
+       }
 
        wc_processed = sdp_process_tx_cq(ssk);
        if (!wc_processed)
@@ -395,7 +398,7 @@ static void sdp_poll_tx_timeout(unsigned long data)
        /* If there are still packets in flight and the timer has not already
         * been scheduled by the Tx routine then schedule it here to guarantee
         * completion processing of these packets */
-       if (inflight)
+       if (inflight && likely(ssk->qp_active))
                mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
 
 out:
@@ -496,8 +499,8 @@ int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
 
        sdp_sk(&ssk->isk.sk)->tx_ring.cq = tx_cq;
 
-       ssk->tx_ring.timer.function = sdp_poll_tx_timeout;
-       ssk->tx_ring.timer.data = (unsigned long) ssk;
+       setup_timer(&ssk->tx_ring.timer, sdp_poll_tx_timeout,
+                       (unsigned long)ssk);
        ssk->tx_ring.poll_cnt = 0;
 
        tasklet_init(&ssk->tx_ring.tasklet, sdp_poll_tx_timeout,
@@ -516,6 +519,7 @@ out:
 
 void sdp_tx_ring_destroy(struct sdp_sock *ssk)
 {
+       del_timer_sync(&ssk->tx_ring.timer);
 
        if (ssk->nagle_timer.function)
                del_timer_sync(&ssk->nagle_timer);
index f04193af956843d56f153a4ebe3501e92ecf6e58..b12f78262a38d1de135c832f2e6731a2575e899e 100644 (file)
@@ -366,8 +366,6 @@ void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
 
        spin_lock_irqsave(&ssk->tx_sa_lock, flags);
 
-       BUG_ON(!ssk);
-
        if (!ssk->tx_sa) {
                sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
                goto out;
@@ -631,27 +629,25 @@ static int do_sdp_sendmsg_zcopy(struct sock *sk, struct tx_srcavail_state *tx_sa
 
        rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
                        &tx_sa->fmr, &tx_sa->umem);
-       if (rc) {
+       if (unlikely(rc)) {
                sdp_dbg_data(sk, "Error allocating fmr: %d\n", rc);
                goto err_alloc_fmr;
        }
 
        if (tx_slots_free(ssk) == 0) {
                rc = wait_for_sndbuf(sk, timeo);
-               if (rc) {
+               if (unlikely(rc)) {
                        sdp_warn(sk, "Couldn't get send buffer\n");
                        goto err_no_tx_slots;
                }
        }
 
        rc = sdp_post_srcavail(sk, tx_sa);
-       if (rc) {
+       if (unlikely(rc)) {
                sdp_dbg(sk, "Error posting SrcAvail\n");
                goto err_abort_send;
        }
 
-       sdp_arm_rx_cq(sk);
-
        rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
        if (unlikely(rc)) {
                enum tx_sa_flag f = tx_sa->abort_flags;
@@ -785,16 +781,13 @@ void sdp_abort_rdma_read(struct sock *sk)
        struct sdp_sock *ssk = sdp_sk(sk);
        struct rx_srcavail_state *rx_sa;
 
-       spin_lock_irq(&ssk->rx_ring.lock);
        rx_sa = ssk->rx_sa;
        if (!rx_sa)
-               goto out;
+               return;
 
        sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
 
        /* kfree(rx_sa) and posting SendSM will be handled in the normal
         * flows.
         */
-out:
-       spin_unlock_irq(&ssk->rx_ring.lock);
 }