From 13140aa3ff735def51fa8b285f26723f9f3455bb Mon Sep 17 00:00:00 2001
From: Amir Vadai
Date: Wed, 3 Jun 2009 09:34:49 +0300
Subject: [PATCH] sdp: fix RX to work well on sink side + cosmetic changes

The RX interrupt no longer unconditionally queues rx_comp_work: whoever
is already inside sendmsg()/recvmsg() now owns posting. Those paths are
bracketed with posts_handler_get()/posts_handler_put(), which maintain
the new somebody_is_doing_posts refcount, and they flush RX/TX posts
through the new sdp_do_posts() (which replaces sdp_schedule_post_recvs()).

Also:
- queue the received DISCONN skb on the socket receive queue so a
  sleeping recvmsg() wakes up, and handle it at found_fin_ok
- parse the BSDH via skb_transport_header() instead of skb->data
- replace the hard-coded RX CQ moderation values (10, 200) with the
  hw_int_mod_count and hw_int_mod_msec module parameters
- drop the unused mss/nagle leftovers inherited from TCP
- rename sdp_prf() to sdp_prf1(); sdp_prf() is compiled out by default

Signed-off-by: Amir Vadai
---
 drivers/infiniband/ulp/sdp/sdp.h      |  27 ++++-
 drivers/infiniband/ulp/sdp/sdp_main.c | 122 ++++++++++---------
 drivers/infiniband/ulp/sdp/sdp_rx.c   | 164 +++++++++++++++-----------
 drivers/infiniband/ulp/sdp/sdp_tx.c   |   5 +-
 4 files changed, 188 insertions(+), 130 deletions(-)

diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 25e18442b2de..9b1980fdabad 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -29,7 +29,6 @@
 #define sdp_warn(sk, format, arg...) \
 	sdp_printk(KERN_WARNING, sk, format , ## arg)
 
-
 #define rx_ring_lock(ssk, f) do { \
 	spin_lock_irqsave(&ssk->rx_ring.lock, f); \
 } while (0)
@@ -66,7 +65,7 @@ static inline unsigned long long current_nsec(void)
 	getnstimeofday(&tv);
 	return tv.tv_sec * NSEC_PER_SEC + tv.tv_nsec;
 }
-#define sdp_prf(sk, s, format, arg...) ({ \
+#define sdp_prf1(sk, s, format, arg...) ({ \
 	struct sdpprf_log *l = &sdpprf_log[sdpprf_log_count++ & (SDPPRF_LOG_SIZE - 1)]; \
 	l->idx = sdpprf_log_count - 1; \
 	l->pid = current->pid; \
@@ -80,7 +79,11 @@ static inline unsigned long long current_nsec(void)
 	l->line = __LINE__; \
 	1; \
 })
+#define sdp_prf(sk, s, format, arg...)
+//#define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg)
+
 
 #else
+#define sdp_prf1(sk, s, format, arg...)
 #define sdp_prf(sk, s, format, arg...)
 #endif
@@ -156,6 +159,7 @@ struct sdpstats {
 	u32 rx_poll_miss;
 	u32 tx_poll_miss;
 	u32 tx_poll_hit;
+	u32 tx_poll_busy;
 	u32 memcpy_count;
 	u32 credits_before_update[64];
 	u32 send_interval[25];
@@ -344,6 +348,21 @@ struct sdp_chrecvbuf {
 	u32 size;
 };
 
+#define posts_handler(ssk) ({\
+	atomic_read(&ssk->somebody_is_doing_posts); \
+})
+
+#define posts_handler_get(ssk) ({\
+	sdp_prf(&ssk->isk.sk, NULL, "posts handler get. %d", posts_handler(ssk)); \
+	atomic_inc(&ssk->somebody_is_doing_posts); \
+})
+
+#define posts_handler_put(ssk) ({\
+	sdp_prf(&ssk->isk.sk, NULL, "posts handler put. %d", posts_handler(ssk)); \
+	atomic_dec(&ssk->somebody_is_doing_posts); \
+	sdp_do_posts(ssk); \
+})
+
 struct sdp_sock {
 	/* sk has to be the first member of inet_sock */
 	struct inet_sock isk;
@@ -359,6 +378,8 @@ struct sdp_sock {
 	struct delayed_work dreq_wait_work;
 	struct work_struct destroy_work;
 
+	atomic_t somebody_is_doing_posts;
+
 	/* Like tcp_sock */
 	u16 urg_data;
 	u32 urg_seq;
@@ -554,7 +575,7 @@ int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
 void sdp_rx_ring_destroy(struct sdp_sock *ssk);
 int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
 int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
-void sdp_schedule_post_recvs(struct sdp_sock *ssk);
+void sdp_do_posts(struct sdp_sock *ssk);
 void sdp_rx_comp_full(struct sdp_sock *ssk);
 
 static inline void sdp_arm_rx_cq(struct sock *sk)
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 432b198939bf..38b303a4e40c 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -210,6 +210,7 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
 
 	sdp_dbg(&ssk->isk.sk, "destroying qp\n");
+	sdp_prf(&ssk->isk.sk, NULL, "destroying qp");
 
 	del_timer(&ssk->tx_ring.timer);
@@ -291,7 +292,7 @@ out:
 	sock_put(sk, SOCK_REF_BORN);
 }
 
-static void sdp_init_timer(struct sock *sk)
+static void sdp_init_keepalive_timer(struct sock *sk)
 {
 	init_timer(&sk->sk_timer);
@@ -930,6 +931,8 @@ int sdp_init_sock(struct sock *sk)
 	ssk->destruct_in_process = 0;
 	spin_lock_init(&ssk->lock);
 
+	atomic_set(&ssk->somebody_is_doing_posts, 0);
+
 	return 0;
 }
@@ -967,7 +970,7 @@ static void sdp_mark_push(struct sdp_sock *ssk, struct sk_buff *skb)
 {
 	TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH;
 	ssk->pushed_seq = ssk->write_seq;
-	sdp_post_sends(ssk, 0);
+	sdp_do_posts(ssk);
 }
 
 static inline void sdp_push_pending_frames(struct sock *sk)
@@ -975,7 +978,6 @@ static inline void sdp_push_pending_frames(struct sock *sk)
 	struct sk_buff *skb = sk->sk_send_head;
 	if (skb) {
 		sdp_mark_push(sdp_sk(sk), skb);
-		sdp_post_sends(sdp_sk(sk), 0);
 	}
 }
@@ -1208,23 +1210,6 @@ static int sdp_recv_urg(struct sock *sk, long timeo,
 	return -EAGAIN;
 }
 
-static unsigned int sdp_current_mss(struct sock *sk, int large_allowed)
-{
-	/* TODO */
-	return PAGE_SIZE;
-}
-
-static int forced_push(struct sdp_sock *sk)
-{
-	/* TODO */
-	return 0;
-}
-
-static inline int select_size(struct sock *sk, struct sdp_sock *ssk)
-{
-	return 0;
-}
-
 static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk, int flags)
 {
 	if (unlikely(flags & MSG_OOB)) {
@@ -1233,12 +1218,11 @@ static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk, int flags
 	}
 }
 
-static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags,
-			    int mss_now, int nonagle)
+static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags)
 {
 	if (sk->sk_send_head)
 		sdp_mark_urg(sk, ssk, flags);
-	sdp_post_sends(ssk, nonagle);
+	sdp_do_posts(sdp_sk(sk));
 }
 
 static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
@@ -1254,10 +1238,6 @@ static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
 	ssk->nonagle &= ~TCP_NAGLE_PUSH;
 }
 
-static void sdp_push_one(struct sock *sk, unsigned int mss_now)
-{
-}
-
 static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
 {
 	int i, max_retry;
@@ -1375,7 +1355,6 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
 	return bz;
 }
 
-
 #define TCP_PAGE(sk) (sk->sk_sndmsg_page)
 #define TCP_OFF(sk) (sk->sk_sndmsg_off)
 static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
@@ -1468,7 +1447,6 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
 	return copy;
 }
 
-
 static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
 				 unsigned char __user *from, int copy,
 				 struct bzcopy_state *bz)
@@ -1585,17 +1563,23 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
 
 		clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
 
+		posts_handler_put(ssk);
+
 		if (sdp_bzcopy_slots_avail(ssk, bz))
 			break;
 
 		set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 		sk->sk_write_pending++;
-		sdp_prf(sk, NULL, "credits: %d, head: %d, tail: %d, busy: %d",
+		sdp_prf1(sk, NULL, "credits: %d, head: %d, tail: %d, busy: %d",
 			tx_credits(ssk), ring_head(ssk->tx_ring),
 			ring_tail(ssk->tx_ring), bz->busy);
 		sk_wait_event(sk, &current_timeo,
 			sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
 		sk->sk_write_pending--;
+		sdp_prf1(sk, NULL, "finished wait for mem");
+
+		posts_handler_get(ssk);
+		sdp_do_posts(ssk);
 
 		if (vm_wait) {
 			vm_wait -= current_timeo;
@@ -1621,7 +1605,7 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	struct sdp_sock *ssk = sdp_sk(sk);
 	struct sk_buff *skb;
 	int iovlen, flags;
-	int mss_now, size_goal;
+	int size_goal;
 	int err, copied;
 	long timeo;
 	struct bzcopy_state *bz = NULL;
@@ -1629,6 +1613,8 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	lock_sock(sk);
 	sdp_dbg_data(sk, "%s\n", __func__);
 
+	posts_handler_get(ssk);
+
 	flags = msg->msg_flags;
 	timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
@@ -1640,7 +1626,6 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	/* This should be in poll */
 	clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
 
-	mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
 	size_goal = ssk->xmit_size_goal;
 
 	/* Ok commence sending. */
@@ -1697,13 +1682,10 @@ new_segment:
 					goto wait_for_sndbuf;
 				}
 
-				skb = sdp_stream_alloc_skb(sk, select_size(sk, ssk),
-						sk->sk_allocation);
+				skb = sdp_stream_alloc_skb(sk, 0, sk->sk_allocation);
 				if (!skb)
 					goto wait_for_memory;
 
-//				sdp_prf(sk, skb, "Created");
-
 				BZCOPY_STATE(skb) = bz;
 
 				/*
@@ -1718,7 +1700,6 @@ new_segment:
 				skb_entail(sk, ssk, skb);
 				copy = size_goal;
 			} else {
-//				sdp_prf(sk, skb, "adding %d bytes", copy);
 				sdp_dbg_data(sk, "adding to existing skb: %p"
 					" len = %d, sk_send_head: %p copy: %d\n",
 					skb, skb->len, sk->sk_send_head, copy);
@@ -1761,15 +1742,8 @@ new_segment:
 			if ((seglen -= copy) == 0 && iovlen == 0)
 				goto out;
 
-			if (skb->len < mss_now || (flags & MSG_OOB))
+			if (skb->len < PAGE_SIZE || (flags & MSG_OOB))
 				continue;
-
-			if (forced_push(ssk)) {
-				sdp_mark_push(ssk, skb);
-				/* TODO: and push pending frames mss_now */
-				/* sdp_push_pending(sk, ssk, mss_now, TCP_NAGLE_PUSH); */
-			} else if (skb == sk->sk_send_head)
-				sdp_push_one(sk, mss_now);
 			continue;
 
 wait_for_sndbuf:
@@ -1778,24 +1752,32 @@ wait_for_memory:
 			sdp_prf(sk, skb, "wait for mem");
 			SDPSTATS_COUNTER_INC(send_wait_for_mem);
 			if (copied)
-				sdp_push(sk, ssk, flags & ~MSG_MORE, PAGE_SIZE, TCP_NAGLE_PUSH);
+				sdp_push(sk, ssk, flags & ~MSG_MORE);
 
 			sdp_xmit_poll(ssk, 1);
 
-			err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo, bz) :
-				     sk_stream_wait_memory(sk, &timeo);
-			sdp_prf(sk, skb, "finished wait for mem. err: %d", err);
+			if (bz) {
+				err = sdp_bzcopy_wait_memory(ssk, &timeo, bz);
+			} else {
+				posts_handler_put(ssk);
+
+				err = sk_stream_wait_memory(sk, &timeo);
+
+				sdp_prf1(sk, skb, "finished wait for mem. err: %d", err);
+				posts_handler_get(ssk);
+				sdp_do_posts(ssk);
+			}
+
 			if (err)
 				goto do_error;
 
-			mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
 			size_goal = ssk->xmit_size_goal;
 		}
 	}
 
 out:
 	if (copied) {
-		sdp_push(sk, ssk, flags, mss_now, ssk->nonagle);
+		sdp_push(sk, ssk, flags);
 
 		if (bz)
 			bz = sdp_bz_cleanup(bz);
@@ -1804,6 +1786,8 @@ out:
 			poll_send_cq(sk);
 	}
 
+	posts_handler_put(ssk);
+
 	release_sock(sk);
 	return copied;
@@ -1824,6 +1808,9 @@ out_err:
 	if (bz)
 		bz = sdp_bz_cleanup(bz);
 	err = sk_stream_error(sk, flags, err);
+
+	posts_handler_put(ssk);
+
 	release_sock(sk);
 	return err;
 }
@@ -1863,6 +1850,8 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 	lock_sock(sk);
 	sdp_dbg_data(sk, "%s\n", __func__);
 
+	posts_handler_get(ssk);
+
 	sdp_prf(sk, skb, "Read from user");
 
 	err = -ENOTCONN;
@@ -1900,7 +1889,10 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		if (!skb)
 			break;
 
-		BUG_ON((skb_transport_header(skb))[0] != SDP_MID_DATA);
+		if ((skb_transport_header(skb))[0] == SDP_MID_DISCONN) {
+			sdp_prf(sk, NULL, "Got DISCONN skb - killing it");
+			goto found_fin_ok;
+		}
 
 		if (before(*seq, TCP_SKB_CB(skb)->seq)) {
 			sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
@@ -1970,7 +1962,17 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 			lock_sock(sk);
 		} else if (rc) {
 			sdp_dbg_data(sk, "%s: sk_wait_data %ld\n", __func__, timeo);
+			sdp_prf(sk, NULL, "waiting for data");
+
+			posts_handler_put(ssk);
+
+			/* socket lock is released inside sk_wait_data */
 			sk_wait_data(sk, &timeo);
+
+			posts_handler_get(ssk);
+			sdp_prf(sk, NULL, "got data");
+
+			sdp_do_posts(ssk);
 		}
 		continue;
@@ -2022,7 +2024,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
 		sdp_dbg_data(sk, "%s: done copied %d target %d\n",
 				__func__, copied, target);
 
-		sdp_schedule_post_recvs(sdp_sk(sk));
+		sdp_do_posts(sdp_sk(sk));
 skip_copy:
 		if (ssk->urg_data && after(ssk->copied_seq, ssk->urg_seq))
 			ssk->urg_data = 0;
@@ -2034,10 +2036,22 @@ skip_copy:
 			skb_unlink(skb, &sk->sk_receive_queue);
 			__kfree_skb(skb);
 		}
+		continue;
+found_fin_ok:
+		++*seq;
+		if (!(flags & MSG_PEEK)) {
+			skb_unlink(skb, &sk->sk_receive_queue);
+			__kfree_skb(skb);
+		}
+		break;
+
 	} while (len > 0);
 
 	err = copied;
 out:
+
+	posts_handler_put(ssk);
+
 	release_sock(sk);
 	return err;
@@ -2257,7 +2271,7 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
 
 	sk->sk_destruct = sdp_destruct;
 
-	sdp_init_timer(sk);
+	sdp_init_keepalive_timer(sk);
 
 	sock->ops = &sdp_proto_ops;
 	sock->state = SS_UNCONNECTED;
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
index 7d5b33edc703..7e17423a08b1 100644
--- a/drivers/infiniband/ulp/sdp/sdp_rx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_rx.c
@@ -50,6 +50,14 @@ static int top_mem_usage = 0;
 module_param_named(top_mem_usage, top_mem_usage, int, 0644);
 MODULE_PARM_DESC(top_mem_usage, "Top system wide sdp memory usage for recv (in MB).");
 
+static int hw_int_mod_count = 10;
+module_param_named(hw_int_mod_count, hw_int_mod_count, int, 0644);
+MODULE_PARM_DESC(hw_int_mod_count, "HW interrupt moderation. int count");
+
+static int hw_int_mod_msec = 200;
+module_param_named(hw_int_mod_msec, hw_int_mod_msec, int, 0644);
+MODULE_PARM_DESC(hw_int_mod_msec, "HW interrupt moderation. msec");
+
 #ifdef CONFIG_PPC
 static int max_large_sockets = 100;
 #else
@@ -259,7 +267,6 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
 	max_bytes = sk->sk_rcvbuf * scale;
 
 	if (unlikely(ring_posted(ssk->rx_ring) >= SDP_RX_SIZE)) {
-		sdp_prf(sk, NULL, "rx ring is full");
 		return 0;
 	}
 
@@ -280,38 +287,18 @@ static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
 
 static inline void sdp_post_recvs(struct sdp_sock *ssk)
 {
-	int rc = 0;
 again:
-	ssk->posts_in_process = 1;
-
-	do {
-		if (!sdp_post_recvs_needed(ssk))
-			break;
-
-		rc = sdp_post_recv(ssk);
-	} while (!rc);
+	while (sdp_post_recvs_needed(ssk)) {
+		if (sdp_post_recv(ssk))
+			goto out;
+	}
 
 	sk_stream_mem_reclaim(&ssk->isk.sk);
-	ssk->posts_in_process = 0;
 
 	if (sdp_post_recvs_needed(ssk))
 		goto again;
-}
-
-void sdp_schedule_post_recvs(struct sdp_sock *ssk)
-{
-	struct sock *sk = &ssk->isk.sk;
-
-	WARN_ON_LOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
-	if (unlikely(!ssk->id || ((1 << sk->sk_state) &
-		(TCPF_CLOSE | TCPF_TIME_WAIT)))) {
-		sdp_prf(sk, NULL, "socket is closed - not posting");
-		return;
-	}
-
-	if (!ssk->posts_in_process && sdp_post_recvs_needed(ssk))
-		queue_work(rx_comp_wq, &ssk->rx_comp_work);
+out:
+	sk_stream_mem_reclaim(&ssk->isk.sk);
 }
 
 static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
@@ -416,8 +403,6 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
 	struct sk_buff *skb;
 	int i, frags;
 
-	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
 	if (unlikely(id != ring_tail(ssk->rx_ring))) {
 		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
 			id, ring_tail(ssk->rx_ring));
@@ -442,7 +427,7 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
 /* socket lock should be taken before calling this */
 static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 {
-	struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
+	struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
 	struct sock *sk = &ssk->isk.sk;
 
 	switch (h->mid) {
@@ -460,28 +445,33 @@ static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 			sk->sk_prot->disconnect(sk, 0);
 		}
 
+		__kfree_skb(skb);
 		break;
 	case SDP_MID_DISCONN:
 		sdp_dbg_data(sk, "Handling RX disconnect\n");
+		sdp_prf(sk, NULL, "Handling RX disconnect");
 		sdp_fin(sk);
+		sdp_prf(sk, NULL, "Queueing fin skb - release recvmsg");
+		/* Enqueue fin skb to release sleeping recvmsg */
+		sdp_sock_queue_rcv_skb(sk, skb);
 		break;
 	case SDP_MID_CHRCVBUF:
 		sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
-		sdp_handle_resize_request(ssk,
-			(struct sdp_chrecvbuf *)skb->data);
+		sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)h);
+		__kfree_skb(skb);
 		break;
 	case SDP_MID_CHRCVBUF_ACK:
 		sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
-		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
+		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)h);
+		__kfree_skb(skb);
 		break;
 	default:
 		/* TODO: Handle other messages */
 		sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
+		__kfree_skb(skb);
 	}
 
-	__kfree_skb(skb);
-
 	return 0;
 }
@@ -494,7 +484,7 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 	unsigned long mseq_ack;
 	int credits_before;
 
-	h = (struct sdp_bsdh *)skb->data;
+	h = (struct sdp_bsdh *)skb_transport_header(skb);
 
 	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
@@ -505,7 +495,7 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 	if (mseq_ack >= ssk->nagle_last_unacked)
 		ssk->nagle_last_unacked = 0;
 
-	sdp_prf(&ssk->isk.sk, skb, "RX %s +%d c:%d->%d mseq:%d ack:%d",
+	sdp_prf1(&ssk->isk.sk, skb, "RX %s +%d c:%d->%d mseq:%d ack:%d",
 		mid2str(h->mid), ntohs(h->bufs), credits_before,
 		tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
@@ -521,18 +511,25 @@ static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
 /*	if (unlikely(h->flags & SDP_OOB_PEND))
 		sk_send_sigurg(sk);*/
 
+	skb_pull(skb, sizeof(struct sdp_bsdh));
+
 	if (h->mid != SDP_MID_DATA ||
 	    unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
-		sdp_warn(sk, "Control skb - queing to control queue\n");
+		sdp_prf(sk, NULL, "Control skb - queuing to control queue");
 		skb_queue_tail(&ssk->rx_ctl_q, skb);
 		return 0;
 	}
 
-	skb_pull(skb, sizeof(struct sdp_bsdh));
-
 	if (unlikely(skb->len <= 0)) {
 		__kfree_skb(skb);
+		return 0;
 	}
 
+	sdp_prf(sk, NULL, "queueing a %s skb", (h->mid == SDP_MID_DATA ? "data" : "disconnect"));
+
 	skb = sdp_sock_queue_rcv_skb(sk, skb);
+
+/*	if (unlikely(h->flags & SDP_OOB_PRES))
+		sdp_urg(ssk, skb);*/
+
 	return 0;
 }
@@ -563,7 +560,7 @@ static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
 	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
 			(int)wc->wr_id, wc->byte_len);
 	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
-		printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
+		sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
 			wc->byte_len, sizeof(struct sdp_bsdh));
 		__kfree_skb(skb);
 		return NULL;
@@ -599,15 +596,14 @@ static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
 	struct socket *sock = sk->sk_socket;
 
 	if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
-		sdp_prf(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. tx_head: %d, tx_tail: %d",
+		sdp_prf1(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. tx_head: %d, tx_tail: %d",
 			tx_credits(ssk), ssk->min_bufs,
 			ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
 	}
 
-	if (tx_credits(ssk) >= ssk->min_bufs &&
-	    ring_head(ssk->tx_ring) == ring_tail(ssk->tx_ring) &&
-	    sock != NULL) {
+	if (tx_credits(ssk) >= ssk->min_bufs && sock != NULL) {
 		clear_bit(SOCK_NOSPACE, &sock->flags);
+		sdp_prf1(sk, NULL, "Waking up sleepers");
 
 		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
 			wake_up_interruptible(sk->sk_sleep);
@@ -616,7 +612,7 @@ static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
 	}
 }
 
-/* only from interrupt.
+/* only from interrupt. */
 static int sdp_poll_rx_cq(struct sdp_sock *ssk)
 {
 	struct ib_cq *cq = ssk->rx_ring.cq;
@@ -625,8 +621,6 @@ static int sdp_poll_rx_cq(struct sdp_sock *ssk)
 	int wc_processed = 0;
 	struct sk_buff *skb;
 
-	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
 	do {
 		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
 		for (i = 0; i < n; ++i) {
@@ -652,39 +646,59 @@ void sdp_rx_comp_work(struct work_struct *work)
 {
 	struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
 	struct sock *sk = &ssk->isk.sk;
-	int xmit_poll_force;
-	struct sk_buff *skb;
 
 	sdp_prf(sk, NULL, "%s", __func__);
 
-	if (unlikely(!ssk->rx_ring.cq))
+	if (unlikely(!ssk->qp)) {
+		sdp_prf(sk, NULL, "qp was destroyed");
+		return;
+	}
+	if (unlikely(!ssk->rx_ring.cq)) {
+		sdp_prf(sk, NULL, "rx_ring.cq is NULL");
 		return;
+	}
 
 	if (unlikely(!ssk->poll_cq)) {
 		struct rdma_cm_id *id = ssk->id;
		sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
+		sdp_prf(sk, NULL, "poll cq is 0");
 		if (id && id->qp)
 			rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
 		return;
 	}
 
-	sdp_post_recvs(ssk);
+	lock_sock(sk);
 
-	if (!(sk->sk_write_pending && (tx_credits(ssk) > SDP_MIN_TX_CREDITS)) &&
-	    !credit_update_needed(ssk) &&
-	    skb_queue_empty(&ssk->rx_ctl_q)) {
-		sdp_prf(&ssk->isk.sk, NULL, "only post recvs");
-		return;
-	}
+	sdp_do_posts(ssk);
 
-	lock_sock(sk);
+	release_sock(sk);
+}
 
-	sdp_post_sends(ssk, 0);
+void sdp_do_posts(struct sdp_sock *ssk)
+{
+	struct sock *sk = &ssk->isk.sk;
+	int xmit_poll_force;
+	struct sk_buff *skb;
 
 	while ((skb = skb_dequeue(&ssk->rx_ctl_q))) {
 		sdp_process_rx_ctl_skb(ssk, skb);
 	}
 
+	if (sk->sk_state == TCP_TIME_WAIT) {
+		sdp_prf(sk, NULL, "in TIMEWAIT. qp=%p cq=%p", ssk->qp, ssk->rx_ring.cq);
+		return;
+	}
+
+	if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
+		return;
+
+	sdp_post_recvs(ssk);
+
+	if (ring_posted(ssk->tx_ring))
+		sdp_xmit_poll(ssk, 1);
+
+	sdp_post_sends(ssk, 0);
+
 	sk_stream_mem_reclaim(sk);
 
 	xmit_poll_force = sk->sk_write_pending &&
			(tx_credits(ssk) > SDP_MIN_TX_CREDITS);
@@ -692,12 +706,11 @@ void sdp_rx_comp_work(struct work_struct *work)
 	if (credit_update_needed(ssk) || xmit_poll_force) {
 		/* if has pending tx because run out of tx_credits - xmit it */
 		sdp_prf(sk, NULL, "Processing to free pending sends");
-		sdp_xmit_poll(ssk, xmit_poll_force);
-		sdp_prf(sk, NULL, "Sending credit update if needed");
+		sdp_xmit_poll(ssk, xmit_poll_force);
+		sdp_prf(sk, NULL, "Sending credit update");
 		sdp_post_sends(ssk, 0);
 	}
-
-	release_sock(sk);
 }
 
 static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
@@ -710,7 +723,10 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 
 	sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
 
-	WARN_ON(cq != ssk->rx_ring.cq);
+	if (cq != ssk->rx_ring.cq) {
+		sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
+		return;
+	}
 
 	SDPSTATS_COUNTER_INC(rx_int_count);
@@ -736,7 +752,19 @@ static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
 		sdp_prf(&ssk->isk.sk, NULL, "credits: %d -> %d",
 			credits_before, tx_credits(ssk));
 
-		queue_work(rx_comp_wq, &ssk->rx_comp_work);
+		if (posts_handler(ssk) ||
+		    (!skb_queue_empty(&ssk->rx_ctl_q) &&
+		     (sk->sk_socket && (sk->sk_socket->flags & SOCK_ASYNC_WAITDATA)))) {
+
+			sdp_prf(&ssk->isk.sk, NULL, "Somebody is doing the post work for me. %d",
+				posts_handler(ssk));
+
+		} else {
+			sdp_prf(&ssk->isk.sk, NULL, "Queuing work. others: %d, ctl_q: %d",
+				posts_handler(ssk),
+				!skb_queue_empty(&ssk->rx_ctl_q));
+			queue_work(rx_comp_wq, &ssk->rx_comp_work);
+		}
 	}
 
 	sdp_arm_rx_cq(sk);
out:
@@ -746,8 +774,6 @@ out:
 
 static void sdp_rx_ring_purge(struct sdp_sock *ssk)
 {
-	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
 	while (ring_posted(ssk->rx_ring) > 0) {
 		struct sk_buff *skb;
 		skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
@@ -798,7 +824,7 @@ int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
 		goto err_cq;
 	}
 
-	rc = ib_modify_cq(rx_cq, 10, 200);
+	rc = ib_modify_cq(rx_cq, hw_int_mod_count, hw_int_mod_msec);
 	if (rc) {
 		sdp_warn(&ssk->isk.sk, "Unable to modify RX CQ: %d.\n", rc);
 		goto err_mod;
@@ -824,8 +850,6 @@ out:
 
 void sdp_rx_ring_destroy(struct sdp_sock *ssk)
 {
-	WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
-
 	if (ssk->rx_ring.buffer) {
 		sdp_rx_ring_purge(ssk);
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
index 95e3c82320cc..594bfc36f299 100644
--- a/drivers/infiniband/ulp/sdp/sdp_tx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_tx.c
@@ -92,7 +92,7 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
 	h->mseq = htonl(mseq);
 	h->mseq_ack = htonl(mseq_ack(ssk));
 
-	sdp_prf(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
+	sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
		mid2str(mid), ring_posted(ssk->rx_ring), mseq,
		ntohl(h->mseq_ack));
 
 	SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
@@ -214,7 +214,7 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
 	{
 		struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
 
-		sdp_prf(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
+		sdp_prf1(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
 	}
 
 	sk_wmem_free_skb(&ssk->isk.sk, skb);
@@ -272,7 +272,6 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk)
 
 		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
 			sk_stream_write_space(&ssk->isk.sk);
-
 	}
 
 	return wc_processed;
-- 
2.50.1
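
Editor's note, appended after the patch so the diff itself stays intact: the core of
this change is the somebody_is_doing_posts ownership count, and the snippet below is
a minimal, standalone userspace C model of that pattern - a sketch, not driver code.
The names posts_handlers, do_posts and rx_irq are illustrative stand-ins for
ssk->somebody_is_doing_posts, sdp_do_posts() and sdp_rx_irq(); the real driver also
takes the socket lock, drains rx_ctl_q and checks SOCK_ASYNC_WAITDATA, all omitted here.

	#include <stdatomic.h>
	#include <stdio.h>

	static atomic_int posts_handlers;   /* models ssk->somebody_is_doing_posts */

	static void do_posts(void)          /* stand-in for sdp_do_posts() */
	{
		printf("post rx buffers, poll tx ring, send credit update\n");
	}

	static void posts_handler_get(void) /* entry of sendmsg()/recvmsg() */
	{
		atomic_fetch_add(&posts_handlers, 1);
	}

	static void posts_handler_put(void) /* exit of sendmsg()/recvmsg() */
	{
		atomic_fetch_sub(&posts_handlers, 1);
		do_posts();                 /* the departing handler flushes posts */
	}

	static void rx_irq(void)            /* stand-in for sdp_rx_irq() */
	{
		if (atomic_load(&posts_handlers))
			return;             /* somebody is doing the post work for us */
		printf("queue rx_comp_work\n"); /* nobody inside: defer to workqueue */
	}

	int main(void)
	{
		posts_handler_get();        /* recvmsg() enters */
		rx_irq();                   /* nothing queued: a handler is present */
		posts_handler_put();        /* recvmsg() leaves and does the posts */
		rx_irq();                   /* now the workqueue path is taken */
		return 0;
	}

In the driver the same check sits in the completion handler right before
queue_work(rx_comp_wq, ...), and the CQ is re-armed afterwards via sdp_arm_rx_cq().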