From d3e9f58a784552a53a7727450481964f924c5fee Mon Sep 17 00:00:00 2001
From: Bang Nguyen
Date: Wed, 7 May 2014 14:48:51 -0700
Subject: [PATCH] RDS: Remove cond_resched() in RX tasklet

Re-install the base fix 17829338 and replace
spin_lock_irqsave(rx_lock)/spin_unlock_irqrestore(rx_lock) with
spin_lock_bh(rx_lock)/spin_unlock_bh(rx_lock) to resolve bugs 18413711
and 18461816.

rx_lock is used to prevent concurrent reaping between the RX tasklet
and the RX worker.

Orabug: 18801937

Signed-off-by: Bang Nguyen
Signed-off-by: Chien-Hua Yen
Tested-by: Arvind Shukla

(cherry picked from commit 409138bae9be49ee9782eed244a20774d61d6208)
Signed-off-by: Jerry Snitselaar
(cherry picked from commit cb2cb09bc520f2915a7c9c2eb1072d936a7b64b6)
---
 net/rds/ib.h    |  6 +++-
 net/rds/ib_cm.c | 86 ++++++++++++++++++++++++++++++++++++++++---------
 2 files changed, 76 insertions(+), 16 deletions(-)

diff --git a/net/rds/ib.h b/net/rds/ib.h
index fb0552c5ba8d5..d8c5996282ba1 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -145,7 +145,7 @@ struct rds_ib_migrate_work {
 };
 
 struct rds_ib_rx_work {
-	struct delayed_work		dlywork;
+	struct delayed_work		work;
 	struct rds_ib_connection	*ic;
 };
 
@@ -236,6 +236,10 @@ struct rds_ib_connection {
 	int			i_rcq_vector;
 
 	unsigned int		i_rx_poll_cq;
+	struct rds_ib_rx_work	i_rx_w;
+	spinlock_t		i_rx_lock;
+	unsigned int		i_rx_wait_for_handler;
+	atomic_t		i_worker_has_rx;
 };
 
 /* This assumes that atomic_t is at least 32 bits */
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 0ed5d7f4e26bf..b125997d08edc 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -333,11 +333,17 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 
 	while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
 		for (i = 0; i < nr; i++) {
-
-			if (rx &&
-			    (++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0)
-				cond_resched();
-
+			if (rx) {
+				if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0) {
+					rdsdebug("connection "
+						 "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
+						 "RX poll_cq processed %d\n",
+						 NIPQUAD(ic->conn->c_laddr),
+						 NIPQUAD(ic->conn->c_faddr),
+						 ic->conn->c_tos,
+						 ic->i_rx_poll_cq);
+				}
+			}
 			wc = wcs + i;
 			rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 				 (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
@@ -348,6 +354,10 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 			else
 				rds_ib_recv_cqe_handler(ic, wc, ack_state);
 		}
+
+		if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
+			break;
+
 	}
 }
 
@@ -374,9 +384,14 @@ void rds_ib_tasklet_fn_send(unsigned long data)
 		rds_send_xmit(ic->conn);
 }
 
-void rds_ib_tasklet_fn_recv(unsigned long data)
+/*
+ * Note: rds_ib_rx(): don't call with irqs disabled.
+ * It calls rds_send_drop_acked() which calls other
+ * routines that reach into rds_rdma_free_op()
+ * where irqs_disabled() warning is asserted!
+ */
+static void rds_ib_rx(struct rds_ib_connection *ic)
 {
-	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
 	struct rds_connection *conn = ic->conn;
 	struct rds_ib_ack_state ack_state;
 	struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
@@ -394,22 +409,52 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
 	if (ack_state.ack_next_valid)
 		rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
-
 	if (ack_state.ack_recv_valid && ack_state.ack_recv > ic->i_ack_recv) {
 		rds_send_drop_acked(conn, ack_state.ack_recv, NULL);
 		ic->i_ack_recv = ack_state.ack_recv;
 	}
-
 	if (rds_conn_up(conn))
 		rds_ib_attempt_ack(ic);
 
 	if (rds_ib_srq_enabled)
 		if ((atomic_read(&rds_ibdev->srq->s_num_posted) <
-			rds_ib_srq_hwm_refill) &&
-			!test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
-			queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0);
+		     rds_ib_srq_hwm_refill) &&
+		    !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
+			queue_delayed_work(rds_wq,
+					   &rds_ibdev->srq->s_refill_w, 0);
+
+	if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
+		ic->i_rx_w.ic = ic;
+		/* Delay 10 msecs until the RX worker starts reaping again */
+		queue_delayed_work(rds_aux_wq, &ic->i_rx_w,
+				   msecs_to_jiffies(10));
+		ic->i_rx_wait_for_handler = 1;
+	}
+}
+
+void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+
+	spin_lock_bh(&ic->i_rx_lock);
+	if (ic->i_rx_wait_for_handler)
+		goto out;
+	rds_ib_rx(ic);
+out:
+	spin_unlock_bh(&ic->i_rx_lock);
 }
 
+static void rds_ib_rx_handler(struct work_struct *_work)
+{
+	struct rds_ib_rx_work *work =
+		container_of(_work, struct rds_ib_rx_work, work.work);
+	struct rds_ib_connection *ic = work->ic;
+
+	spin_lock_bh(&ic->i_rx_lock);
+	ic->i_rx_wait_for_handler = 0;
+	rds_ib_rx(ic);
+	spin_unlock_bh(&ic->i_rx_lock);
+}
 
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 {
@@ -1083,9 +1128,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 		}
 
 		/* quiesce tx and rx completion before tearing down */
-		wait_event(rds_ib_ring_empty_wait,
-			   rds_ib_ring_empty(&ic->i_recv_ring) &&
-			   (atomic_read(&ic->i_signaled_sends) == 0));
+		while (!wait_event_timeout(rds_ib_ring_empty_wait,
+				rds_ib_ring_empty(&ic->i_recv_ring) &&
+				(atomic_read(&ic->i_signaled_sends) == 0),
+				msecs_to_jiffies(5000))) {
+
+			/* Try to reap pending RX completions every 5 secs */
+			if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
+				spin_lock_bh(&ic->i_rx_lock);
+				rds_ib_rx(ic);
+				spin_unlock_bh(&ic->i_rx_lock);
+			}
+		}
 
 		tasklet_kill(&ic->i_stasklet);
 		tasklet_kill(&ic->i_rtasklet);
@@ -1222,6 +1276,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	spin_lock_init(&ic->i_ack_lock);
 #endif
 	atomic_set(&ic->i_signaled_sends, 0);
+	spin_lock_init(&ic->i_rx_lock);
 
 	/*
 	 * rds_ib_conn_shutdown() waits for these to be emptied so they
@@ -1236,6 +1291,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	init_completion(&ic->i_last_wqe_complete);
 
 	INIT_DELAYED_WORK(&ic->i_migrate_w.work, rds_ib_migrate);
+	INIT_DELAYED_WORK(&ic->i_rx_w.work, rds_ib_rx_handler);
 
 	spin_lock_irqsave(&ib_nodev_conns_lock, flags);
 	list_add_tail(&ic->ib_node, &ib_nodev_conns);
-- 
2.50.1
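
For reviewers reading the locking change outside a kernel tree, the hand-off the
patch sets up between rds_ib_tasklet_fn_recv() and rds_ib_rx_handler() can be
modeled as a small userspace C sketch: a pthread mutex stands in for
spin_lock_bh(&ic->i_rx_lock), a plain thread plus a 10 ms sleep stands in for the
delayed work queued on rds_aux_wq, and a wait-for-handler flag makes the
"tasklet" back off while the worker drains the backlog. This is only an
illustrative analogue under those assumptions; rx_ctx, RX_LIMIT, the pending
counter, and all functions below are inventions of the example, not RDS or
kernel symbols.

/*
 * Userspace analogue of the tasklet/worker hand-off in this patch.
 * Build: gcc -pthread sketch.c
 */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define RX_LIMIT 8			/* analogue of RDS_IB_RX_LIMIT: budget per pass */

struct rx_ctx {
	pthread_mutex_t rx_lock;	/* analogue of ic->i_rx_lock */
	int rx_wait_for_handler;	/* analogue of ic->i_rx_wait_for_handler */
	int pending;			/* fake completions still queued */
};

/* Reap at most RX_LIMIT completions; if more remain, defer to the worker. */
static void rx(struct rx_ctx *ic)
{
	int reaped = 0;

	while (ic->pending > 0 && reaped < RX_LIMIT) {
		ic->pending--;
		reaped++;
	}
	printf("reaped %d, %d still pending\n", reaped, ic->pending);
	if (ic->pending > 0)
		ic->rx_wait_for_handler = 1;	/* tell the "tasklet" to back off */
}

/* Analogue of rds_ib_tasklet_fn_recv(): skip reaping while the worker owns it. */
static void tasklet_fn_recv(struct rx_ctx *ic)
{
	pthread_mutex_lock(&ic->rx_lock);
	if (!ic->rx_wait_for_handler)
		rx(ic);
	pthread_mutex_unlock(&ic->rx_lock);
}

/* Analogue of rds_ib_rx_handler() running as delayed work. */
static void *rx_handler(void *arg)
{
	struct rx_ctx *ic = arg;
	int done;

	do {
		usleep(10 * 1000);		/* the 10 ms delay used in the patch */
		pthread_mutex_lock(&ic->rx_lock);
		if (ic->rx_wait_for_handler) {
			ic->rx_wait_for_handler = 0;
			rx(ic);
		}
		done = (ic->pending == 0);
		pthread_mutex_unlock(&ic->rx_lock);
	} while (!done);
	return NULL;
}

int main(void)
{
	struct rx_ctx ic = {
		.rx_lock = PTHREAD_MUTEX_INITIALIZER,
		.pending = 30,
	};
	pthread_t worker;

	pthread_create(&worker, NULL, rx_handler, &ic);
	/* "Interrupts" keep firing while the worker drains the backlog. */
	for (int i = 0; i < 5; i++) {
		tasklet_fn_recv(&ic);
		usleep(5 * 1000);
	}
	pthread_join(worker, NULL);
	return 0;
}

Run it and the output shows the first tasklet-style pass stopping at the
RX_LIMIT budget and the worker finishing the drain under the same lock, which
mirrors the back-off the patch enforces with i_rx_wait_for_handler in place of
the removed cond_resched() in the RX tasklet.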