From: Amir Vadai Date: Sun, 12 Sep 2010 16:24:35 +0000 (+0200) Subject: sdp: Some improvements to multistream BW X-Git-Tag: v4.1.12-92~264^2~5^2~108 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=12fd6a89128ab9fb67aa44fe3463dc3ac8da4e98;p=users%2Fjedix%2Flinux-maple.git sdp: Some improvements to multistream BW - Removed almost all locks in data path - using lock_sock only - CPU Affinity for SKB handling - Don't do any RX proccessing from IRQ/timer - recv_poll value in usec instead of msec Signed-off-by: Eldad Zinger Signed-off-by: Amir Vadai --- diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h index 4ecf9a0c9e10..547a16282989 100644 --- a/drivers/infiniband/ulp/sdp/sdp.h +++ b/drivers/infiniband/ulp/sdp/sdp.h @@ -18,7 +18,7 @@ #define SDP_TX_POLL_TIMEOUT (HZ / 20) #define SDP_NAGLE_TIMEOUT (HZ / 10) -#define SDP_RX_POLL_TIMEOUT (1 + HZ / 1000) +#define SDP_RX_ARMING_DELAY (msecs_to_jiffies(10)) #define SDP_RDMA_READ_TIMEOUT (5 * HZ) #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5) @@ -97,25 +97,15 @@ struct sdp_skb_cb { #define posts_handler_get(ssk) \ do { \ atomic_inc(&ssk->somebody_is_doing_posts); \ - /* postpone the rx_ring.timer, there is no need to enable - * interrupts because there will be cq-polling. */ \ - if (likely(ssk->qp_active)) \ - mod_timer(&ssk->rx_ring.timer, MAX_JIFFY_OFFSET); \ + sdp_postpone_rx_timer(ssk); \ } while (0) #define posts_handler_put(ssk, intr_delay) \ do { \ sdp_do_posts(ssk); \ if (atomic_dec_and_test(&ssk->somebody_is_doing_posts) && \ - likely(ssk->qp_active)) { \ - if (intr_delay) \ - mod_timer(&ssk->rx_ring.timer, intr_delay); \ - else \ - /* There is no point of setting up a timer - * for an immediate cq-arming, better arm it - * now. */ \ - sdp_arm_rx_cq(&ssk->isk.sk); \ - } \ + likely(ssk->qp_active)) \ + sdp_schedule_arm_rx_cq(ssk, intr_delay);\ } while (0) #define sdp_common_release(sk) do { \ @@ -315,12 +305,7 @@ struct sdp_rx_ring { atomic_t tail; struct ib_cq *cq; - int destroyed; - rwlock_t destroyed_lock; - spinlock_t lock; - - struct timer_list timer; - struct tasklet_struct tasklet; + struct timer_list cq_arm_timer; }; struct sdp_device { @@ -359,6 +344,7 @@ struct sdp_sock { struct sk_buff_head rx_ctl_q; struct sock *parent; struct sdp_device *sdp_dev; + int cpu; int qp_active; spinlock_t tx_sa_lock; @@ -462,38 +448,10 @@ static inline void tx_sa_reset(struct tx_srcavail_state *tx_sa) sizeof(*tx_sa) - offsetof(typeof(*tx_sa), busy)); } -static inline void rx_ring_unlock(struct sdp_rx_ring *rx_ring) -{ - read_unlock_bh(&rx_ring->destroyed_lock); -} - -static inline int rx_ring_trylock(struct sdp_rx_ring *rx_ring) -{ - read_lock_bh(&rx_ring->destroyed_lock); - if (rx_ring->destroyed) { - rx_ring_unlock(rx_ring); - return 0; - } - return 1; -} - -static inline void rx_ring_destroy_lock(struct sdp_rx_ring *rx_ring) -{ - write_lock_bh(&rx_ring->destroyed_lock); - rx_ring->destroyed = 1; - write_unlock_bh(&rx_ring->destroyed_lock); -} - static inline int sdp_chk_sa_cancel(struct sdp_sock *ssk, struct rx_srcavail_state *rx_sa) { - int res; - - spin_lock_irq(&ssk->rx_ring.lock); - res = ssk->sa_cancel_arrived && + return ssk->sa_cancel_arrived && before(rx_sa->mseq, ssk->sa_cancel_mseq); - spin_unlock_irq(&ssk->rx_ring.lock); - - return res; } static inline struct sdp_sock *sdp_sk(const struct sock *sk) @@ -546,29 +504,6 @@ static inline void sdp_set_error(struct sock *sk, int err) sk->sk_error_report(sk); } -static inline void sdp_arm_rx_cq(struct sock *sk) -{ - if (unlikely(!sdp_sk(sk)->rx_ring.cq)) 
- return; - - sdp_prf(sk, NULL, "Arming RX cq"); - sdp_dbg_data(sk, "Arming RX cq\n"); - - ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP); -} - -static inline void sdp_arm_tx_cq(struct sock *sk) -{ - if (unlikely(!sdp_sk(sk)->tx_ring.cq)) - return; - - sdp_prf(sk, NULL, "Arming TX cq"); - sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n", - tx_credits(sdp_sk(sk)), tx_ring_posted(sdp_sk(sk))); - - ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, IB_CQ_NEXT_COMP); -} - /* return the min of: * - tx credits * - free slots in tx_ring (not including SDP_MIN_TX_CREDITS @@ -771,21 +706,29 @@ struct sdpstats { u32 sendmsg; u32 recvmsg; u32 post_send_credits; - u32 sendmsg_nagle_skip; u32 sendmsg_seglen[25]; u32 send_size[25]; u32 post_recv; + u32 rx_int_arm; + u32 tx_int_arm; u32 rx_int_count; u32 tx_int_count; + u32 rx_int_wake_up; + u32 rx_int_queue; + u32 rx_int_no_op; + u32 rx_cq_modified; + u32 rx_cq_arm_timer; u32 rx_wq; u32 bzcopy_poll_miss; u32 send_wait_for_mem; u32 send_miss_no_credits; u32 rx_poll_miss; u32 rx_poll_hit; + u32 poll_hit_usec[16]; u32 tx_poll_miss; u32 tx_poll_hit; u32 tx_poll_busy; + u32 tx_poll_no_op; u32 memcpy_count; u32 credits_before_update[64]; u32 zcopy_tx_timeout; @@ -793,6 +736,8 @@ struct sdpstats { u32 zcopy_tx_aborted; u32 zcopy_tx_error; u32 fmr_alloc_error; + u32 keepalive_timer; + u32 nagle_timer; }; static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log) @@ -842,6 +787,60 @@ static inline void sdp_cleanup_sdp_buf(struct sdp_sock *ssk, struct sdp_buf *sbu } } +static inline void sdp_arm_rx_cq(struct sock *sk) +{ + if (unlikely(!sdp_sk(sk)->rx_ring.cq)) + return; + + SDPSTATS_COUNTER_INC(rx_int_arm); + sdp_dbg_data(sk, "Arming RX cq\n"); + + if (unlikely(0 > ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, + IB_CQ_NEXT_COMP))) + sdp_warn(sk, "error arming rx cq\n"); +} + +static inline void sdp_arm_tx_cq(struct sock *sk) +{ + if (unlikely(!sdp_sk(sk)->tx_ring.cq)) + return; + + SDPSTATS_COUNTER_INC(tx_int_arm); + sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n", + tx_credits(sdp_sk(sk)), tx_ring_posted(sdp_sk(sk))); + + if (unlikely(0 > ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, + IB_CQ_NEXT_COMP))) + sdp_warn(sk, "error arming tx cq\n"); +} + +static inline void sdp_postpone_rx_timer(struct sdp_sock *ssk) +{ + if (timer_pending(&ssk->rx_ring.cq_arm_timer) && ssk->qp_active) + mod_timer(&ssk->rx_ring.cq_arm_timer, MAX_JIFFY_OFFSET); +} + +static inline void sdp_schedule_arm_rx_cq(struct sdp_sock *ssk, + unsigned long delay) +{ + if (unlikely(!ssk->rx_ring.cq)) + return; + + if (delay && ssk->qp_active) + mod_timer(&ssk->rx_ring.cq_arm_timer, jiffies + delay); + else { + /* There is no point of setting up a timer for an immediate + * cq-arming, better arm it now. 
*/ + sdp_arm_rx_cq(&ssk->isk.sk); + } +} + +static inline int somebody_is_waiting(struct sock *sk) +{ + return sk->sk_socket && + test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags); +} + /* sdp_main.c */ void sdp_set_default_moderation(struct sdp_sock *ssk); int sdp_init_sock(struct sock *sk); @@ -873,7 +872,6 @@ void sdp_nagle_timeout(unsigned long data); void sdp_post_keepalive(struct sdp_sock *ssk); /* sdp_rx.c */ -void sdp_rx_ring_init(struct sdp_sock *ssk); int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device); void sdp_rx_ring_destroy(struct sdp_sock *ssk); int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size); @@ -882,7 +880,7 @@ void sdp_do_posts(struct sdp_sock *ssk); void sdp_rx_comp_full(struct sdp_sock *ssk); void sdp_remove_large_sock(const struct sdp_sock *ssk); void sdp_handle_disconn(struct sock *sk); -int sdp_process_rx(struct sdp_sock *ssk); +int sdp_poll_rx_cq(struct sdp_sock *ssk); /* sdp_zcopy.c */ int sdp_sendmsg_zcopy(struct kiocb *iocb, struct sock *sk, struct iovec *iov); diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c index 6e91c426e20b..166dae322964 100644 --- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c @@ -85,7 +85,7 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str, srcah = (struct sdp_srcah *)(h+1); len += snprintf(buf + len, 255-len, " | payload: 0x%zx, " - "len: 0x%zx, rkey: 0x%x, vaddr: 0x%llx |", + "len: 0x%x, rkey: 0x%x, vaddr: 0x%llx |", ntohl(h->len) - sizeof(struct sdp_bsdh) - sizeof(struct sdp_srcah), ntohl(srcah->len), ntohl(srcah->rkey), @@ -148,6 +148,7 @@ void sdp_nagle_timeout(unsigned long data) struct sdp_sock *ssk = (struct sdp_sock *)data; struct sock *sk = &ssk->isk.sk; + SDPSTATS_COUNTER_INC(nagle_timer); sdp_dbg_data(sk, "last_unacked = %ld\n", ssk->nagle_last_unacked); if (!ssk->nagle_last_unacked) @@ -179,6 +180,12 @@ out2: } } +static inline int sdp_should_rearm(struct sock *sk) +{ + return sk->sk_state != TCP_ESTABLISHED || sdp_sk(sk)->tx_sa || + somebody_is_waiting(sk); +} + void sdp_post_sends(struct sdp_sock *ssk, gfp_t gfp) { /* TODO: nonagle? */ @@ -200,8 +207,12 @@ void sdp_post_sends(struct sdp_sock *ssk, gfp_t gfp) sdp_xmit_poll(ssk, 1); /* Run out of credits, check if got a credit update */ - if (unlikely(tx_credits(ssk) <= SDP_MIN_TX_CREDITS)) - sdp_process_rx(ssk); + if (unlikely(tx_credits(ssk) <= SDP_MIN_TX_CREDITS)) { + sdp_poll_rx_cq(ssk); + + if (unlikely(sdp_should_rearm(sk) || !posts_handler(ssk))) + sdp_arm_rx_cq(sk); + } if (ssk->recv_request && ring_tail(ssk->rx_ring) >= ssk->recv_request_head && diff --git a/drivers/infiniband/ulp/sdp/sdp_dbg.h b/drivers/infiniband/ulp/sdp/sdp_dbg.h index 6d7d8e6db33d..da9bdc9f57ae 100644 --- a/drivers/infiniband/ulp/sdp/sdp_dbg.h +++ b/drivers/infiniband/ulp/sdp/sdp_dbg.h @@ -23,7 +23,7 @@ _sdp_printk(__func__, __LINE__, level, sk, format, ## arg) #define sdp_warn(sk, format, arg...) 
\ do { \ - sdp_printk(KERN_WARNING, sk, "\t%lx: " format , jiffies, ## arg); \ + sdp_printk(KERN_WARNING, sk, format, ## arg); \ sdp_prf(sk, NULL, format , ## arg); \ } while (0) diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c index 4f2127c044c4..f1a7d757df34 100644 --- a/drivers/infiniband/ulp/sdp/sdp_main.c +++ b/drivers/infiniband/ulp/sdp/sdp_main.c @@ -90,7 +90,7 @@ SDP_MODPARAM_INT(sdp_data_debug_level, 0, SDP_MODPARAM_SINT(sdp_fmr_pool_size, 20, "Number of FMRs to allocate for pool"); SDP_MODPARAM_SINT(sdp_fmr_dirty_wm, 5, "Watermark to flush fmr pool"); -SDP_MODPARAM_SINT(recv_poll, 10, "How many msec to poll recv."); +SDP_MODPARAM_SINT(recv_poll, 700, "usecs to poll recv before arming interrupt."); SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME, "Default idle time in seconds before keepalive probe sent."); static int sdp_bzcopy_thresh = 0; @@ -189,8 +189,6 @@ static void sdp_destroy_qp(struct sdp_sock *ssk) ssk->qp_active = 0; - del_timer(&ssk->tx_ring.timer); - if (ssk->qp) { ib_destroy_qp(ssk->qp); ssk->qp = NULL; @@ -232,6 +230,7 @@ static void sdp_keepalive_timer(unsigned long data) struct sdp_sock *ssk = sdp_sk(sk); sdp_dbg(sk, "%s\n", __func__); + SDPSTATS_COUNTER_INC(keepalive_timer); /* Only process if the socket is not in use */ bh_lock_sock(sk); @@ -255,12 +254,6 @@ out: sock_put(sk, SOCK_REF_KEEPALIVE); } -static void sdp_init_keepalive_timer(struct sock *sk) -{ - sk->sk_timer.function = sdp_keepalive_timer; - sk->sk_timer.data = (unsigned long)sk; -} - static void sdp_set_keepalive(struct sock *sk, int val) { sdp_dbg(sk, "%s %d\n", __func__, val); @@ -293,12 +286,13 @@ void sdp_set_default_moderation(struct sdp_sock *ssk) if (hw_int_mod_count > 0 && hw_int_mod_usec > 0) { err = ib_modify_cq(ssk->rx_ring.cq, hw_int_mod_count, hw_int_mod_usec); - if (err) + if (unlikely(err)) sdp_warn(sk, - "Failed modifying moderation for cq"); + "Failed modifying moderation for cq\n"); else sdp_dbg(sk, "Using fixed interrupt moderation\n"); + SDPSTATS_COUNTER_INC(rx_cq_modified); } return; } @@ -413,10 +407,11 @@ static void sdp_auto_moderation(struct sdp_sock *ssk) if (moder_time != mod->last_moder_time) { mod->last_moder_time = moder_time; err = ib_modify_cq(ssk->rx_ring.cq, mod->moder_cnt, moder_time); - if (err) { + if (unlikely(err)) { sdp_dbg_data(&ssk->isk.sk, "Failed modifying moderation for cq"); } + SDPSTATS_COUNTER_INC(rx_cq_modified); } out: @@ -663,6 +658,7 @@ static void sdp_close(struct sock *sk, long timeout) sdp_dbg(sk, "%s\n", __func__); sdp_prf(sk, NULL, __func__); + sdp_sk(sk)->cpu = smp_processor_id(); sdp_delete_keepalive_timer(sk); sk->sk_shutdown = SHUTDOWN_MASK; @@ -756,6 +752,8 @@ static int sdp_connect(struct sock *sk, struct sockaddr *uaddr, int addr_len) .sin_addr.s_addr = inet_sk(sk)->saddr, }; int rc; + + ssk->cpu = smp_processor_id(); release_sock(sk); flush_workqueue(sdp_wq); lock_sock(sk); @@ -809,6 +807,7 @@ static int sdp_disconnect(struct sock *sk, int flags) sdp_dbg(sk, "%s\n", __func__); + ssk->cpu = smp_processor_id(); if (sk->sk_state != TCP_LISTEN) { if (ssk->id) { sdp_sk(sk)->qp_active = 0; @@ -899,6 +898,7 @@ static struct sock *sdp_accept(struct sock *sk, int flags, int *err) ssk = sdp_sk(sk); lock_sock(sk); + ssk->cpu = smp_processor_id(); /* We need to make sure that this socket is listening, * and that it has something pending. 
@@ -961,6 +961,7 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg) return -EINVAL; lock_sock(sk); + ssk->cpu = smp_processor_id(); if ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV)) answ = 0; else if (sock_flag(sk, SOCK_URGINLINE) || @@ -1122,7 +1123,7 @@ int sdp_init_sock(struct sock *sk) atomic_set(&ssk->mseq_ack, 0); - sdp_rx_ring_init(ssk); + ssk->rx_ring.buffer = NULL; ssk->tx_ring.buffer = NULL; ssk->sdp_disconnect = 0; ssk->destructed_already = 0; @@ -1132,10 +1133,10 @@ int sdp_init_sock(struct sock *sk) ssk->tx_compl_pending = 0; atomic_set(&ssk->somebody_is_doing_posts, 0); - + ssk->cpu = smp_processor_id(); ssk->tx_ring.rdma_inflight = NULL; - init_timer(&ssk->rx_ring.timer); + init_timer(&ssk->rx_ring.cq_arm_timer); init_timer(&ssk->tx_ring.timer); init_timer(&ssk->nagle_timer); init_timer(&sk->sk_timer); @@ -1206,6 +1207,7 @@ static int sdp_setsockopt(struct sock *sk, int level, int optname, return -EFAULT; lock_sock(sk); + ssk->cpu = smp_processor_id(); /* SOCK_KEEPALIVE is really a SOL_SOCKET level option but there * is a problem handling it at that level. In order to start @@ -1348,9 +1350,20 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname, return 0; } +static inline int cycles_before(cycles_t a, cycles_t b) +{ + /* cycles_t is unsigned, but may be int/long/long long. */ + + if (sizeof(cycles_t) == 4) + return before(a, b); + else + return (s64)(a - b) < 0; +} + static inline int poll_recv_cq(struct sock *sk) { - unsigned long jiffies_end = jiffies + recv_poll * HZ / 1000; + cycles_t start = get_cycles(); + cycles_t end = start + recv_poll * cpu_khz / 1000; sdp_prf(sk, NULL, "polling recv"); @@ -1358,11 +1371,15 @@ static inline int poll_recv_cq(struct sock *sk) return 0; do { - if (sdp_process_rx(sdp_sk(sk))) { + if (sdp_poll_rx_cq(sdp_sk(sk))) { SDPSTATS_COUNTER_INC(rx_poll_hit); + SDPSTATS_HIST(poll_hit_usec, + (get_cycles() - start) * + 1000 / cpu_khz); return 0; } - } while (jiffies < jiffies_end); + } while (cycles_before(get_cycles(), end)); + SDPSTATS_COUNTER_INC(rx_poll_miss); return 1; } @@ -1881,6 +1898,7 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, SDPSTATS_COUNTER_INC(sendmsg); lock_sock(sk); + ssk->cpu = smp_processor_id(); sdp_dbg_data(sk, "%s size = 0x%zx\n", __func__, size); posts_handler_get(ssk); @@ -1962,13 +1980,10 @@ new_segment: ( __bz && tx_slots_free(ssk) < __bz->busy) || \ (!__bz && !sk_stream_memory_free(sk))) if (unlikely(can_not_tx(bz))) { - if (!poll_recv_cq(sk)) { + if (!poll_recv_cq(sk)) sdp_do_posts(ssk); - } - if ((can_not_tx(bz))) { - sdp_arm_rx_cq(sk); + if ((can_not_tx(bz))) goto wait_for_sndbuf; - } } skb = sdp_alloc_skb_data(sk, min(seglen, size_goal), 0); @@ -2082,7 +2097,7 @@ out_err: sdp_dbg_data(sk, "err: %d\n", err); fin: - posts_handler_put(ssk, jiffies + SDP_RX_POLL_TIMEOUT); + posts_handler_put(ssk, SDP_RX_ARMING_DELAY); if (!err && !ssk->qp_active) { err = -EPIPE; @@ -2105,14 +2120,10 @@ static inline int sdp_abort_rx_srcavail(struct sock *sk) h->mid = SDP_MID_DATA; - spin_lock_irq(&ssk->rx_ring.lock); - RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL; kfree(ssk->rx_sa); ssk->rx_sa = NULL; - spin_unlock_irq(&ssk->rx_ring.lock); - return 0; } @@ -2138,6 +2149,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, SDPSTATS_COUNTER_INC(recvmsg); lock_sock(sk); + ssk->cpu = smp_processor_id(); sdp_dbg_data(sk, "iovlen: %zd iov_len: 0x%zx flags: 0x%x peek: 0x%x\n", msg->msg_iovlen, msg->msg_iov[0].iov_len, flags, MSG_PEEK); 
@@ -2204,17 +2216,14 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, goto found_fin_ok; case SDP_MID_SRCAVAIL: - spin_lock_irq(&ssk->rx_ring.lock); rx_sa = RX_SRCAVAIL_STATE(skb); if (unlikely(!rx_sa)) { /* SrcAvailCancel arrived and handled */ h->mid = SDP_MID_DATA; - spin_unlock_irq(&ssk->rx_ring.lock); goto sdp_mid_data; } rx_sa->is_treated = 1; - spin_unlock_irq(&ssk->rx_ring.lock); if (sdp_chk_sa_cancel(ssk, rx_sa) || !ssk->sdp_dev || @@ -2488,7 +2497,7 @@ got_disconn_in_peek: err = copied; out: - posts_handler_put(ssk, jiffies + SDP_RX_POLL_TIMEOUT); + posts_handler_put(ssk, SDP_RX_ARMING_DELAY); sdp_auto_moderation(ssk); @@ -2540,6 +2549,7 @@ static int sdp_inet_listen(struct socket *sock, int backlog) int err; lock_sock(sk); + sdp_sk(sk)->cpu = smp_processor_id(); err = -EINVAL; if (sock->state != SS_UNCONNECTED) @@ -2585,11 +2595,13 @@ static unsigned int sdp_poll(struct file *file, struct socket *socket, sdp_dbg_data(sk, "%s\n", __func__); lock_sock(sk); + sdp_sk(sk)->cpu = smp_processor_id(); if (sk->sk_state == TCP_ESTABLISHED) { sdp_prf(sk, NULL, "polling"); - if (poll_recv_cq(sk)) - sdp_arm_rx_cq(sk); + posts_handler_get(sdp_sk(sk)); + poll_recv_cq(sk); + posts_handler_put(sdp_sk(sk), 0); } mask = datagram_poll(file, socket, wait); @@ -2728,7 +2740,7 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol) sk->sk_destruct = sdp_destruct; - sdp_init_keepalive_timer(sk); + setup_timer(&sk->sk_timer, sdp_keepalive_timer, (unsigned long)sk); sock->ops = &sdp_proto_ops; sock->state = SS_UNCONNECTED; @@ -2899,7 +2911,7 @@ static int __init sdp_init(void) sdp_proto.sockets_allocated = sockets_allocated; sdp_proto.orphan_count = orphan_count; - rx_comp_wq = create_singlethread_workqueue("rx_comp_wq"); + rx_comp_wq = create_workqueue("rx_comp_wq"); if (!rx_comp_wq) goto no_mem_rx_wq; diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c index 3faa8271344d..9f94c2b4390f 100644 --- a/drivers/infiniband/ulp/sdp/sdp_proc.c +++ b/drivers/infiniband/ulp/sdp/sdp_proc.c @@ -236,7 +236,7 @@ static void sdpstats_seq_hist(struct seq_file *seq, char *str, u32 *h, int n, memset(s, '*', j); s[j] = '\0'; - seq_printf(seq, "%10d | %-50s - %d\n", val, s, h[i]); + seq_printf(seq, "%10d | %-50s - %u\n", val, s, h[i]); } } @@ -310,6 +310,8 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v) } seq_printf(seq, "\n"); + seq_printf(seq, "sdp_recvmsg() calls\t\t: %d\n", + SDPSTATS_COUNTER_GET(recvmsg)); seq_printf(seq, "post_recv \t\t: %d\n", SDPSTATS_COUNTER_GET(post_recv)); seq_printf(seq, "BZCopy poll miss \t\t: %d\n", @@ -321,12 +323,26 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v) seq_printf(seq, "rx_poll_miss \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_poll_miss)); seq_printf(seq, "rx_poll_hit \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_poll_hit)); + __sdpstats_seq_hist(seq, "poll_hit_usec", poll_hit_usec, 1); + seq_printf(seq, "rx_cq_arm_timer \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_cq_arm_timer)); + seq_printf(seq, "tx_poll_miss \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_miss)); seq_printf(seq, "tx_poll_busy \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_busy)); seq_printf(seq, "tx_poll_hit \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_hit)); + seq_printf(seq, "tx_poll_no_op \t\t: %d\n", SDPSTATS_COUNTER_GET(tx_poll_no_op)); + + seq_printf(seq, "keepalive timer \t\t: %d\n", SDPSTATS_COUNTER_GET(keepalive_timer)); + seq_printf(seq, "nagle timer \t\t: %d\n", SDPSTATS_COUNTER_GET(nagle_timer)); 
seq_printf(seq, "CQ stats:\n"); - seq_printf(seq, "- RX interrupts\t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_count)); + seq_printf(seq, "- RX irq armed \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_arm)); + seq_printf(seq, "- RX interrupts \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_count)); + seq_printf(seq, "- RX int wake up\t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_wake_up)); + seq_printf(seq, "- RX int queue \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_queue)); + seq_printf(seq, "- RX int no op \t\t: %d\n", SDPSTATS_COUNTER_GET(rx_int_no_op)); + seq_printf(seq, "- RX cq modified\t\t: %d\n", SDPSTATS_COUNTER_GET(rx_cq_modified)); + + seq_printf(seq, "- TX irq armed\t\t: %d\n", SDPSTATS_COUNTER_GET(tx_int_arm)); seq_printf(seq, "- TX interrupts\t\t: %d\n", SDPSTATS_COUNTER_GET(tx_int_count)); seq_printf(seq, "ZCopy stats:\n"); diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c index 9aa6acacc882..a65cb4ba722a 100644 --- a/drivers/infiniband/ulp/sdp/sdp_rx.c +++ b/drivers/infiniband/ulp/sdp/sdp_rx.c @@ -510,19 +510,14 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb) sdp_handle_sendsm(ssk, ntohl(h->mseq_ack)); break; case SDP_MID_SRCAVAIL_CANCEL: - spin_lock_irq(&ssk->rx_ring.lock); if (ssk->rx_sa && !ssk->rx_sa->is_treated && after(ntohl(h->mseq), ssk->rx_sa->mseq)) { sdp_dbg(sk, "Handling SrcAvailCancel - post SendSM\n"); RX_SRCAVAIL_STATE(ssk->rx_sa->skb) = NULL; kfree(ssk->rx_sa); ssk->rx_sa = NULL; - spin_unlock_irq(&ssk->rx_ring.lock); sdp_post_sendsm(sk); - break; } - - spin_unlock_irq(&ssk->rx_ring.lock); break; case SDP_MID_SINKAVAIL: case SDP_MID_ABORT: @@ -708,16 +703,14 @@ static void sdp_bzcopy_write_space(struct sdp_sock *ssk) sock_wake_async(sock, 2, POLL_OUT); } -static int sdp_poll_rx_cq(struct sdp_sock *ssk) +int sdp_poll_rx_cq(struct sdp_sock *ssk) { struct ib_cq *cq = ssk->rx_ring.cq; struct ib_wc ibwc[SDP_NUM_WC]; int n, i; int wc_processed = 0; struct sk_buff *skb; - unsigned long flags; - spin_lock_irqsave(&ssk->rx_ring.lock, flags); do { n = ib_poll_cq(cq, SDP_NUM_WC, ibwc); for (i = 0; i < n; ++i) { @@ -732,10 +725,11 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk) wc_processed++; } } while (n == SDP_NUM_WC); - spin_unlock_irqrestore(&ssk->rx_ring.lock, flags); - if (wc_processed) + if (wc_processed) { + sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed); sdp_bzcopy_write_space(ssk); + } return wc_processed; } @@ -768,8 +762,9 @@ static void sdp_rx_comp_work(struct work_struct *work) lock_sock(sk); + posts_handler_get(ssk); sdp_do_posts(ssk); - + posts_handler_put(ssk, SDP_RX_ARMING_DELAY); release_sock(sk); } @@ -785,7 +780,7 @@ void sdp_do_posts(struct sdp_sock *ssk) } if (likely(ssk->rx_ring.cq)) - sdp_process_rx(sdp_sk(sk)); + sdp_poll_rx_cq(ssk); while ((skb = skb_dequeue(&ssk->rx_ctl_q))) sdp_process_rx_ctl_skb(ssk, skb); @@ -818,13 +813,19 @@ void sdp_do_posts(struct sdp_sock *ssk) } +static inline int should_wake_up(struct sock *sk) +{ + return sk->sk_sleep && waitqueue_active(sk->sk_sleep) && + (posts_handler(sdp_sk(sk)) || somebody_is_waiting(sk)); +} + static void sdp_rx_irq(struct ib_cq *cq, void *cq_context) { struct sock *sk = cq_context; struct sdp_sock *ssk = sdp_sk(sk); - if (cq != ssk->rx_ring.cq) { - sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq); + if (unlikely(cq != ssk->rx_ring.cq)) { + sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq); return; } @@ -832,65 +833,15 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context) sdp_prf(sk, NULL, "rx irq"); - /* 
We could use rx_ring.timer instead, but mod_timer(..., 0) - * measured to add 4ms delay. - */ - tasklet_hi_schedule(&ssk->rx_ring.tasklet); -} - -static inline int sdp_should_rearm(struct sock *sk) -{ - return sk->sk_state != TCP_ESTABLISHED || - sdp_sk(sk)->tx_sa || - (sk->sk_socket && test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags)); -} - -int sdp_process_rx(struct sdp_sock *ssk) -{ - struct sock *sk = &ssk->isk.sk; - int wc_processed; - int credits_before; - - if (!rx_ring_trylock(&ssk->rx_ring)) { - sdp_dbg(&ssk->isk.sk, "ring destroyed. not polling it\n"); - return 0; - } - - credits_before = tx_credits(ssk); - - wc_processed = sdp_poll_rx_cq(ssk); - - if (wc_processed) { - sdp_prf(sk, NULL, "processed %d", wc_processed); - sdp_prf(sk, NULL, "credits: %d -> %d", - credits_before, tx_credits(ssk)); - - if (posts_handler(ssk) || (sk->sk_socket && - test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags))) { - - sdp_prf(&ssk->isk.sk, NULL, - "Somebody is doing the post work for me. %d", - posts_handler(ssk)); - - } else { - sdp_prf(&ssk->isk.sk, NULL, "Queuing work. ctl_q: %d", - !skb_queue_empty(&ssk->rx_ctl_q)); - queue_work(rx_comp_wq, &ssk->rx_comp_work); - } + if (should_wake_up(sk)) { + wake_up_interruptible(sk->sk_sleep); + SDPSTATS_COUNTER_INC(rx_int_wake_up); + } else { + if (queue_work_on(ssk->cpu, rx_comp_wq, &ssk->rx_comp_work)) + SDPSTATS_COUNTER_INC(rx_int_queue); + else + SDPSTATS_COUNTER_INC(rx_int_no_op); } - - if (unlikely(sdp_should_rearm(sk) || !posts_handler(ssk))) - sdp_arm_rx_cq(sk); - - rx_ring_unlock(&ssk->rx_ring); - - return wc_processed; -} - -static void sdp_process_rx_timer(unsigned long data) -{ - struct sdp_sock *ssk = (struct sdp_sock *)data; - sdp_process_rx(ssk); } static void sdp_rx_ring_purge(struct sdp_sock *ssk) @@ -924,15 +875,16 @@ static void sdp_rx_ring_purge(struct sdp_sock *ssk) } } -void sdp_rx_ring_init(struct sdp_sock *ssk) +static void sdp_rx_cq_event_handler(struct ib_event *event, void *data) { - ssk->rx_ring.buffer = NULL; - ssk->rx_ring.destroyed = 0; - rwlock_init(&ssk->rx_ring.destroyed_lock); } -static void sdp_rx_cq_event_handler(struct ib_event *event, void *data) +static void sdp_arm_cq_timer(unsigned long data) { + struct sdp_sock *ssk = (struct sdp_sock *)data; + + SDPSTATS_COUNTER_INC(rx_cq_arm_timer); + sdp_arm_rx_cq(&ssk->isk.sk); } int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device) @@ -966,13 +918,9 @@ int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device) sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq; - spin_lock_init(&ssk->rx_ring.lock); - INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work); - ssk->rx_ring.timer.function = sdp_process_rx_timer; - ssk->rx_ring.timer.data = (unsigned long) ssk; - tasklet_init(&ssk->rx_ring.tasklet, sdp_process_rx_timer, - (unsigned long) ssk); + setup_timer(&ssk->rx_ring.cq_arm_timer, sdp_arm_cq_timer, + (unsigned long)ssk); sdp_arm_rx_cq(&ssk->isk.sk); return 0; @@ -985,7 +933,7 @@ err_cq: void sdp_rx_ring_destroy(struct sdp_sock *ssk) { - rx_ring_destroy_lock(&ssk->rx_ring); + del_timer_sync(&ssk->rx_ring.cq_arm_timer); if (ssk->rx_ring.buffer) { sdp_rx_ring_purge(ssk); @@ -1003,11 +951,5 @@ void sdp_rx_ring_destroy(struct sdp_sock *ssk) } } - /* the tasklet should be killed only after the rx_cq is destroyed, - * so there won't be rx_irq any more, meaning the tasklet will never be - * enabled. 
*/ - del_timer_sync(&ssk->rx_ring.timer); - tasklet_kill(&ssk->rx_ring.tasklet); - SDP_WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring)); } diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c index 5b4568ef207d..6e269ce40062 100644 --- a/drivers/infiniband/ulp/sdp/sdp_tx.c +++ b/drivers/infiniband/ulp/sdp/sdp_tx.c @@ -370,7 +370,8 @@ static void sdp_poll_tx_timeout(unsigned long data) if (sock_owned_by_user(sk)) { sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy"); - if (sdp_tx_handler_select(ssk) && sk->sk_state != TCP_CLOSE) { + if (sdp_tx_handler_select(ssk) && sk->sk_state != TCP_CLOSE && + likely(ssk->qp_active)) { sdp_prf1(sk, NULL, "schedule a timer"); mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); } @@ -379,8 +380,10 @@ static void sdp_poll_tx_timeout(unsigned long data) goto out; } - if (unlikely(sk->sk_state == TCP_CLOSE)) + if (unlikely(!ssk->qp || sk->sk_state == TCP_CLOSE)) { + SDPSTATS_COUNTER_INC(tx_poll_no_op); goto out; + } wc_processed = sdp_process_tx_cq(ssk); if (!wc_processed) @@ -395,7 +398,7 @@ static void sdp_poll_tx_timeout(unsigned long data) /* If there are still packets in flight and the timer has not already * been scheduled by the Tx routine then schedule it here to guarantee * completion processing of these packets */ - if (inflight) + if (inflight && likely(ssk->qp_active)) mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); out: @@ -496,8 +499,8 @@ int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device) sdp_sk(&ssk->isk.sk)->tx_ring.cq = tx_cq; - ssk->tx_ring.timer.function = sdp_poll_tx_timeout; - ssk->tx_ring.timer.data = (unsigned long) ssk; + setup_timer(&ssk->tx_ring.timer, sdp_poll_tx_timeout, + (unsigned long)ssk); ssk->tx_ring.poll_cnt = 0; tasklet_init(&ssk->tx_ring.tasklet, sdp_poll_tx_timeout, @@ -516,6 +519,7 @@ out: void sdp_tx_ring_destroy(struct sdp_sock *ssk) { + del_timer_sync(&ssk->tx_ring.timer); if (ssk->nagle_timer.function) del_timer_sync(&ssk->nagle_timer); diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c index f04193af9568..b12f78262a38 100644 --- a/drivers/infiniband/ulp/sdp/sdp_zcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c @@ -366,8 +366,6 @@ void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack, spin_lock_irqsave(&ssk->tx_sa_lock, flags); - BUG_ON(!ssk); - if (!ssk->tx_sa) { sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n"); goto out; @@ -631,27 +629,25 @@ static int do_sdp_sendmsg_zcopy(struct sock *sk, struct tx_srcavail_state *tx_sa rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len, &tx_sa->fmr, &tx_sa->umem); - if (rc) { + if (unlikely(rc)) { sdp_dbg_data(sk, "Error allocating fmr: %d\n", rc); goto err_alloc_fmr; } if (tx_slots_free(ssk) == 0) { rc = wait_for_sndbuf(sk, timeo); - if (rc) { + if (unlikely(rc)) { sdp_warn(sk, "Couldn't get send buffer\n"); goto err_no_tx_slots; } } rc = sdp_post_srcavail(sk, tx_sa); - if (rc) { + if (unlikely(rc)) { sdp_dbg(sk, "Error posting SrcAvail\n"); goto err_abort_send; } - sdp_arm_rx_cq(sk); - rc = sdp_wait_rdmardcompl(ssk, timeo, 0); if (unlikely(rc)) { enum tx_sa_flag f = tx_sa->abort_flags; @@ -785,16 +781,13 @@ void sdp_abort_rdma_read(struct sock *sk) struct sdp_sock *ssk = sdp_sk(sk); struct rx_srcavail_state *rx_sa; - spin_lock_irq(&ssk->rx_ring.lock); rx_sa = ssk->rx_sa; if (!rx_sa) - goto out; + return; sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem); /* kfree(rx_sa) and posting SendSM will be handled in the nornal * 
flows. */ -out: - spin_unlock_irq(&ssk->rx_ring.lock); }
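
With the rx_ring spinlock, the destroyed_lock and the RX tasklet gone, the RX data path relies on lock_sock() only, and every path that touches the RX cq brackets its work with posts_handler_get()/posts_handler_put(). A condensed, kernel-context sketch of that bracket as sdp_sendmsg(), sdp_recvmsg(), sdp_poll() and sdp_rx_comp_work() now use it (assembled from the hunks above, not a standalone compilable unit):

	lock_sock(sk);
	ssk->cpu = smp_processor_id();	/* remember the caller's CPU for queue_work_on() */

	/* Postpones rx_ring.cq_arm_timer: no RX interrupt is needed while
	 * somebody is polling the cq from process context. */
	posts_handler_get(ssk);

	/* ... sdp_do_posts() / sdp_poll_rx_cq() / copy data to or from user ... */

	/* The last task out either arms the RX cq immediately (delay == 0,
	 * as in sdp_poll()) or sets cq_arm_timer to arm it after
	 * SDP_RX_ARMING_DELAY (10 ms), unless someone re-enters first. */
	posts_handler_put(ssk, SDP_RX_ARMING_DELAY);

	release_sock(sk);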
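
On the interrupt side nothing is polled any more: sdp_rx_irq() only decides who will do the work. Either a reader already sleeping in recvmsg()/poll() is woken up and polls the cq itself under lock_sock(), or the completion work is queued on the CPU the socket was last used on; rx_comp_wq is now a per-CPU workqueue (create_workqueue() instead of create_singlethread_workqueue()), so queue_work_on(ssk->cpu, ...) actually runs there. In short (condensed from the sdp_rx.c hunk above, stats counters and sanity checks omitted):

	static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
	{
		struct sock *sk = cq_context;
		struct sdp_sock *ssk = sdp_sk(sk);

		if (should_wake_up(sk)) {
			/* A sleeping reader will poll the cq once it wakes. */
			wake_up_interruptible(sk->sk_sleep);
		} else {
			/* Defer to process context on the socket's CPU. */
			queue_work_on(ssk->cpu, rx_comp_wq, &ssk->rx_comp_work);
		}
	}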
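
recv_poll is now interpreted in microseconds (default 700 usec instead of 10 msec), and the poll budget is measured in CPU cycles rather than jiffies: end = start + recv_poll * cpu_khz / 1000. On a hypothetical 2.4 GHz CPU (cpu_khz = 2400000) the default budget is 700 * 2400000 / 1000 = 1,680,000 cycles, and cycles_before() keeps the comparison safe when cycles_t is a 32-bit counter that wraps. A minimal user-space analog of the budgeted poll loop, using a monotonic clock instead of get_cycles()/cpu_khz (all names below are illustrative, not part of the driver):

	#include <stdio.h>
	#include <stdint.h>
	#include <time.h>

	static uint64_t now_ns(void)
	{
		struct timespec ts;

		clock_gettime(CLOCK_MONOTONIC, &ts);
		return (uint64_t)ts.tv_sec * 1000000000ull + ts.tv_nsec;
	}

	/* Stand-in for sdp_poll_rx_cq(): returns non-zero when completions
	 * were processed. */
	static int poll_rx_once(void)
	{
		return 0;
	}

	/* Busy-poll for up to budget_usec before giving up; a miss is the
	 * point where the driver would arm the RX interrupt instead. */
	static int poll_recv(unsigned int budget_usec)
	{
		uint64_t end = now_ns() + (uint64_t)budget_usec * 1000;

		do {
			if (poll_rx_once())
				return 0;	/* hit */
		} while (now_ns() < end);

		return 1;			/* miss */
	}

	int main(void)
	{
		printf("recv poll: %s\n", poll_recv(700) ? "miss" : "hit");
		return 0;
	}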