From 8d179bb3e613cacdc7b69ab6857b50a7d72ff3cd Mon Sep 17 00:00:00 2001
From: Amir Vadai
Date: Tue, 26 May 2009 19:16:56 +0300
Subject: [PATCH] sdp: TX from 1 context only. RX with minimal context switches

Handle TX completions and credit updates from a single context, and
process RX with as few context switches as possible:

- Drain the RX CQ from the interrupt handler: data skbs go straight to
  sk_receive_queue, while control skbs are queued on the new rx_ctl_q
  and processed under the socket lock from the rx_comp_wq work context.
- Post new receive buffers from rx_comp_work instead of from interrupt
  context (sdp_schedule_post_recvs()).
- Turn rcv_nxt into an atomic_t, since it is now updated without the
  socket lock being held.
- Rearm the nagle timer only when there are pending sends.
- Create the RX CQ on the least attached vector, add the CPU id to
  debug prints and sdpprf records, add a tx_poll_busy counter, and
  compile the sdpprf proc file only when SDP_PROFILING is set.

Signed-off-by: Amir Vadai
---
 drivers/infiniband/ulp/sdp/sdp.h       |  43 ++--
 drivers/infiniband/ulp/sdp/sdp_bcopy.c |   3 +-
 drivers/infiniband/ulp/sdp/sdp_cma.c   |   4 +-
 drivers/infiniband/ulp/sdp/sdp_main.c  |  52 ++--
 drivers/infiniband/ulp/sdp/sdp_proc.c  |  22 +-
 drivers/infiniband/ulp/sdp/sdp_rx.c    | 334 +++++++++++++------------
 drivers/infiniband/ulp/sdp/sdp_tx.c    |   6 +-
 7 files changed, 234 insertions(+), 230 deletions(-)

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index f9b295ee98f3..25e18442b2de 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -6,15 +6,22 @@
 #include 
 #include  /* For urgent data flags */
 #include 
+#include 
+
+#undef SDPSTATS_ON
+#undef SDP_PROFILING
+#undef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+#undef CONFIG_INFINIBAND_SDP_DEBUG
 
-#undef SDP_LOCKS_CHECK
 #define SDPSTATS_ON
 #define SDP_PROFILING
+#define CONFIG_INFINIBAND_SDP_DEBUG_DATA
+#define CONFIG_INFINIBAND_SDP_DEBUG
 
 #define _sdp_printk(func, line, level, sk, format, arg...)             \
