From: Bang Nguyen
Date: Tue, 15 Apr 2014 17:37:04 +0000 (-0700)
Subject: RDS: Replace queue_work() by cond_resched() in the tasklet to breakup RX stream
X-Git-Tag: v4.1.12-92~293^2^2~35
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=a83382cb46ff4da3f1222f3374d65b70b0f3b335;p=users%2Fjedix%2Flinux-maple.git

RDS: Replace queue_work() by cond_resched() in the tasklet to breakup RX stream

Previously, once the RX tasklet had processed RDS_IB_RX_LIMIT completions it
stopped polling and handed the remaining work to a delayed work item (i_rx_w
on rds_aux_wq), serialized against the tasklet with i_rx_lock.  Drop that
machinery: poll_cq() now calls cond_resched() every RDS_IB_RX_LIMIT
completions, so the completion queue is drained in a single pass while still
giving other tasks a chance to run.  With the deferred handler gone,
connection shutdown waits for the receive ring to empty directly instead of
polling with a timeout and reaping leftover completions itself.

Orabug: 18801931

Signed-off-by: Bang Nguyen
Signed-off-by: Ajaykumar Hotchandani
(cherry picked from commit 74723dd3283d1bf2b352e5f71fe27340283716ed)
Signed-off-by: Jerry Snitselaar
(cherry picked from commit 37f5f84933725ad41644c0507b822a2d6b4e6cf0)
---

diff --git a/net/rds/ib.h b/net/rds/ib.h
index eda984eeaf1d..fb0552c5ba8d 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -235,9 +235,6 @@ struct rds_ib_connection {
 
         int i_scq_vector;
         int i_rcq_vector;
-        spinlock_t i_rx_lock;
-        struct rds_ib_rx_work i_rx_w;
-        unsigned int i_rx_wait_for_handler;
         unsigned int i_rx_poll_cq;
 };
 
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 7707306bf1fe..0ed5d7f4e26b 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -333,18 +333,11 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 
         while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
                 for (i = 0; i < nr; i++) {
-                        if (rx) {
-                                if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT)
-                                    == 0) {
-                                        rdsdebug("connection "
-                                                 "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
-                                                 "RX poll_cq processed %d\n",
-                                                 NIPQUAD(ic->conn->c_laddr),
-                                                 NIPQUAD(ic->conn->c_faddr),
-                                                 ic->conn->c_tos,
-                                                 ic->i_rx_poll_cq);
-                                }
-                        }
+
+                        if (rx &&
+                            (++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT) == 0)
+                                cond_resched();
+
                         wc = wcs + i;
                         rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                                  (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
@@ -355,10 +348,6 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
                         else
                                 rds_ib_recv_cqe_handler(ic, wc, ack_state);
                 }
-
-                if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
-                        break;
-
         }
 }
 
@@ -385,14 +374,9 @@ void rds_ib_tasklet_fn_send(unsigned long data)
                 rds_send_xmit(ic->conn);
 }
 
-/*
- * Note: rds_ib_rx(): don't call with irqs disabled.
- * It calls rds_send_drop_acked() which calls other
- * routines that reach into rds_rdma_free_op()
- * where irqs_disabled() warning is asserted!
- */
-static void rds_ib_rx(struct rds_ib_connection *ic)
+void rds_ib_tasklet_fn_recv(unsigned long data)
 {
+        struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
         struct rds_connection *conn = ic->conn;
         struct rds_ib_ack_state ack_state;
         struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
@@ -410,53 +394,22 @@ static void rds_ib_rx(struct rds_ib_connection *ic)
 
         if (ack_state.ack_next_valid)
                 rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
+
         if (ack_state.ack_recv_valid && ack_state.ack_recv > ic->i_ack_recv) {
                 rds_send_drop_acked(conn, ack_state.ack_recv, NULL);
                 ic->i_ack_recv = ack_state.ack_recv;
         }
+
         if (rds_conn_up(conn))
                 rds_ib_attempt_ack(ic);
 
         if (rds_ib_srq_enabled)
                 if ((atomic_read(&rds_ibdev->srq->s_num_posted) <
-                                        rds_ib_srq_hwm_refill) &&
-                        !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
-                        queue_delayed_work(rds_wq,
-                                           &rds_ibdev->srq->s_refill_w, 0);
-
-        if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
-                ic->i_rx_w.ic = ic;
-                queue_delayed_work(rds_aux_wq, &ic->i_rx_w.dlywork,
-                                   msecs_to_jiffies(10));
-                ic->i_rx_wait_for_handler = 1;
-        }
+                                rds_ib_srq_hwm_refill) &&
+                    !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
+                        queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0);
 }
 
-void rds_ib_tasklet_fn_recv(unsigned long data)
-{
-        struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
-
-        spin_lock(&ic->i_rx_lock);
-        if (ic->i_rx_wait_for_handler)
-                goto out;
-        rds_ib_rx(ic);
-out:
-        spin_unlock(&ic->i_rx_lock);
-}
-
-static void rds_ib_rx_handler(struct work_struct *workarg)
-{
-        struct delayed_work *delayedwork =
-                container_of(workarg, struct delayed_work, work);
-        struct rds_ib_rx_work *rirwork =
-                container_of(delayedwork, struct rds_ib_rx_work, dlywork);
-        struct rds_ib_connection *ic = rirwork->ic;
-
-        spin_lock(&ic->i_rx_lock);
-        ic->i_rx_wait_for_handler = 0;
-        rds_ib_rx(ic);
-        spin_unlock(&ic->i_rx_lock);
-}
 
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 {
@@ -1130,17 +1083,9 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                 }
 
                 /* quiesce tx and rx completion before tearing down */
-                while (!wait_event_timeout(rds_ib_ring_empty_wait,
-                                rds_ib_ring_empty(&ic->i_recv_ring) &&
-                                (atomic_read(&ic->i_signaled_sends) == 0),
-                                msecs_to_jiffies(5000))) {
-
-                        if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
-                                spin_lock(&ic->i_rx_lock);
-                                rds_ib_rx(ic);
-                                spin_unlock(&ic->i_rx_lock);
-                        }
-                }
+                wait_event(rds_ib_ring_empty_wait,
+                           rds_ib_ring_empty(&ic->i_recv_ring) &&
+                           (atomic_read(&ic->i_signaled_sends) == 0));
 
                 tasklet_kill(&ic->i_stasklet);
                 tasklet_kill(&ic->i_rtasklet);
@@ -1277,7 +1222,6 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
         spin_lock_init(&ic->i_ack_lock);
 #endif
         atomic_set(&ic->i_signaled_sends, 0);
-        spin_lock_init(&ic->i_rx_lock);
 
         /*
          * rds_ib_conn_shutdown() waits for these to be emptied so they
@@ -1292,7 +1236,6 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
         init_completion(&ic->i_last_wqe_complete);
 
         INIT_DELAYED_WORK(&ic->i_migrate_w.work, rds_ib_migrate);
-        INIT_DELAYED_WORK(&ic->i_rx_w.dlywork, rds_ib_rx_handler);
 
         spin_lock_irqsave(&ib_nodev_conns_lock, flags);
         list_add_tail(&ic->ib_node, &ib_nodev_conns);
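
The sketch below is a minimal userspace illustration of the loop shape the patch
gives poll_cq(); it is not part of the patch.  sched_yield() stands in for the
kernel's cond_resched(), and next_completion(), handle_completion() and
BATCH_LIMIT are hypothetical names, with BATCH_LIMIT playing the role of
RDS_IB_RX_LIMIT.

/*
 * Drain the whole queue in one pass, but yield the CPU every BATCH_LIMIT
 * items instead of breaking out and deferring the rest to a worker.
 */
#include <sched.h>
#include <stdio.h>

#define BATCH_LIMIT 10000

static int next_completion(void)
{
        static int remaining = 50000;   /* pretend 50k completions are queued */
        return remaining-- > 0;
}

static void handle_completion(unsigned int n)
{
        (void)n;                        /* real completion handling goes here */
}

int main(void)
{
        unsigned int handled = 0;

        while (next_completion()) {
                handle_completion(handled);

                /* Yield periodically so one busy queue cannot hog the CPU. */
                if ((++handled % BATCH_LIMIT) == 0)
                        sched_yield();
        }

        printf("drained %u completions\n", handled);
        return 0;
}

The design point is visible in the loop: the queue is drained in a single pass,
and fairness comes from the periodic yield rather than from handing leftovers to
a separate worker guarded by a lock, which is why i_rx_lock, i_rx_w and
i_rx_wait_for_handler can all be removed.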