From 9aa170e4ff59c038e70742fbb58af7d7fd7213e4 Mon Sep 17 00:00:00 2001
From: Amir Vadai
Date: Sun, 23 Aug 2009 18:28:39 +0300
Subject: [PATCH] sdp: Process tx completions from sendmsg context; arm tx cq
 when needed
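
Instead of unconditionally re-arming the polling timer and scheduling
the tx tasklet, sdp_tx_irq() now only marks tx_compl_pending and asks
sdp_tx_handler_select() to pick who polls the tx CQ and reposts: a
sendmsg() blocked in sdp_tx_wait_memory() is woken up to do it from
process context, a running posts handler picks it up on its way out,
and only when neither is available are the timer and tasklet used.

sdp_bzcopy_wait_memory() is generalized into sdp_tx_wait_memory(),
which waits either for a requested number of free tx slots or for
socket send memory, calling sdp_do_posts() on each iteration.
sdp_process_tx_cq() re-arms the tx cq when a write is still pending
and there is no room in the tx queue. SDP_TX_POLL_TIMEOUT and
SDP_FIN_WAIT_TIMEOUT are shortened, and sdp_zcopy_thresh now defaults
to 0 (off).
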
Signed-off-by: Amir Vadai
---
 drivers/infiniband/ulp/sdp/sdp.h       |  15 +---
 drivers/infiniband/ulp/sdp/sdp_main.c  | 103 ++++++++++++++-----------
 drivers/infiniband/ulp/sdp/sdp_tx.c    |  71 +++++++++++++++--
 drivers/infiniband/ulp/sdp/sdp_zcopy.c |   9 +--
 4 files changed, 129 insertions(+), 69 deletions(-)

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 72c7620186b5d..b7403970f6694 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -13,7 +13,7 @@
 /* Interval between sucessive polls in the Tx routine when polling is used
    instead of interrupts (in per-core Tx rings) - should be power of 2 */
 #define SDP_TX_POLL_MODER 16
-#define SDP_TX_POLL_TIMEOUT (HZ / 4)
+#define SDP_TX_POLL_TIMEOUT (HZ / 20)
 #define SDP_NAGLE_TIMEOUT (HZ / 10)
 
 #define SDP_SRCAVAIL_CANCEL_TIMEOUT (HZ * 5)
@@ -24,7 +24,7 @@
 #define SDP_ROUTE_TIMEOUT 1000
 #define SDP_RETRY_COUNT 5
 #define SDP_KEEPALIVE_TIME (120 * 60 * HZ)
-#define SDP_FIN_WAIT_TIMEOUT (60 * HZ)
+#define SDP_FIN_WAIT_TIMEOUT (10 * HZ)
 
 #define SDP_TX_SIZE 0x40
 #define SDP_RX_SIZE 0x40
@@ -330,6 +330,7 @@ struct sdp_sock {
 	struct delayed_work dreq_wait_work;
 	struct work_struct destroy_work;
 
+	int tx_compl_pending;
 	atomic_t somebody_is_doing_posts; /* Like tcp_sock */
@@ -518,13 +519,6 @@ static inline int tx_slots_free(struct sdp_sock *ssk)
 	return min_free - SDP_MIN_TX_CREDITS;
 };
 
-/* like sk_stream_memory_free - except measures remote credits */
-static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
-		struct bzcopy_state *bz)
-{
-	return tx_slots_free(ssk) > bz->busy;
-}
-
 /* utilities */
 static inline char *mid2str(int mid)
 {
@@ -732,8 +726,7 @@ void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
 void sdp_destroy_work(struct work_struct *work);
 void sdp_reset_sk(struct sock *sk, int rc);
 void sdp_reset(struct sock *sk);
-int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
-		struct bzcopy_state *bz);
+int sdp_tx_wait_memory(struct sdp_sock *ssk, long *timeo_p, int *credits_needed);
 void skb_entail(struct sock *sk, struct sdp_sock *ssk, struct sk_buff *skb);
 
 /* sdp_proc.c */
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 006baaf4f31cf..5498cc4ab1701 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -94,7 +94,7 @@ SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
 	"Default idle time in seconds before keepalive probe sent.");
 SDP_MODPARAM_SINT(sdp_bzcopy_thresh, 65536,
 	"Zero copy send using SEND threshold; 0=0ff.");
-SDP_MODPARAM_SINT(sdp_zcopy_thresh, 128*1024,
+SDP_MODPARAM_SINT(sdp_zcopy_thresh, 0,
 	"Zero copy using RDMA threshold; 0=0ff.");
 #define SDP_RX_COAL_TIME_HIGH 128
 SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000,
@@ -1064,6 +1064,7 @@ int sdp_init_sock(struct sock *sk)
 	ssk->destruct_in_process = 0;
 	spin_lock_init(&ssk->lock);
 	spin_lock_init(&ssk->tx_sa_lock);
+	ssk->tx_compl_pending = 0;
 
 	atomic_set(&ssk->somebody_is_doing_posts, 0);
@@ -1613,9 +1614,12 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
 	return copy;
 }
 
-/* like sk_stream_wait_memory - except waits on remote credits */
-int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
-		struct bzcopy_state *bz)
+/* like sk_stream_wait_memory - except:
+ * - if credits_needed is provided - wait for enough remote credits
+ * - the TX irq counts on the waiter here (in sendmsg context) to do
+ *   the actual tx comp poll and post
+ */
+int sdp_tx_wait_memory(struct sdp_sock *ssk, long *timeo_p, int *credits_needed)
 {
 	struct sock *sk = &ssk->isk.sk;
 	int err = 0;
@@ -1623,9 +1627,7 @@ int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 	long current_timeo = *timeo_p;
 	DEFINE_WAIT(wait);
 
-	BUG_ON(!bz);
-
-	if (sdp_bzcopy_slots_avail(ssk, bz))
+	if (sk_stream_memory_free(sk))
 		current_timeo = vm_wait = (net_random() % (HZ / 5)) + 2;
 
 	while (1) {
@@ -1633,41 +1635,57 @@ int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
 
-		if (unlikely(sk->sk_err | (sk->sk_shutdown & SEND_SHUTDOWN))) {
-			err = -EPIPE;
-			break;
-		}
+		if (sk->sk_err || (sk->sk_shutdown & SEND_SHUTDOWN))
+			goto do_error;
+		if (!*timeo_p)
+			goto do_nonblock;
+		if (signal_pending(current))
+			goto do_interrupted;
 
-		if (unlikely(!*timeo_p)) {
-			err = -EAGAIN;
-			break;
-		}
+		clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
 
-		if (unlikely(signal_pending(current))) {
-			err = sock_intr_errno(*timeo_p);
-			break;
+		sdp_do_posts(ssk);
+
+		if (credits_needed) {
+			if (tx_slots_free(ssk) >= *credits_needed)
+				break;
+		} else {
+			if (sk_stream_memory_free(sk) && !vm_wait)
+				break;
 		}
 
-		clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
-
-		posts_handler_put(ssk);
-
-		if (sdp_bzcopy_slots_avail(ssk, bz))
-			break;
-
 		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 		sk->sk_write_pending++;
 
+		sdp_prf1(sk, NULL, "Going to sleep");
+
 		if (tx_credits(ssk) > SDP_MIN_TX_CREDITS)
 			sdp_arm_tx_cq(sk);
 
-		sk_wait_event(sk, &current_timeo,
-				sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
+		if (credits_needed) {
+			sk_wait_event(sk, &current_timeo,
+				!sk->sk_err &&
+				!(sk->sk_shutdown & SEND_SHUTDOWN) &&
+				!ssk->tx_compl_pending &&
+				tx_slots_free(ssk) >= *credits_needed &&
+				vm_wait);
+		} else {
+			sk_wait_event(sk, &current_timeo,
+				!sk->sk_err &&
+				!(sk->sk_shutdown & SEND_SHUTDOWN) &&
+				!ssk->tx_compl_pending &&
+				sk_stream_memory_free(sk) &&
+				vm_wait);
+		}
+
+		sdp_prf1(sk, NULL, "Woke up");
 		sk->sk_write_pending--;
-		sdp_prf1(sk, NULL, "finished wait for mem");
 
 		posts_handler_get(ssk);
-		sdp_do_posts(ssk);
+
+		if (!ssk->qp_active)
+			goto do_error;
 
 		if (vm_wait) {
 			vm_wait -= current_timeo;
@@ -1679,9 +1697,19 @@
 		}
 		*timeo_p = current_timeo;
 	}
-
+out:
 	finish_wait(sk->sk_sleep, &wait);
 	return err;
+
+do_error:
+	err = -EPIPE;
+	goto out;
+do_nonblock:
+	err = -EAGAIN;
+	goto out;
+do_interrupted:
+	err = sock_intr_errno(*timeo_p);
+	goto out;
 }
 
 /* Like tcp_sendmsg */
@@ -1772,7 +1800,7 @@ new_segment:
 				 * receive credits.
 				 */
 				if (bz) {
-					if (!sdp_bzcopy_slots_avail(ssk, bz))
+					if (tx_slots_free(ssk) < bz->busy)
 						goto wait_for_sndbuf;
 				} else {
 					if (!sk_stream_memory_free(sk))
@@ -1850,21 +1878,8 @@ wait_for_memory:
 		if (copied)
 			sdp_push(sk, ssk, flags & ~MSG_MORE);
 
-		sdp_xmit_poll(ssk, 1);
-
-		if (bz) {
-			err = sdp_bzcopy_wait_memory(ssk, &timeo, bz);
-		} else {
-			posts_handler_put(ssk);
-
-			sdp_arm_tx_cq(sk);
-
-			err = sk_stream_wait_memory(sk, &timeo);
-
-			posts_handler_get(ssk);
-			sdp_do_posts(ssk);
-		}
-
+		err = sdp_tx_wait_memory(ssk, &timeo,
+				bz ? &bz->busy : NULL);
 		if (err)
 			goto do_error;
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
index bf0a290c99f64..18d89ebfc31c7 100644
--- a/drivers/infiniband/ulp/sdp/sdp_tx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_tx.c
@@ -53,6 +53,8 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force)
 	if (!timer_pending(&ssk->tx_ring.timer))
 		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
 
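+	/* clear the completion-pending mark set by sdp_tx_irq() - a poll
+	 * path is handling completions now */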
+	ssk->tx_compl_pending = 0;
+
 	/* Poll the CQ every SDP_TX_POLL_MODER packets */
 	if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
 		wc_processed = sdp_process_tx_cq(ssk);
@@ -297,28 +299,74 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk)
 	if (wc_processed) {
 		struct sock *sk = &ssk->isk.sk;
 		sdp_post_sends(ssk, 0);
+		sdp_prf1(sk, NULL, "Waking sendmsg. inflight=%d",
+				(u32) tx_ring_posted(ssk));
+		sk_stream_write_space(&ssk->isk.sk);
+		if (sk->sk_write_pending &&
+		    test_bit(SOCK_NOSPACE, &sk->sk_socket->flags) &&
+		    tx_ring_posted(ssk)) {
+			/* a write is pending and there is still no room in
+			 * the tx queue - arm the tx cq
+			 */
+			sdp_prf(&ssk->isk.sk, NULL, "pending tx - rearming");
+			sdp_arm_tx_cq(sk);
+		}
 
-		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-			sk_stream_write_space(&ssk->isk.sk);
 	}
 
 	return wc_processed;
 }
 
+/* Select who will handle the tx completion:
+ * - a write is pending - wake it up and let it do the poll + post
+ * - the posts handler is taken - its holder will do the poll + post
+ * - otherwise return 1 and let the caller do it
+ */
+static int sdp_tx_handler_select(struct sdp_sock *ssk)
+{
+	struct sock *sk = &ssk->isk.sk;
+
+	if (sk->sk_write_pending) {
+		/* Do the TX posts from sender context */
+		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+			sdp_prf1(sk, NULL, "Waking up pending sendmsg");
+			wake_up_interruptible(sk->sk_sleep);
+			return 0;
+		} else
+			sdp_prf1(sk, NULL, "Unexpected: sk_sleep=%p, "
+				"waitqueue_active: %d",
+				sk->sk_sleep, waitqueue_active(sk->sk_sleep));
+	}
+
+	if (posts_handler(ssk)) {
+		/* Somebody else is available to check for completion */
+		sdp_prf1(sk, NULL, "Somebody else will call do_posts");
+		return 0;
+	}
+
+	return 1;
+}
+
 static void sdp_poll_tx_timeout(unsigned long data)
 {
 	struct sdp_sock *ssk = (struct sdp_sock *)data;
 	struct sock *sk = &ssk->isk.sk;
 	u32 inflight, wc_processed;
 
-	sdp_prf1(&ssk->isk.sk, NULL, "TX timeout: inflight=%d",
-		(u32) tx_ring_posted(ssk));
+	sdp_prf1(&ssk->isk.sk, NULL, "TX timeout: inflight=%d, head=%d tail=%d",
+		(u32) tx_ring_posted(ssk),
+		ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
 
 	/* Only process if the socket is not in use */
 	bh_lock_sock(sk);
 	if (sock_owned_by_user(sk)) {
-		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
-		sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy\n");
+		sdp_prf(&ssk->isk.sk, NULL, "TX comp: socket is busy");
+
+		if (sdp_tx_handler_select(ssk)) {
+			sdp_prf1(sk, NULL, "schedule a timer");
+			mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+		}
+
 		SDPSTATS_COUNTER_INC(tx_poll_busy);
 		goto out;
 	}
@@ -333,6 +381,8 @@ static void sdp_poll_tx_timeout(unsigned long data)
 	SDPSTATS_COUNTER_INC(tx_poll_hit);
 
 	inflight = (u32) rx_ring_posted(ssk);
+	sdp_prf1(&ssk->isk.sk, NULL, "finished tx processing. inflight = %d",
+			tx_ring_posted(ssk));
 
 	/* If there are still packets in flight and the timer has not already
 	 * been scheduled by the Tx routine then schedule it here to guarantee
@@ -360,8 +410,13 @@ static void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
 
 	SDPSTATS_COUNTER_INC(tx_int_count);
 
-	mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
-	tasklet_schedule(&ssk->tx_ring.tasklet);
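+	/* mark a completion as pending: sdp_tx_wait_memory() checks this
+	 * around its sleep and sdp_xmit_poll() clears it
+	 */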
inflight = %d", + tx_ring_posted(ssk)); /* If there are still packets in flight and the timer has not already * been scheduled by the Tx routine then schedule it here to guarantee @@ -360,8 +410,13 @@ static void sdp_tx_irq(struct ib_cq *cq, void *cq_context) SDPSTATS_COUNTER_INC(tx_int_count); - mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); - tasklet_schedule(&ssk->tx_ring.tasklet); + ssk->tx_compl_pending = 1; + + if (sdp_tx_handler_select(ssk)) { + sdp_prf1(sk, NULL, "poll and post from tasklet"); + mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); + tasklet_schedule(&ssk->tx_ring.tasklet); + } } void sdp_tx_ring_purge(struct sdp_sock *ssk) diff --git a/drivers/infiniband/ulp/sdp/sdp_zcopy.c b/drivers/infiniband/ulp/sdp/sdp_zcopy.c index 3ce1d79581ef8..c8b91ea2d8335 100644 --- a/drivers/infiniband/ulp/sdp/sdp_zcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_zcopy.c @@ -46,10 +46,6 @@ #include #include "sdp.h" -static struct bzcopy_state dummy_bz = { -busy: 1, -}; - static int sdp_post_srcavail(struct sock *sk, struct tx_srcavail_state *tx_sa, int page_idx, int off, size_t len) { @@ -773,6 +769,7 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p) { struct sdp_sock *ssk = sdp_sk(sk); int ret = 0; + int credits_needed = 1; sdp_dbg_data(sk, "Wait for mem\n"); @@ -784,7 +781,7 @@ static inline int wait_for_sndbuf(struct sock *sk, long *timeo_p) sdp_xmit_poll(ssk, 1); - ret = sdp_bzcopy_wait_memory(ssk, timeo_p, &dummy_bz); + ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed); return ret; } @@ -801,7 +798,7 @@ static int sdp_rdma_adv_single(struct sock *sk, sdp_dbg_data(sk, "off: 0x%x len: 0x%x page_cnt: 0x%x\n", offset, len, page_cnt); - if (!sdp_bzcopy_slots_avail(ssk, &dummy_bz)) { + if (tx_slots_free(ssk) == 0) { rc = wait_for_sndbuf(sk, &timeo); if (rc) { sdp_warn(sk, "Couldn't get send buffer\n"); -- 2.49.0