-	printk(level "%s:%d sdp_sock(%5d %d:%d): " format,              \
+	printk(level "%s:%d sdp_sock(%5d:%d %d:%d): " format,           \
 	       func, line, \
-	       current->pid, \
+	       current->pid, smp_processor_id(), \
 	       (sk) ? inet_sk(sk)->num : -1, \
 	       (sk) ? ntohs(inet_sk(sk)->dport) : -1, ## arg)
 #define sdp_printk(level, sk, format, arg...)                \
@@ -23,25 +30,6 @@
 	sdp_printk(KERN_WARNING, sk, format , ## arg)
 
-#ifdef SDP_LOCKS_CHECK
-#define WARN_ON_UNLOCKED(sk, l) do {\
-	if (unlikely(!spin_is_locked(l))) { \
-		sdp_warn(sk, "lock " #l " should be locked\n"); \
-		WARN_ON(1); \
-	} \
-} while (0)
-
-#define WARN_ON_LOCKED(sk, l) do {\
-	if (unlikely(spin_is_locked(l))) { \
-		sdp_warn(sk, "lock " #l " should be unlocked\n"); \
-		WARN_ON(1); \
-	} \
-} while (0)
-#else
-#define WARN_ON_UNLOCKED(sk, l)
-#define WARN_ON_LOCKED(sk, l)
-#endif
-
 #define rx_ring_lock(ssk, f) do { \
 	spin_lock_irqsave(&ssk->rx_ring.lock, f); \
 } while (0)
@@ -55,6 +43,7 @@ struct sk_buff;
 struct sdpprf_log {
 	int idx;
 	int pid;
+	int cpu;
 	int sk_num;
 	int sk_dport;
 	struct sk_buff *skb;
@@ -83,6 +72,7 @@ static inline unsigned long long current_nsec(void)
 	l->pid = current->pid; \
 	l->sk_num = (sk) ? inet_sk(sk)->num : -1; \
 	l->sk_dport = (sk) ? ntohs(inet_sk(sk)->dport) : -1; \
+	l->cpu = smp_processor_id(); \
 	l->skb = s; \
 	snprintf(l->msg, sizeof(l->msg) - 1, format, ## arg); \
 	l->time = current_nsec(); \
@@ -360,7 +350,7 @@ struct sdp_sock {
 	struct list_head sock_list;
 	struct list_head accept_queue;
 	struct list_head backlog_queue;
-	struct sk_buff_head rx_backlog;
+	struct sk_buff_head rx_ctl_q;
 	struct sock *parent;
 
 	struct work_struct rx_comp_work;
@@ -373,7 +363,8 @@ struct sdp_sock {
 	u16 urg_data;
 	u32 urg_seq;
 	u32 copied_seq;
-	u32 rcv_nxt;
+#define rcv_nxt(ssk) atomic_read(&(ssk->rcv_nxt))
+	atomic_t rcv_nxt;
 
 	int write_seq;
 	int pushed_seq;
@@ -561,10 +552,10 @@ void sdp_nagle_timeout(unsigned long data);
 void sdp_rx_ring_init(struct sdp_sock *ssk);
 int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
 void sdp_rx_ring_destroy(struct sdp_sock *ssk);
-int sdp_process_rx_q(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
 int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
-void sdp_post_recvs(struct sdp_sock *ssk);
+void sdp_schedule_post_recvs(struct sdp_sock *ssk);
+void sdp_rx_comp_full(struct sdp_sock *ssk);
 
 static inline void sdp_arm_rx_cq(struct sock *sk)
 {
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 6e0a038d0852..8f6f0e29605c 100644
--- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
@@ -157,7 +157,8 @@ void sdp_nagle_timeout(unsigned long data)
 out:
 	bh_unlock_sock(sk);
 out2:
-	mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
+	if (sk->sk_send_head) /* If there are pending sends - rearm */
+		mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
 }
 
 int sdp_post_credits(struct sdp_sock *ssk)
diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c
index a9dcf777509c..5726fe45a96e 100644
--- a/drivers/infiniband/ulp/sdp/sdp_cma.c
+++ b/drivers/infiniband/ulp/sdp/sdp_cma.c
@@ -288,15 +288,13 @@ static int sdp_disconnected_handler(struct sock *sk)
 
 	sdp_dbg(sk, "%s\n", __func__);
 
-	sdp_process_rx_q(ssk);
-
 	if (ssk->tx_ring.cq)
 		sdp_xmit_poll(ssk, 1);
 
 	if (sk->sk_state == TCP_SYN_RECV) {
 		sdp_connected_handler(sk, NULL);
 
-		if (ssk->rcv_nxt)
+		if (rcv_nxt(ssk))
 			return 0;
 	}
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index c6b17dbcc6d8..432b198939bf 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -325,8 +325,6 @@ void sdp_reset_sk(struct sock *sk, int rc)
 
 	read_lock(&device_removal_lock);
 
-	sdp_process_rx_q(ssk);
-
 	if (ssk->tx_ring.cq)
 		sdp_xmit_poll(ssk, 1);
 
@@ -505,7 +503,7 @@ static void sdp_close(struct sock *sk, long timeout)
 	 *  descriptor close, not protocol-sourced closes, because the
 	 *  reader process may not have drained the data yet!
 	 */
-	while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+	while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) {
 		data_was_unread = 1;
 		__kfree_skb(skb);
 	}
@@ -696,13 +694,9 @@ static int sdp_wait_for_connect(struct sock *sk, long timeo)
 					  TASK_INTERRUPTIBLE);
 		release_sock(sk);
 		if (list_empty(&ssk->accept_queue)) {
-			sdp_dbg(sk, "%s schedule_timeout\n", __func__);
 			timeo = schedule_timeout(timeo);
-			sdp_dbg(sk, "%s schedule_timeout done\n", __func__);
 		}
-		sdp_dbg(sk, "%s lock_sock\n", __func__);
 		lock_sock(sk);
-		sdp_dbg(sk, "%s lock_sock done\n", __func__);
 		err = 0;
 		if (!list_empty(&ssk->accept_queue))
 			break;
@@ -800,8 +794,8 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
 		else if (sock_flag(sk, SOCK_URGINLINE) ||
 			 !ssk->urg_data ||
 			 before(ssk->urg_seq, ssk->copied_seq) ||
-			 !before(ssk->urg_seq, ssk->rcv_nxt)) {
-			answ = ssk->rcv_nxt - ssk->copied_seq;
+			 !before(ssk->urg_seq, rcv_nxt(ssk))) {
+			answ = rcv_nxt(ssk) - ssk->copied_seq;
 
 			/* Subtract 1, if FIN is in queue. */
 			if (answ && !skb_queue_empty(&sk->sk_receive_queue))
@@ -925,7 +919,7 @@ int sdp_init_sock(struct sock *sk)
 
 	sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
 
-	skb_queue_head_init(&ssk->rx_backlog);
+	skb_queue_head_init(&ssk->rx_ctl_q);
 
 	atomic_set(&ssk->mseq_ack, 0);
 
@@ -1138,7 +1132,7 @@ static inline int poll_recv_cq(struct sock *sk)
 {
 	int i;
 	for (i = 0; i < recv_poll; ++i) {
-		if (!sdp_process_rx_q(sdp_sk(sk))) {
+		if (!skb_queue_empty(&sk->sk_receive_queue)) {
 			++recv_poll_hit;
 			return 0;
 		}
@@ -1214,12 +1208,6 @@ static int sdp_recv_urg(struct sock *sk, long timeo,
 	return -EAGAIN;
 }
 
-static void sdp_rcv_space_adjust(struct sock *sk)
-{
-	sdp_post_recvs(sdp_sk(sk));
-	sdp_post_sends(sdp_sk(sk), 0);
-}
-
 static unsigned int sdp_current_mss(struct sock *sk, int large_allowed)
 {
 	/* TODO */
@@ -1875,7 +1863,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 
 	lock_sock(sk);
 	sdp_dbg_data(sk, "%s\n", __func__);
-//	sdp_prf(sk, skb, "Read from user");
+	sdp_prf(sk, skb, "Read from user");
 
 	err = -ENOTCONN;
 	if (sk->sk_state == TCP_LISTEN)
@@ -1912,12 +1900,12 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		if (!skb)
 			break;
 
-		if ((skb_transport_header(skb))[0] == SDP_MID_DISCONN)
-			goto found_fin_ok;
+		BUG_ON((skb_transport_header(skb))[0] != SDP_MID_DATA);
 
 		if (before(*seq, TCP_SKB_CB(skb)->seq)) {
-			printk(KERN_INFO "recvmsg bug: copied %X "
-			       "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
+			sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
+				 *seq, TCP_SKB_CB(skb)->seq);
+			sdp_reset(sk);
 			break;
 		}
 
@@ -2034,7 +2022,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		sdp_dbg_data(sk, "%s: done copied %d target %d\n", __func__, copied, target);
 
-		sdp_rcv_space_adjust(sk);
+		sdp_schedule_post_recvs(sdp_sk(sk));
 skip_copy:
 		if (ssk->urg_data && after(ssk->copied_seq, ssk->urg_seq))
 			ssk->urg_data = 0;
@@ -2042,21 +2030,13 @@ skip_copy:
 			continue;
 
 		offset = 0;
-		if (!(flags & MSG_PEEK))
-			sk_eat_skb(sk, skb, 0);
-
-		continue;
-found_fin_ok:
-		++*seq;
-		if (!(flags & MSG_PEEK))
-			sk_eat_skb(sk, skb, 0);
-
-		break;
+		if (!(flags & MSG_PEEK)) {
+			skb_unlink(skb, &sk->sk_receive_queue);
+			__kfree_skb(skb);
+		}
 	} while (len > 0);
 
-	release_sock(sk);
-	return copied;
-
+	err = copied;
 out:
 	release_sock(sk);
 	return err;
diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c
index ec9785499f18..a4a1a4d5a7dd 100644
--- a/drivers/infiniband/ulp/sdp/sdp_proc.c
+++ b/drivers/infiniband/ulp/sdp/sdp_proc.c
@@ -39,9 +39,6 @@
 #define PROC_SDP_STATS "sdpstats"
 #define PROC_SDP_PERF "sdpprf"
 
-struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
-int sdpprf_log_count = 0;
-
 /* just like TCP fs */
 struct sdp_seq_afinfo {
 	struct module *owner;
@@ -57,9 +54,6 @@ struct sdp_iter_state {
 	struct seq_operations seq_ops;
 };
 
-//struct sdpprf sdpprf = { { 0 } };
-
-
 static void *sdp_get_idx(struct seq_file *seq, loff_t pos)
 {
 	int i = 0;
@@ -148,7 +142,7 @@ static int sdp_seq_show(struct seq_file *seq, void *v)
 	srcp = ntohs(inet_sk(sk)->sport);
 	uid = sock_i_uid(sk);
 	inode = sock_i_ino(sk);
-	rx_queue = sdp_sk(sk)->rcv_nxt - sdp_sk(sk)->copied_seq;
+	rx_queue = rcv_nxt(sdp_sk(sk)) - sdp_sk(sk)->copied_seq;
 	tx_queue = sdp_sk(sk)->write_seq - sdp_sk(sk)->tx_ring.una_seq;
 
 	sprintf(tmpbuf, "%4d: %08X:%04X %08X:%04X %5d %lu %08X:%08X %X",
@@ -273,6 +267,7 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
 
 	seq_printf(seq, "rx_poll_miss \t\t: %d\n", sdpstats.rx_poll_miss);
 	seq_printf(seq, "tx_poll_miss \t\t: %d\n", sdpstats.tx_poll_miss);
+	seq_printf(seq, "tx_poll_busy \t\t: %d\n", sdpstats.tx_poll_busy);
 	seq_printf(seq, "tx_poll_hit \t\t: %d\n", sdpstats.tx_poll_hit);
 
 	seq_printf(seq, "CQ stats:\n");
@@ -306,6 +301,10 @@ static struct file_operations sdpstats_fops = {
 
 #endif
 
+#ifdef SDP_PROFILING
+struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
+int sdpprf_log_count = 0;
+
 unsigned long long start_t = 0;
 
 static int sdpprf_show(struct seq_file *m, void *v)
@@ -321,9 +320,9 @@ static int sdpprf_show(struct seq_file *m, void *v)
 	t = l->time - start_t;
 	nsec_rem = do_div(t, 1000000000);
 
-	seq_printf(m, "%-6d: [%5lu.%06lu] %-50s - [%d %d:%d] skb: %p %s:%d\n",
+	seq_printf(m, "%-6d: [%5lu.%06lu] %-50s - [%d{%d} %d:%d] skb: %p %s:%d\n",
 		   l->idx, (unsigned long)t, nsec_rem/1000,
-		   l->msg, l->pid, l->sk_num, l->sk_dport,
+		   l->msg, l->pid, l->cpu, l->sk_num, l->sk_dport,
 		   l->skb, l->func, l->line);
 out:
 	return 0;
@@ -408,6 +407,7 @@ static struct file_operations sdpprf_fops = {
 	.release	= seq_release,
 	.write		= sdpprf_write,
 };
+#endif /* SDP_PROFILING */
 
 int __init sdp_proc_init(void)
 {
@@ -437,10 +437,12 @@ int __init sdp_proc_init(void)
 
 #endif
 
+#ifdef SDP_PROFILING
 	sdpprf = proc_net_fops_create(&init_net, PROC_SDP_PERF, S_IRUGO | S_IWUGO,
 				      &sdpprf_fops);
 	if (!sdpprf)
 		goto no_mem;
+#endif
 
 	return 0;
 no_mem:
@@ -464,7 +466,9 @@ void sdp_proc_unregister(void)
 #ifdef SDPSTATS_ON
 	proc_net_remove(&init_net, PROC_SDP_STATS);
 #endif
+#ifdef SDP_PROFILING
 	proc_net_remove(&init_net, PROC_SDP_PERF);
+#endif
 }
 
 #else /* CONFIG_PROC_FS */
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
index 05d86901a9e5..7d5b33edc703 100644
--- a/drivers/infiniband/ulp/sdp/sdp_rx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_rx.c
@@ -149,9 +149,6 @@ static void sdp_fin(struct sock *sk)
 	}
 }
 
-/* lock_sock must be taken before calling this - since rx_ring.head is not
- * protected (although being atomic
- */
 static int sdp_post_recv(struct sdp_sock *ssk)
 {
 	struct sdp_buf *rx_req;
@@ -170,7 +167,6 @@ static int sdp_post_recv(struct sdp_sock *ssk)
 	gfp_t gfp_page;
 	int ret = 0;
 
-	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
 	/* Now, allocate and repost recv */
 	/* TODO: allocate from cache */
@@ -184,6 +180,7 @@ static int sdp_post_recv(struct sdp_sock *ssk)
 		gfp_page = GFP_HIGHUSER;
 	}
 
+	sdp_prf(&ssk->isk.sk, skb, "Posting skb");
 	/* FIXME */
 	BUG_ON(!skb);
 	h = (struct sdp_bsdh *)skb->head;
@@ -231,53 +228,90 @@ static int sdp_post_recv(struct sdp_sock *ssk)
 	rx_wr.sg_list = ibsge;
 	rx_wr.num_sge = frags + 1;
 	rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
-	SDPSTATS_COUNTER_INC(post_recv);
 	atomic_inc(&ssk->rx_ring.head);
 	if (unlikely(rc)) {
 		sdp_warn(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
+
+		lock_sock(&ssk->isk.sk);
 		sdp_reset(&ssk->isk.sk);
+		release_sock(&ssk->isk.sk);
+
 		ret = -1;
 	}
 
+	SDPSTATS_COUNTER_INC(post_recv);
 	atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
 
 	return ret;
 }
 
-/* lock_sock must be taken before calling this */
-static void _sdp_post_recvs(struct sdp_sock *ssk)
+static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
 {
 	struct sock *sk = &ssk->isk.sk;
 	int scale = ssk->rcvbuf_scale;
-
-	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
-	if (unlikely(!ssk->id || ((1 << sk->sk_state) &
-		(TCPF_CLOSE | TCPF_TIME_WAIT)))) {
-		return;
-	}
+	int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
+	unsigned long max_bytes;
 
 	if (top_mem_usage &&
 	    (top_mem_usage * 0x100000) <
 	    atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
 		scale = 1;
 
-	while ((likely(ring_posted(ssk->rx_ring) < SDP_RX_SIZE) &&
-		(ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) *
-		(SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
-		ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
-	       unlikely(ring_posted(ssk->rx_ring) < SDP_MIN_TX_CREDITS)) {
-		if (sdp_post_recv(ssk))
-			break;
-	}
+	max_bytes = sk->sk_rcvbuf * scale;
+
+	if (unlikely(ring_posted(ssk->rx_ring) >= SDP_RX_SIZE)) {
+		sdp_prf(sk, NULL, "rx ring is full");
+		return 0;
+	}
+
+	if (likely(ring_posted(ssk->rx_ring) > SDP_MIN_TX_CREDITS)) {
+		unsigned long bytes_in_process =
+			(ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) * buffer_size;
+		bytes_in_process += rcv_nxt(ssk) - ssk->copied_seq;
+
+		if (bytes_in_process >= max_bytes) {
+			sdp_prf(sk, NULL, "bytes_in_process:%lu >= max_bytes:%lu",
+				bytes_in_process, max_bytes);
+			return 0;
+		}
+	}
+
+	return 1;
 }
 
-void sdp_post_recvs(struct sdp_sock *ssk)
+static inline void sdp_post_recvs(struct sdp_sock *ssk)
 {
-	unsigned long flags;
+	int rc = 0;
+again:
+	ssk->posts_in_process = 1;
 
-	rx_ring_lock(ssk, flags);
-	_sdp_post_recvs(ssk);
-	rx_ring_unlock(ssk, flags);
+	do {
+		if (!sdp_post_recvs_needed(ssk))
+			break;
+
+		rc = sdp_post_recv(ssk);
+	} while (!rc);
+
+	sk_stream_mem_reclaim(&ssk->isk.sk);
+
+	ssk->posts_in_process = 0;
+	if (sdp_post_recvs_needed(ssk))
+		goto again;
+}
+
+void sdp_schedule_post_recvs(struct sdp_sock *ssk)
+{
+	struct sock *sk = &ssk->isk.sk;
+
+	WARN_ON_LOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
+	if (unlikely(!ssk->id || ((1 << sk->sk_state) &
+		(TCPF_CLOSE | TCPF_TIME_WAIT)))) {
+		sdp_prf(sk, NULL, "socket is closed - not posting");
+		return;
+	}
+
+	if (!ssk->posts_in_process && sdp_post_recvs_needed(ssk))
+		queue_work(rx_comp_wq, &ssk->rx_comp_work);
 }
 
 static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
@@ -285,7 +319,6 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
 {
 	int skb_len;
 	struct sdp_sock *ssk = sdp_sk(sk);
-	struct sk_buff *tail = NULL;
 
 	/* not needed since sk_rmem_alloc is not currently used
 	 * TODO - remove this?
@@ -293,16 +326,10 @@ static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
 	 */
 
 	skb_len = skb->len;
 
-	TCP_SKB_CB(skb)->seq = ssk->rcv_nxt;
-	ssk->rcv_nxt += skb_len;
+	TCP_SKB_CB(skb)->seq = rcv_nxt(ssk);
+	atomic_add(skb_len, &ssk->rcv_nxt);
 
-	if (likely(skb_len && (tail = skb_peek_tail(&sk->sk_receive_queue))) &&
-	    unlikely(skb_tailroom(tail) >= skb_len)) {
-		skb_copy_bits(skb, 0, skb_put(tail, skb_len), skb_len);
-		__kfree_skb(skb);
-		skb = tail;
-	} else
-		skb_queue_tail(&sk->sk_receive_queue, skb);
+	skb_queue_tail(&sk->sk_receive_queue, skb);
 
 	if (!sock_flag(sk, SOCK_DEAD))
 		sk->sk_data_ready(sk, skb_len);
@@ -412,71 +439,97 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
 	return skb;
 }
 
-/* this must be called while sock_lock is taken */
-static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
+/* socket lock should be taken before calling this */
+static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 {
+	struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
 	struct sock *sk = &ssk->isk.sk;
-	int frags;
-	struct sdp_bsdh *h;
-	int pagesz, i;
-
-	h = (struct sdp_bsdh *)skb->data;
-
-	frags = skb_shinfo(skb)->nr_frags;
-	pagesz = PAGE_ALIGN(skb->data_len);
-	skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
-
-	for (i = skb_shinfo(skb)->nr_frags;
-			i < frags; ++i) {
-		put_page(skb_shinfo(skb)->frags[i].page);
-		skb->truesize -= PAGE_SIZE;
-	}
-
-	if (unlikely(h->flags & SDP_OOB_PEND))
-		sk_send_sigurg(sk);
-
-	skb_pull(skb, sizeof(struct sdp_bsdh));
 
 	switch (h->mid) {
 	case SDP_MID_DATA:
-		if (unlikely(skb->len <= 0)) {
-			__kfree_skb(skb);
-			break;
-		}
+		WARN_ON(!(sk->sk_shutdown & RCV_SHUTDOWN));
 
-		if (unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
-			/* got data in RCV_SHUTDOWN */
-			if (sk->sk_state == TCP_FIN_WAIT1) {
-				/* go into abortive close */
-				sdp_exch_state(sk, TCPF_FIN_WAIT1,
-					       TCP_TIME_WAIT);
+		sdp_warn(sk, "DATA after socket rcv was shutdown\n");
 
-				sk->sk_prot->disconnect(sk, 0);
-			}
+		/* got data in RCV_SHUTDOWN */
+		if (sk->sk_state == TCP_FIN_WAIT1) {
+			sdp_warn(sk, "socket in shutdown, state = FIN_WAIT1 and got data\n");
+			/* go into abortive close */
+			sdp_exch_state(sk, TCPF_FIN_WAIT1,
+				       TCP_TIME_WAIT);
 
-			__kfree_skb(skb);
-			break;
+			sk->sk_prot->disconnect(sk, 0);
 		}
 
-		skb = sdp_sock_queue_rcv_skb(sk, skb);
-		if (unlikely(h->flags & SDP_OOB_PRES))
-			sdp_urg(ssk, skb);
 		break;
 	case SDP_MID_DISCONN:
-		__kfree_skb(skb);
+		sdp_dbg_data(sk, "Handling RX disconnect\n");
 		sdp_fin(sk);
 		break;
 	case SDP_MID_CHRCVBUF:
+		sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
 		sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)skb->data);
-		__kfree_skb(skb);
 		break;
 	case SDP_MID_CHRCVBUF_ACK:
+		sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
 		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
-		__kfree_skb(skb);
 		break;
 	default:
 		/* TODO: Handle other messages */
-		printk(KERN_WARNING "SDP: FIXME MID %d\n", h->mid);
+		sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
+	}
+
+	__kfree_skb(skb);
+
+	return 0;
+}
+
+static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
+{
+	struct sock *sk = &ssk->isk.sk;
+	int frags;
+	struct sdp_bsdh *h;
+	int pagesz, i;
+	unsigned long mseq_ack;
+	int credits_before;
+
+	h = (struct sdp_bsdh *)skb->data;
+
+	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
+
+	mseq_ack = ntohl(h->mseq_ack);
+	credits_before = tx_credits(ssk);
+	atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) + 1 +
+		ntohs(h->bufs));
+	if (mseq_ack >= ssk->nagle_last_unacked)
+		ssk->nagle_last_unacked = 0;
+
+	sdp_prf(&ssk->isk.sk, skb, "RX %s +%d c:%d->%d mseq:%d ack:%d",
+		mid2str(h->mid), ntohs(h->bufs), credits_before,
+		tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
+
+	frags = skb_shinfo(skb)->nr_frags;
+	pagesz = PAGE_ALIGN(skb->data_len);
+	skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
+
+	for (i = skb_shinfo(skb)->nr_frags; i < frags; ++i) {
+		put_page(skb_shinfo(skb)->frags[i].page);
+		skb->truesize -= PAGE_SIZE;
+	}
+
+/*	if (unlikely(h->flags & SDP_OOB_PEND))
+		sk_send_sigurg(sk);*/
+
+	if (h->mid != SDP_MID_DATA || unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
+		sdp_warn(sk, "Control skb - queuing to control queue\n");
+		skb_queue_tail(&ssk->rx_ctl_q, skb);
+		return 0;
+	}
+
+	skb_pull(skb, sizeof(struct sdp_bsdh));
+
+	if (unlikely(skb->len <= 0)) {
 		__kfree_skb(skb);
 	}
@@ -489,8 +542,7 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
 	struct sk_buff *skb;
 	struct sdp_bsdh *h;
 	struct sock *sk = &ssk->isk.sk;
-	int credits_before;
-	unsigned long mseq_ack;
+	int mseq;
 
 	skb = sdp_recv_completion(ssk, wc->wr_id);
 	if (unlikely(!skb))
@@ -530,23 +582,12 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
 	h = (struct sdp_bsdh *)skb->data;
 	SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
 	skb_reset_transport_header(skb);
-	atomic_set(&ssk->mseq_ack, ntohl(h->mseq));
-	if (mseq_ack(ssk) != (int)wc->wr_id)
-		printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
-				mseq_ack(ssk), (int)wc->wr_id);
 
-	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
-
-	mseq_ack = ntohl(h->mseq_ack);
-	credits_before = tx_credits(ssk);
-	atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) + 1 +
-			ntohs(h->bufs));
-	if (mseq_ack >= ssk->nagle_last_unacked)
-		ssk->nagle_last_unacked = 0;
-
-	sdp_prf(&ssk->isk.sk, skb, "RX %s bufs=%d c before:%d after:%d "
-		"mseq:%d, ack:%d", mid2str(h->mid), ntohs(h->bufs), credits_before,
-		tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
+	mseq = ntohl(h->mseq);
+	atomic_set(&ssk->mseq_ack, mseq);
+	if (mseq != (int)wc->wr_id)
+		sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
+			 mseq, (int)wc->wr_id);
 
 	return skb;
 }
@@ -576,7 +617,6 @@ static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
 }
 
 /* only from interrupt.
- * drain rx cq into rx_backlog queue
  */
 static int sdp_poll_rx_cq(struct sdp_sock *ssk)
 {
 	struct ib_cq *cq = ssk->rx_ring.cq;
@@ -596,7 +636,8 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk)
 			skb = sdp_process_rx_wc(ssk, wc);
 			if (!skb)
 				continue;
-			skb_queue_tail(&ssk->rx_backlog, skb);
+
+			sdp_process_rx_skb(ssk, skb);
 			wc_processed++;
 		}
 	} while (n == SDP_NUM_WC);
@@ -607,69 +648,55 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk)
 	return wc_processed;
 }
 
-int sdp_process_rx_q(struct sdp_sock *ssk)
+void sdp_rx_comp_work(struct work_struct *work)
 {
-	struct sk_buff *skb;
+	struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
 	struct sock *sk = &ssk->isk.sk;
-	unsigned long flags;
+	int xmit_poll_force;
+	struct sk_buff *skb;
 
-	if (!ssk->rx_backlog.next || !ssk->rx_backlog.prev) {
-		sdp_warn(&ssk->isk.sk, "polling a zeroed rx_backlog!!!! %p\n", &ssk->rx_backlog);
-		return 0;
-	}
+	sdp_prf(sk, NULL, "%s", __func__);
+
+	if (unlikely(!ssk->rx_ring.cq))
+		return;
 
-	if (skb_queue_empty(&ssk->rx_backlog)) {
-		SDPSTATS_COUNTER_INC(rx_poll_miss);
-		return -EAGAIN;
+	if (unlikely(!ssk->poll_cq)) {
+		struct rdma_cm_id *id = ssk->id;
+		sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
+		if (id && id->qp)
+			rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
+		return;
 	}
 
-	/* update credits */
-	sdp_post_sends(ssk, 0);
+	sdp_post_recvs(ssk);
 
-	spin_lock_irqsave(&ssk->rx_backlog.lock, flags);
-	while ((skb = __skb_dequeue(&ssk->rx_backlog))) {
-		sdp_process_rx_skb(ssk, skb);
+	if (!(sk->sk_write_pending && (tx_credits(ssk) > SDP_MIN_TX_CREDITS)) &&
+	    !credit_update_needed(ssk) &&
+	    skb_queue_empty(&ssk->rx_ctl_q)) {
+		sdp_prf(&ssk->isk.sk, NULL, "only post recvs");
+		return;
 	}
-	spin_unlock_irqrestore(&ssk->rx_backlog.lock, flags);
 
-	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
-		sk_stream_write_space(&ssk->isk.sk);
+	lock_sock(sk);
 
-	return 0;
-}
+	sdp_post_sends(ssk, 0);
 
-static void sdp_rx_comp_work(struct work_struct *work)
-{
-	struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
-	struct sock *sk = &ssk->isk.sk;
-	struct ib_cq *rx_cq;
+	while ((skb = skb_dequeue(&ssk->rx_ctl_q))) {
+		sdp_process_rx_ctl_skb(ssk, skb);
+	}
 
-	lock_sock(sk);
-	rx_cq = ssk->rx_ring.cq;
-	if (unlikely(!rx_cq))
-		goto out;
+	sk_stream_mem_reclaim(sk);
 
-	if (unlikely(!ssk->poll_cq)) {
-		struct rdma_cm_id *id = ssk->id;
-		sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
-		if (id && id->qp)
-			rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
-		goto out;
-	}
+	xmit_poll_force = sk->sk_write_pending && (tx_credits(ssk) > SDP_MIN_TX_CREDITS);
 
-	sdp_process_rx_q(ssk);
-	sdp_xmit_poll(ssk, 1); /* if has pending tx because run out of tx_credits - xmit it */
-	release_sock(sk);
-	sk_mem_reclaim(sk);
-	lock_sock(sk);
-	rx_cq = ssk->rx_ring.cq;
-	if (unlikely(!rx_cq))
-		goto out;
-
-	sdp_process_rx_q(ssk);
-	sdp_xmit_poll(ssk, 1);
+	if (credit_update_needed(ssk) || xmit_poll_force) {
+		/* there may be pending sends because we ran out of tx_credits - xmit them */
+		sdp_prf(sk, NULL, "Processing to free pending sends");
+		sdp_xmit_poll(ssk, xmit_poll_force);
+		sdp_prf(sk, NULL, "Sending credit update if needed");
+		sdp_post_sends(ssk, 0);
+	}
 
-out:
 	release_sock(sk);
 }
 
@@ -679,6 +706,7 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 	struct sdp_sock *ssk = sdp_sk(sk);
 	unsigned long flags;
 	int wc_processed = 0;
+	int credits_before;
 
 	sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
 
@@ -690,6 +718,8 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 
 	rx_ring_lock(ssk, flags);
 
+	credits_before = tx_credits(ssk);
+
 	if (unlikely(!ssk->poll_cq))
 		sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
 
@@ -703,15 +733,11 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 	sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
 
 	if (wc_processed) {
-		_sdp_post_recvs(ssk);
+		sdp_prf(&ssk->isk.sk, NULL, "credits: %d -> %d",
+			credits_before, tx_credits(ssk));
 
-		/* Best was to send credit update from here */
-/*		sdp_post_credits(ssk); */
-
-		/* issue sdp_rx_comp_work() */
 		queue_work(rx_comp_wq, &ssk->rx_comp_work);
 	}
-
 	sdp_arm_rx_cq(sk);
 
 out:
@@ -764,7 +790,7 @@ int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
 	}
 
 	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
-			  &ssk->isk.sk, SDP_RX_SIZE, 0);
+			  &ssk->isk.sk, SDP_RX_SIZE, IB_CQ_VECTOR_LEAST_ATTACHED);
 
 	if (IS_ERR(rx_cq)) {
 		rc = PTR_ERR(rx_cq);
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
index a7d16feb556c..95e3c82320cc 100644
--- a/drivers/infiniband/ulp/sdp/sdp_tx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_tx.c
@@ -287,15 +287,19 @@ static void sdp_poll_tx_timeout(unsigned long data)
 	sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
 		(u32) ring_posted(ssk->tx_ring));
 
+	sdp_prf(&ssk->isk.sk, NULL, "%s. inflight=%d", __func__,
+		(u32) ring_posted(ssk->tx_ring));
+
 	/* Only process if the socket is not in use */
 	bh_lock_sock(sk);
 	if (sock_owned_by_user(sk)) {
 		mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
 		sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+		SDPSTATS_COUNTER_INC(tx_poll_busy);
 		goto out;
 	}
 
-	if (sk->sk_state == TCP_CLOSE)
+	if (unlikely(sk->sk_state == TCP_CLOSE))
 		goto out;
 
 	wc_processed = sdp_process_tx_cq(ssk);
-- 
2.50.1