From: Amir Vadai Date: Tue, 21 Apr 2009 14:18:37 +0000 (+0300) Subject: sdp: no tx interrupts X-Git-Tag: v4.1.12-92~264^2~5^2~285 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=bd029a7efae5b3e2cbb78e1c38830221be881636;p=users%2Fjedix%2Flinux-maple.git sdp: no tx interrupts poll tx cq with timer instead of interrupts Signed-off-by: Amir Vadai --- diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h index cbef0eb03736a..81280655d3acd 100644 --- a/drivers/infiniband/ulp/sdp/sdp.h +++ b/drivers/infiniband/ulp/sdp/sdp.h @@ -83,12 +83,14 @@ struct sdpstats { u32 sendmsg_seglen[25]; u32 send_size[25]; u32 post_recv; - u32 int_count; + u32 rx_int_count; + u32 tx_int_count; u32 bzcopy_poll_miss; u32 send_wait_for_mem; u32 send_miss_no_credits; u32 rx_poll_miss; u32 tx_poll_miss; + u32 tx_poll_hit; u32 memcpy_count; u32 credits_before_update[64]; u32 send_interval[25]; @@ -132,6 +134,11 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log) #define sock_put(sk, msg) sock_ref(sk, msg, sock_put) #define __sock_put(sk, msg) sock_ref(sk, msg, __sock_put) +/* Interval between sucessive polls in the Tx routine when polling is used + instead of interrupts (in per-core Tx rings) - should be power of 2 */ +#define SDP_TX_POLL_MODER 16 +#define SDP_TX_POLL_TIMEOUT (HZ / 4) + #define SDP_RESOLVE_TIMEOUT 1000 #define SDP_ROUTE_TIMEOUT 1000 #define SDP_RETRY_COUNT 5 @@ -242,7 +249,10 @@ struct sdp_tx_ring { int una_seq; unsigned credits; + + struct timer_list timer; u16 poll_cnt; + struct ib_cq *cq; }; static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring) @@ -258,7 +268,7 @@ struct sdp_sock { struct list_head backlog_queue; struct sock *parent; - struct work_struct work; + struct work_struct rx_comp_work; wait_queue_head_t wq; struct delayed_work dreq_wait_work; @@ -308,7 +318,7 @@ struct sdp_sock { /* rdma specific */ struct ib_qp *qp; - struct ib_cq *cq; + struct ib_cq *rx_cq; struct ib_mr *mr; struct sdp_buf *rx_ring; @@ -349,7 +359,7 @@ struct bzcopy_state { extern int rcvbuf_initial_size; extern struct proto sdp_proto; -extern struct workqueue_struct *comp_wq; +extern struct workqueue_struct *rx_comp_wq; extern atomic_t sdp_current_mem_usage; extern spinlock_t sdp_large_sockets_lock; @@ -440,11 +450,11 @@ static inline void sdp_set_error(struct sock *sk, int err) sk->sk_error_report(sk); } -static inline void sdp_arm_cq(struct sock *sk) +static inline void sdp_arm_rx_cq(struct sock *sk) { - sdp_dbg_data(sk, "ib_req_notify_cq on cq\n"); + sdp_dbg_data(sk, "ib_req_notify_cq on RX cq\n"); - ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP); + ib_req_notify_cq(sdp_sk(sk)->rx_cq, IB_CQ_NEXT_COMP); } #ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA @@ -453,12 +463,15 @@ void dump_packet(struct sock *sk, char *str, struct sk_buff *skb, const struct s int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *); void sdp_reset(struct sock *sk); void sdp_reset_sk(struct sock *sk, int rc); -void sdp_completion_handler(struct ib_cq *cq, void *cq_context); -void sdp_work(struct work_struct *work); +void sdp_rx_irq(struct ib_cq *cq, void *cq_context); +void sdp_tx_irq(struct ib_cq *cq, void *cq_context); +void sdp_poll_tx_cq(unsigned long data); +void sdp_rx_comp_work(struct work_struct *work); +void sdp_process_tx_wc_work(struct work_struct *work); int sdp_post_credits(struct sdp_sock *ssk); void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid); void sdp_post_recvs(struct sdp_sock *ssk); -int sdp_poll_cq(struct sdp_sock *ssk, 
struct ib_cq *cq); +int sdp_poll_rx_cq(struct sdp_sock *ssk); void sdp_post_sends(struct sdp_sock *ssk, int nonagle); void sdp_destroy_work(struct work_struct *work); void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk); @@ -477,6 +490,7 @@ void sdp_bzcopy_write_space(struct sdp_sock *ssk); int sdp_init_sock(struct sock *sk); int __init sdp_proc_init(void); void sdp_proc_unregister(void); +int sdp_xmit_poll(struct sdp_sock *ssk, int force); static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp) { diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c index 9f7f4a07f8913..29c9761ec96b1 100644 --- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c +++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c @@ -244,6 +244,24 @@ void dump_packet(struct sock *sk, char *str, struct sk_buff *skb, const struct s } #endif +static int sdp_process_tx_cq(struct sdp_sock *ssk); + +int sdp_xmit_poll(struct sdp_sock *ssk, int force) +{ + int wc_processed = 0; + + /* If we don't have a pending timer, set one up to catch our recent + post in case the interface becomes idle */ + if (!timer_pending(&ssk->tx_ring.timer)) + mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); + + /* Poll the CQ every SDP_TX_POLL_MODER packets */ + if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) + wc_processed = sdp_process_tx_cq(ssk); + + return wc_processed; +} + void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid) { struct sdp_buf *tx_req; @@ -573,6 +591,7 @@ int sdp_post_credits(struct sdp_sock *ssk) if (!skb) return -ENOMEM; sdp_post_send(ssk, skb, SDP_MID_DATA); + sdp_xmit_poll(ssk, 0); } return 0; } @@ -583,6 +602,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) struct sk_buff *skb; int c; gfp_t gfp_page; + int post_count = 0; if (unlikely(!ssk->id)) { if (ssk->isk.sk.sk_send_head) { @@ -602,7 +622,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) if (ssk->recv_request && ssk->rx_tail >= ssk->recv_request_head && ssk->tx_ring.credits >= SDP_MIN_TX_CREDITS && - ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) { + sdp_tx_ring_slots_left(&ssk->tx_ring)) { struct sdp_chrecvbuf *resp_size; ssk->recv_request = 0; skb = sdp_stream_alloc_skb(&ssk->isk.sk, @@ -614,6 +634,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) resp_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *resp_size); resp_size->size = htonl(ssk->recv_frags * PAGE_SIZE); sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK); + post_count++; } if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS && @@ -634,12 +655,13 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) update_send_head(&ssk->isk.sk, skb); __skb_dequeue(&ssk->isk.sk.sk_write_queue); sdp_post_send(ssk, skb, SDP_MID_DATA); + post_count++; } if (ssk->tx_ring.credits == SDP_MIN_TX_CREDITS && !ssk->sent_request && ssk->tx_ring.head > ssk->sent_request_head + SDP_RESIZE_WAIT && - ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) { + sdp_tx_ring_slots_left(&ssk->tx_ring)) { struct sdp_chrecvbuf *req_size; skb = sdp_stream_alloc_skb(&ssk->isk.sk, sizeof(struct sdp_bsdh) + @@ -652,6 +674,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size); req_size->size = htonl(ssk->sent_request); sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF); + post_count++; } c = ssk->remote_credits; @@ -660,7 +683,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) if (unlikely(c < ssk->rx_head - ssk->rx_tail) && 
likely(ssk->tx_ring.credits > 1) && - likely(ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) && + likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) && likely((1 << ssk->isk.sk.sk_state) & (TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) { skb = sdp_stream_alloc_skb(&ssk->isk.sk, @@ -670,6 +693,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) BUG_ON(!skb); SDPSTATS_COUNTER_INC(post_send_credits); sdp_post_send(ssk, skb, SDP_MID_DATA); + post_count++; } /* send DisConn if needed @@ -687,7 +711,10 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle) /* FIXME */ BUG_ON(!skb); sdp_post_send(ssk, skb, SDP_MID_DISCONN); + post_count++; } + if (post_count) + sdp_xmit_poll(ssk, 0); } int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size) @@ -867,7 +894,6 @@ static int sdp_handle_recv_comp(struct sdp_sock *ssk, struct ib_wc *wc) static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) { struct sk_buff *skb = NULL; - struct sdp_bsdh *h; skb = sdp_send_completion(ssk, wc->wr_id); if (unlikely(!skb)) @@ -881,72 +907,162 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc) sdp_set_error(sk, -ECONNRESET); wake_up(&ssk->wq); - queue_work(comp_wq, &ssk->destroy_work); + queue_work(rx_comp_wq, &ssk->destroy_work); } - goto out; } - h = (struct sdp_bsdh *)skb->data; + sk_stream_free_skb(&ssk->isk.sk, skb); - if (likely(h->mid != SDP_MID_DISCONN)) - goto out; + return 0; +} + +void sdp_rx_irq(struct ib_cq *cq, void *cq_context) +{ + struct sock *sk = cq_context; + struct sdp_sock *ssk = sdp_sk(sk); + + WARN_ON(ssk->rx_cq && cq != ssk->rx_cq); + + if (!ssk->rx_cq) + sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n"); + + SDPSTATS_COUNTER_INC(rx_int_count); + + queue_work(rx_comp_wq, &ssk->rx_comp_work); +} - if ((1 << ssk->isk.sk.sk_state) & ~(TCPF_FIN_WAIT1 | TCPF_LAST_ACK)) { - sdp_dbg(&ssk->isk.sk, - "%s: sent DISCONNECT from unexpected state %d\n", - __func__, ssk->isk.sk.sk_state); +static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc) +{ + if (likely(wc->wr_id & SDP_OP_SEND)) { + sdp_handle_send_comp(ssk, wc); + return; } -out: sk_wmem_free_skb(&ssk->isk.sk, skb); - return 0; + /* Keepalive probe sent cleanup */ + sdp_cnt(sdp_keepalive_probes_sent); + + if (likely(!wc->status)) + return; + + sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n", + __func__, wc->status); + + if (wc->status == IB_WC_WR_FLUSH_ERR) + return; + + sdp_set_error(&ssk->isk.sk, -ECONNRESET); + wake_up(&ssk->wq); } -static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc) +static int sdp_process_tx_cq(struct sdp_sock *ssk) { - if (wc->wr_id & SDP_OP_RECV) { - if (sdp_handle_recv_comp(ssk, wc)) - return; - } else if (likely(wc->wr_id & SDP_OP_SEND)) { - if (sdp_handle_send_comp(ssk, wc)) - return; - } else { - sdp_cnt(sdp_keepalive_probes_sent); + struct ib_wc ibwc[SDP_NUM_WC]; + int n, i; + int wc_processed = 0; - if (likely(!wc->status)) - return; + if (!ssk->tx_ring.cq) { + sdp_warn(&ssk->isk.sk, "WARNING: tx irq when tx_cq is destroyed\n"); + return 0; + } + + do { + n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc); + for (i = 0; i < n; ++i) { + sdp_process_tx_wc(ssk, ibwc + i); + wc_processed++; + } + } while (n == SDP_NUM_WC); - sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n", - __func__, wc->status); + sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed); - if (wc->status == IB_WC_WR_FLUSH_ERR) - return; + if (wc_processed) { + struct sock *sk = &ssk->isk.sk; + sdp_post_sends(ssk, 0); - 
sdp_set_error(&ssk->isk.sk, -ECONNRESET); - wake_up(&ssk->wq); + if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) + sk_stream_write_space(&ssk->isk.sk); - return; } + + return wc_processed; } -void sdp_completion_handler(struct ib_cq *cq, void *cq_context) +void sdp_poll_tx_cq(unsigned long data) +{ + struct sdp_sock *ssk = (struct sdp_sock *)data; + struct sock *sk = &ssk->isk.sk; + u32 inflight, wc_processed; + + sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n", + (u32) ssk->tx_ring.head - ssk->tx_ring.tail); + + /* Only process if the socket is not in use */ + bh_lock_sock(sk); + if (sock_owned_by_user(sk)) { + mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); + sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n"); + goto out; + } + + if (sk->sk_state == TCP_CLOSE) + goto out; + + wc_processed = sdp_process_tx_cq(ssk); + if (!wc_processed) + SDPSTATS_COUNTER_INC(tx_poll_miss); + else + SDPSTATS_COUNTER_INC(tx_poll_hit); + + inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail; + + /* If there are still packets in flight and the timer has not already + * been scheduled by the Tx routine then schedule it here to guarantee + * completion processing of these packets */ + if (inflight) { /* TODO: make sure socket is not closed */ + sdp_dbg_data(sk, "arming timer for more polling\n"); + mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT); + } + +out: + bh_unlock_sock(sk); +} + + +void sdp_tx_irq(struct ib_cq *cq, void *cq_context) { struct sock *sk = cq_context; struct sdp_sock *ssk = sdp_sk(sk); - schedule_work(&ssk->work); - SDPSTATS_COUNTER_INC(int_count); + + sdp_warn(sk, "Got tx comp interrupt\n"); + + mod_timer(&ssk->tx_ring.timer, jiffies + 1); } -int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq) + +int sdp_poll_rx_cq(struct sdp_sock *ssk) { + struct ib_cq *cq = ssk->rx_cq; struct ib_wc ibwc[SDP_NUM_WC]; int n, i; int ret = -EAGAIN; + int updated_credits = 0; + do { n = ib_poll_cq(cq, SDP_NUM_WC, ibwc); for (i = 0; i < n; ++i) { - sdp_handle_wc(ssk, &ibwc[i]); + struct ib_wc *wc = &ibwc[i]; + + BUG_ON(!(wc->wr_id & SDP_OP_RECV)); + sdp_handle_recv_comp(ssk, wc); + + if (!updated_credits) { + sdp_post_recvs(ssk); + sdp_post_sends(ssk, 0); + updated_credits = 1; + } + ret = 0; } } while (n == SDP_NUM_WC); @@ -968,17 +1084,20 @@ int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq) return ret; } -void sdp_work(struct work_struct *work) +static inline int sdp_tx_qp_empty(struct sdp_sock *ssk) { - struct sdp_sock *ssk = container_of(work, struct sdp_sock, work); - struct sock *sk = &ssk->isk.sk; - struct ib_cq *cq; + return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0; +} - sdp_dbg_data(sk, "%s\n", __func__); +void sdp_rx_comp_work(struct work_struct *work) +{ + struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work); + struct sock *sk = &ssk->isk.sk; + struct ib_cq *rx_cq; lock_sock(sk); - cq = ssk->cq; - if (unlikely(!cq)) + rx_cq = ssk->rx_cq; + if (unlikely(!rx_cq)) goto out; if (unlikely(!ssk->poll_cq)) { @@ -988,15 +1107,18 @@ void sdp_work(struct work_struct *work) goto out; } - sdp_poll_cq(ssk, cq); + sdp_poll_rx_cq(ssk); + sdp_xmit_poll(ssk, 1); /* if has pending tx because run out of tx_credits - xmit it */ release_sock(sk); sk_mem_reclaim(sk); lock_sock(sk); - cq = ssk->cq; - if (unlikely(!cq)) + rx_cq = ssk->rx_cq; + if (unlikely(!rx_cq)) goto out; - sdp_arm_cq(sk); - sdp_poll_cq(ssk, cq); + + sdp_arm_rx_cq(sk); + sdp_poll_rx_cq(ssk); + sdp_xmit_poll(ssk, 1); out: release_sock(sk); } diff --git 
a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c index 1e1ff9dde5623..7c34637fd9a26 100644 --- a/drivers/infiniband/ulp/sdp/sdp_cma.c +++ b/drivers/infiniband/ulp/sdp/sdp_cma.c @@ -73,7 +73,7 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) .qp_type = IB_QPT_RC, }; struct ib_device *device = id->device; - struct ib_cq *cq; + struct ib_cq *rx_cq, *tx_cq; struct ib_mr *mr; struct ib_pd *pd; int rc; @@ -118,32 +118,49 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) } sdp_sk(sk)->mr = mr; - INIT_WORK(&sdp_sk(sk)->work, sdp_work); + INIT_WORK(&sdp_sk(sk)->rx_comp_work, sdp_rx_comp_work); - cq = ib_create_cq(device, sdp_completion_handler, sdp_cq_event_handler, - sk, SDP_TX_SIZE + SDP_RX_SIZE, 0); + rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_cq_event_handler, + sk, SDP_RX_SIZE, 0); - if (IS_ERR(cq)) { - rc = PTR_ERR(cq); - sdp_warn(sk, "Unable to allocate CQ: %d.\n", rc); - goto err_cq; + if (IS_ERR(rx_cq)) { + rc = PTR_ERR(rx_cq); + sdp_warn(sk, "Unable to allocate RX CQ: %d.\n", rc); + goto err_rx_cq; } - rc = ib_modify_cq(cq, 10, 200); + rc = ib_modify_cq(rx_cq, 10, 200); if (rc) { sdp_warn(sk, "Unable to modify RX CQ: %d.\n", rc); - goto err_qp; + goto err_tx_cq; } sdp_warn(sk, "Initialized CQ moderation\n"); + sdp_sk(sk)->rx_cq = rx_cq; + sdp_arm_rx_cq(sk); + qp_init_attr.recv_cq = rx_cq; + + tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_cq_event_handler, + sk, SDP_TX_SIZE, 0); + + if (IS_ERR(tx_cq)) { + rc = PTR_ERR(tx_cq); + sdp_warn(sk, "Unable to allocate TX CQ: %d.\n", rc); + goto err_tx_cq; + } - qp_init_attr.send_cq = qp_init_attr.recv_cq = cq; + init_timer(&sdp_sk(sk)->tx_ring.timer); + sdp_sk(sk)->tx_ring.timer.function = sdp_poll_tx_cq; + sdp_sk(sk)->tx_ring.timer.data = (unsigned long) sdp_sk(sk); + sdp_sk(sk)->tx_ring.poll_cnt = 0; + + sdp_sk(sk)->tx_ring.cq = tx_cq; + qp_init_attr.send_cq = tx_cq; rc = rdma_create_qp(id, pd, &qp_init_attr); if (rc) { sdp_warn(sk, "Unable to create QP: %d.\n", rc); goto err_qp; } - sdp_sk(sk)->cq = cq; sdp_sk(sk)->qp = id->qp; sdp_sk(sk)->ib_device = device; @@ -153,8 +170,10 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id) return 0; err_qp: - ib_destroy_cq(cq); -err_cq: + ib_destroy_cq(tx_cq); +err_tx_cq: + ib_destroy_cq(rx_cq); +err_rx_cq: ib_dereg_mr(sdp_sk(sk)->mr); err_mr: ib_dealloc_pd(pd); @@ -162,7 +181,6 @@ err_pd: kfree(sdp_sk(sk)->rx_ring); sdp_sk(sk)->rx_ring = NULL; err_rx: - WARN_ON(sdp_sk(sk)->tx_ring.head != sdp_sk(sk)->tx_ring.tail); kfree(sdp_sk(sk)->tx_ring.buffer); sdp_sk(sk)->tx_ring.buffer = NULL; err_tx: @@ -258,21 +276,20 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id, sdp_sk(sk)->min_bufs = sdp_sk(sk)->tx_ring.credits / 4; sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) - sizeof(struct sdp_bsdh); - sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) / - PAGE_SIZE, SDP_MAX_SEND_SKB_FRAGS); - sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal, - sdp_sk(sk)->send_frags * PAGE_SIZE); + sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) / + PAGE_SIZE, SDP_MAX_SEND_SKB_FRAGS); + sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal, + sdp_sk(sk)->send_frags * PAGE_SIZE); - sdp_dbg(sk, "%s bufs %d xmit_size_goal %d send_frags: %d send trigger %d\n", - __func__, + sdp_dbg(sk, "tx credits %d xmit_size_goal %d send_frags: %d credits update trigger %d\n", sdp_sk(sk)->tx_ring.credits, sdp_sk(sk)->xmit_size_goal, - sdp_sk(sk)->send_frags, + sdp_sk(sk)->send_frags, 
sdp_sk(sk)->min_bufs); sdp_sk(sk)->poll_cq = 1; - sdp_arm_cq(sk); - sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq); + sdp_arm_rx_cq(sk); + sdp_poll_rx_cq(sdp_sk(sk)); sk->sk_state_change(sk); sk_wake_async(sk, 0, POLL_OUT); @@ -332,8 +349,11 @@ static int sdp_disconnected_handler(struct sock *sk) sdp_dbg(sk, "%s\n", __func__); - if (ssk->cq) - sdp_poll_cq(ssk, ssk->cq); + if (ssk->rx_cq) + sdp_poll_rx_cq(ssk); + + if (ssk->tx_ring.cq) + sdp_xmit_poll(ssk, 1); if (sk->sk_state == TCP_SYN_RECV) { sdp_connected_handler(sk, NULL); diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c index 6b7494f5ff50e..454abeb2872ab 100644 --- a/drivers/infiniband/ulp/sdp/sdp_main.c +++ b/drivers/infiniband/ulp/sdp/sdp_main.c @@ -135,7 +135,8 @@ static int sdp_zcopy_thresh = 65536; module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644); MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=0ff."); -struct workqueue_struct *comp_wq; +struct workqueue_struct *sdp_wq; +struct workqueue_struct *rx_comp_wq; struct list_head sock_list; spinlock_t sock_list_lock; @@ -205,12 +206,17 @@ static int sdp_get_port(struct sock *sk, unsigned short snum) static void sdp_destroy_qp(struct sdp_sock *ssk) { struct ib_pd *pd = NULL; - struct ib_cq *cq = NULL; + struct ib_cq *rx_cq = NULL; + struct ib_cq *tx_cq = NULL; + + del_timer(&ssk->tx_ring.timer); if (ssk->qp) { pd = ssk->qp->pd; - cq = ssk->cq; - ssk->cq = NULL; + rx_cq = ssk->rx_cq; + ssk->rx_cq = NULL; + tx_cq = ssk->tx_ring.cq; + ssk->tx_ring.cq = NULL; ib_destroy_qp(ssk->qp); ssk->qp = NULL; @@ -231,8 +237,12 @@ static void sdp_destroy_qp(struct sdp_sock *ssk) } } - if (cq) - ib_destroy_cq(cq); + if (tx_cq) { + ib_destroy_cq(tx_cq); + } + + if (rx_cq) + ib_destroy_cq(rx_cq); if (ssk->mr) { ib_dereg_mr(ssk->mr); @@ -341,8 +351,11 @@ void sdp_reset_sk(struct sock *sk, int rc) read_lock(&device_removal_lock); - if (ssk->cq) - sdp_poll_cq(ssk, ssk->cq); + if (ssk->rx_cq) + sdp_poll_rx_cq(ssk); + + if (ssk->tx_ring.cq) + sdp_xmit_poll(ssk, 1); if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) sdp_set_error(sk, rc); @@ -355,7 +368,7 @@ void sdp_reset_sk(struct sock *sk, int rc) /* Don't destroy socket before destroy work does its job */ sock_hold(sk, SOCK_REF_RESET); - queue_work(comp_wq, &ssk->destroy_work); + queue_work(sdp_wq, &ssk->destroy_work); read_unlock(&device_removal_lock); } @@ -773,11 +786,10 @@ out: release_sock(sk); if (newsk) { lock_sock(newsk); - if (newssk->cq) { - sdp_dbg(newsk, "%s: ib_req_notify_cq\n", __func__); + if (newssk->rx_cq) { newssk->poll_cq = 1; - sdp_arm_cq(&newssk->isk.sk); - sdp_poll_cq(newssk, newssk->cq); + sdp_arm_rx_cq(&newssk->isk.sk); + sdp_poll_rx_cq(newssk); } release_sock(newsk); } @@ -847,7 +859,7 @@ static inline void sdp_start_dreq_wait_timeout(struct sdp_sock *ssk, int timeo) { sdp_dbg(&ssk->isk.sk, "Starting dreq wait timeout\n"); - queue_delayed_work(comp_wq, &ssk->dreq_wait_work, timeo); + queue_delayed_work(sdp_wq, &ssk->dreq_wait_work, timeo); ssk->dreq_wait_timeout = 1; } @@ -1143,9 +1155,9 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname, static inline int poll_recv_cq(struct sock *sk) { int i; - if (sdp_sk(sk)->cq) { + if (sdp_sk(sk)->rx_cq) { for (i = 0; i < recv_poll; ++i) - if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) { + if (!sdp_poll_rx_cq(sdp_sk(sk))) { ++recv_poll_hit; return 0; } @@ -1157,9 +1169,9 @@ static inline int poll_recv_cq(struct sock *sk) static inline void poll_send_cq(struct sock *sk) { int i; - if 
(sdp_sk(sk)->cq) { + if (sdp_sk(sk)->tx_ring.cq) { for (i = 0; i < send_poll; ++i) - if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) { + if (sdp_xmit_poll(sdp_sk(sk), 1)) { ++send_poll_hit; return; } @@ -1421,12 +1433,9 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb, /* We can extend the last page * fragment. */ merge = 1; - } else if (i == ssk->send_frags || - (!i && - !(sk->sk_route_caps & NETIF_F_SG))) { + } else if (i == ssk->send_frags) { /* Need to add new fragment and cannot - * do this because interface is non-SG, - * or because all the page slots are + * do this because all the page slots are * busy. */ sdp_mark_push(ssk, skb); return SDP_NEW_SEG; @@ -1649,7 +1658,6 @@ void sdp_bzcopy_write_space(struct sdp_sock *ssk) } } - /* Like tcp_sendmsg */ /* TODO: check locking */ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, @@ -1812,7 +1820,9 @@ wait_for_sndbuf: wait_for_memory: SDPSTATS_COUNTER_INC(send_wait_for_mem); if (copied) - sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH); + sdp_push(sk, ssk, flags & ~MSG_MORE, PAGE_SIZE, TCP_NAGLE_PUSH); + + sdp_xmit_poll(ssk, 1); err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo, bz) : sk_stream_wait_memory(sk, &timeo); @@ -2386,17 +2396,25 @@ static int __init sdp_init(void) sdp_proto.sockets_allocated = sockets_allocated; sdp_proto.orphan_count = orphan_count; - comp_wq = create_singlethread_workqueue("comp_wq"); - if (!comp_wq) + rx_comp_wq = create_singlethread_workqueue("rx_comp_wq"); + if (!rx_comp_wq) goto no_mem_rx_wq; + sdp_wq = create_singlethread_workqueue("sdp_wq"); + if (!sdp_wq) + goto no_mem_sdp_wq; + rc = proto_register(&sdp_proto, 1); - if (rc) + if (rc) { + printk(KERN_WARNING "%s: proto_register failed: %d\n", __func__, rc); goto error_proto_reg; + } rc = sock_register(&sdp_net_proto); - if (rc) + if (rc) { + printk(KERN_WARNING "%s: sock_register failed: %d\n", __func__, rc); goto error_sock_reg; + } sdp_proc_init(); @@ -2409,7 +2427,9 @@ static int __init sdp_init(void) error_sock_reg: proto_unregister(&sdp_proto); error_proto_reg: - destroy_workqueue(comp_wq); + destroy_workqueue(sdp_wq); +no_mem_sdp_wq: + destroy_workqueue(rx_comp_wq); no_mem_rx_wq: kfree(orphan_count); no_mem_orphan_count: @@ -2427,7 +2447,8 @@ static void __exit sdp_exit(void) printk(KERN_WARNING "%s: orphan_count %lld\n", __func__, percpu_counter_read_positive(orphan_count)); - destroy_workqueue(comp_wq); + destroy_workqueue(rx_comp_wq); + destroy_workqueue(sdp_wq); flush_scheduled_work(); diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c index e759864e95c4c..0971a49ebc9ab 100644 --- a/drivers/infiniband/ulp/sdp/sdp_proc.c +++ b/drivers/infiniband/ulp/sdp/sdp_proc.c @@ -260,9 +260,11 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v) seq_printf(seq, "rx_poll_miss \t\t: %d\n", sdpstats.rx_poll_miss); seq_printf(seq, "tx_poll_miss \t\t: %d\n", sdpstats.tx_poll_miss); + seq_printf(seq, "tx_poll_hit \t\t: %d\n", sdpstats.tx_poll_hit); seq_printf(seq, "CQ stats:\n"); - seq_printf(seq, "- interrupts\t\t: %d\n", sdpstats.int_count); + seq_printf(seq, "- RX interrupts\t\t: %d\n", sdpstats.rx_int_count); + seq_printf(seq, "- TX interrupts\t\t: %d\n", sdpstats.tx_int_count); return 0; }
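
The heart of this change is that TX completions are no longer reaped from an interrupt handler: sdp_post_sends() and sdp_post_credits() call sdp_xmit_poll(), which polls the TX CQ only on every SDP_TX_POLL_MODER-th post, and a per-ring timer (SDP_TX_POLL_TIMEOUT, i.e. HZ/4) acts as a backstop so that completions of the last few posts are still reaped once the sender goes idle. The standalone C sketch below illustrates that policy only; it is not driver code. The ring, CQ and timer are simulated with plain variables, and process_tx_cq(), xmit_poll() and backstop_timer_fires() are simplified stand-ins for sdp_process_tx_cq(), sdp_xmit_poll() and the sdp_poll_tx_cq() timer callback from the patch.

	/*
	 * Userspace illustration of the TX polling policy introduced by this
	 * patch: no TX interrupts; the CQ is polled every SDP_TX_POLL_MODER
	 * posts, with a timeout as a backstop for idle periods.
	 */
	#include <stdbool.h>
	#include <stdio.h>

	#define SDP_TX_POLL_MODER	16	/* poll every 16 posts; power of 2 */

	struct tx_ring {
		unsigned head;		/* posts issued */
		unsigned tail;		/* completions reaped; head - tail = in flight */
		unsigned poll_cnt;	/* moderation counter */
		bool timer_pending;	/* stands in for timer_pending(&ring->timer) */
	};

	/* Stand-in for sdp_process_tx_cq(): pretend every in-flight post completed. */
	static int process_tx_cq(struct tx_ring *ring)
	{
		int reaped = (int)(ring->head - ring->tail);

		ring->tail = ring->head;
		return reaped;
	}

	/*
	 * Same shape as sdp_xmit_poll(): make sure a backstop timer is armed,
	 * then poll the CQ only every SDP_TX_POLL_MODER calls unless forced.
	 */
	static int xmit_poll(struct tx_ring *ring, bool force)
	{
		int wc_processed = 0;

		if (!ring->timer_pending)
			ring->timer_pending = true;	/* mod_timer(..., jiffies + SDP_TX_POLL_TIMEOUT) */

		if (force || (++ring->poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
			wc_processed = process_tx_cq(ring);

		return wc_processed;
	}

	/* Stand-in for the sdp_poll_tx_cq() timer callback firing after HZ/4. */
	static void backstop_timer_fires(struct tx_ring *ring)
	{
		ring->timer_pending = false;
		printf("timer: reaped %d leftover completion(s)\n", process_tx_cq(ring));
		if (ring->head != ring->tail)
			ring->timer_pending = true;	/* still in flight: re-arm */
	}

	int main(void)
	{
		struct tx_ring ring = { 0 };

		for (int i = 1; i <= 40; i++) {
			ring.head++;			/* "post" one send */
			int reaped = xmit_poll(&ring, false);
			if (reaped)
				printf("post %2d: polled CQ, reaped %d completion(s)\n",
				       i, reaped);
		}

		/* Sender goes idle with posts 33..40 unreaped; the backstop
		 * timer, not an interrupt, cleans them up. */
		backstop_timer_fires(&ring);
		return 0;
	}

Keeping SDP_TX_POLL_MODER a power of two lets the moderation check be a mask (& (SDP_TX_POLL_MODER - 1)) rather than a modulo, and the HZ/4 backstop bounds how long a completed skb can sit unreclaimed after the socket stops transmitting. The real timer callback additionally defers itself with mod_timer() when sock_owned_by_user() reports the socket busy, which the sketch above does not model.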