From: Bang Nguyen
Date: Sat, 1 Mar 2014 04:56:21 +0000 (-0800)
Subject: RDS: Fix slowdown when doing massively parallel workload
X-Git-Tag: v4.1.12-92~293^2^2~41
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=78d0b348e65bfc4e422cbe68a8355de20347bd4f;p=users%2Fjedix%2Flinux-maple.git

RDS: Fix slowdown when doing massively parallel workload

In shutdown, reap Completion Queue Entries (CQEs) periodically while
waiting for the RX ring to quiesce.

Reject new sends while rds-info is pending, to avoid an RCU stall.

Break the RX stream into work units of 10k messages each and schedule
them sequentially, to avoid an RCU stall.

Orabug: 18362838

Signed-off-by: Bang Nguyen
Signed-off-by: Mukesh Kacker
Acked-by: Rama Nichanamatlu
Tested-by: Arvind Shukla

(cherry picked from commit dac771f1e55713b8a42bdffa059e1894e1ecdf17)
---

diff --git a/net/rds/connection.c b/net/rds/connection.c
index 3c3283c3dd90..e5337aef1c8d 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -437,6 +437,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
 			list = &conn->c_retrans;
 
 		spin_lock_irqsave(&conn->c_lock, flags);
+		conn->c_rdsinfo_pending = 1;
 
 		/* XXX too lazy to maintain counts.. */
 		list_for_each_entry(rm, list, m_conn_item) {
@@ -447,6 +448,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
 					  conn->c_faddr, 0);
 		}
 
+		conn->c_rdsinfo_pending = 0;
 		spin_unlock_irqrestore(&conn->c_lock, flags);
 	}
 }
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 823ec806b48f..419b51e5ce66 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -28,6 +28,8 @@
 
 #define RDS_IB_DEFAULT_NUM_ARPS		100
 
+#define RDS_IB_RX_LIMIT			10000
+
 #define RDS_IB_DEFAULT_TIMEOUT		16 /* 4.096 * 2 ^ 16 = 260 msec */
 
 #define RDS_IB_SUPPORTED_PROTOCOLS	0x00000003	/* minor versions supported */
@@ -142,6 +144,11 @@ struct rds_ib_migrate_work {
 	struct rds_ib_connection	*ic;
 };
 
+struct rds_ib_rx_work {
+	struct delayed_work		dlywork;
+	struct rds_ib_connection	*ic;
+};
+
 struct rds_ib_connection {
 
 	struct list_head	ib_node;
@@ -227,6 +234,11 @@ struct rds_ib_connection {
 
 	int			i_scq_vector;
 	int			i_rcq_vector;
+
+	spinlock_t		i_rx_lock;
+	struct rds_ib_rx_work	i_rx_w;
+	unsigned int		i_rx_wait_for_handler;
+	unsigned int		i_rx_poll_cq;
 };
 
 /* This assumes that atomic_t is at least 32 bits */
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 2c97d639acb0..f3892f459f54 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -323,8 +323,9 @@ static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
 }
 
 static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
-		    struct ib_wc *wcs,
-		    struct rds_ib_ack_state *ack_state)
+		    struct ib_wc *wcs,
+		    struct rds_ib_ack_state *ack_state,
+		    unsigned int rx)
 {
 	int nr;
 	int i;
@@ -332,6 +333,18 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 
 	while ((nr = ib_poll_cq(cq, RDS_WC_MAX, wcs)) > 0) {
 		for (i = 0; i < nr; i++) {
+			if (rx) {
+				if ((++ic->i_rx_poll_cq % RDS_IB_RX_LIMIT)
+				    == 0) {
+					rdsdebug("connection "
+						 "<%u.%u.%u.%u,%u.%u.%u.%u,%d> "
+						 "RX poll_cq processed %d\n",
+						 NIPQUAD(ic->conn->c_laddr),
+						 NIPQUAD(ic->conn->c_faddr),
+						 ic->conn->c_tos,
+						 ic->i_rx_poll_cq);
+				}
+			}
 			wc = wcs + i;
 			rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
 				 (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
@@ -342,6 +355,10 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
 			else
 				rds_ib_recv_cqe_handler(ic, wc, ack_state);
 		}
+
+		if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
+			break;
+
 	}
 }
 
@@ -354,9 +371,9 @@ void rds_ib_tasklet_fn_send(unsigned long data)
 
 	memset(&ack_state, 0, sizeof(ack_state));
 	rds_ib_stats_inc(s_ib_tasklet_call);
-	poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state);
+	poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
 	ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP);
-	poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state);
+	poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
 
 	if (rds_conn_up(conn) &&
 	    (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
@@ -364,9 +381,14 @@ void rds_ib_tasklet_fn_send(unsigned long data)
 	rds_send_xmit(ic->conn);
 }
 
-void rds_ib_tasklet_fn_recv(unsigned long data)
+/*
+ * Note: rds_ib_rx(): don't call with irqs disabled.
+ * It calls rds_send_drop_acked() which calls other
+ * routines that reach into rds_rdma_free_op()
+ * where irqs_disabled() warning is asserted!
+ */
+static void rds_ib_rx(struct rds_ib_connection *ic)
 {
-	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
 	struct rds_connection *conn = ic->conn;
 	struct rds_ib_ack_state ack_state;
 	struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
@@ -377,9 +399,10 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
 
 	memset(&ack_state, 0, sizeof(ack_state));
 
-	poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
+	ic->i_rx_poll_cq = 0;
+	poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
 	ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
-	poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state);
+	poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
 
 	if (ack_state.ack_next_valid)
 		rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
@@ -394,7 +417,41 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
 	if ((atomic_read(&rds_ibdev->srq->s_num_posted) <
 	     rds_ib_srq_hwm_refill) &&
 	    !test_and_set_bit(0, &rds_ibdev->srq->s_refill_gate))
-		queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0);
+		queue_delayed_work(rds_wq,
+				   &rds_ibdev->srq->s_refill_w, 0);
+
+	if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
+		ic->i_rx_w.ic = ic;
+		queue_delayed_work(rds_aux_wq, &ic->i_rx_w.dlywork,
+				   msecs_to_jiffies(10));
+		ic->i_rx_wait_for_handler = 1;
+	}
+}
+
+void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+
+	spin_lock(&ic->i_rx_lock);
+	if (ic->i_rx_wait_for_handler)
+		goto out;
+	rds_ib_rx(ic);
+out:
+	spin_unlock(&ic->i_rx_lock);
+}
+
+static void rds_ib_rx_handler(struct work_struct *workarg)
+{
+	struct delayed_work *delayedwork =
+		container_of(workarg, struct delayed_work, work);
+	struct rds_ib_rx_work *rirwork =
+		container_of(delayedwork, struct rds_ib_rx_work, dlywork);
+	struct rds_ib_connection *ic = rirwork->ic;
+
+	spin_lock(&ic->i_rx_lock);
+	ic->i_rx_wait_for_handler = 0;
+	rds_ib_rx(ic);
+	spin_unlock(&ic->i_rx_lock);
 }
 
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
@@ -1069,9 +1126,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 	}
 
 	/* quiesce tx and rx completion before tearing down */
-	wait_event(rds_ib_ring_empty_wait,
-		   rds_ib_ring_empty(&ic->i_recv_ring) &&
-		   (atomic_read(&ic->i_signaled_sends) == 0));
+	while (!wait_event_timeout(rds_ib_ring_empty_wait,
+				   rds_ib_ring_empty(&ic->i_recv_ring) &&
+				   (atomic_read(&ic->i_signaled_sends) == 0),
+				   msecs_to_jiffies(5000))) {
+
+		if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
+			spin_lock(&ic->i_rx_lock);
+			rds_ib_rx(ic);
+			spin_unlock(&ic->i_rx_lock);
+		}
+	}
+
 	tasklet_kill(&ic->i_stasklet);
 	tasklet_kill(&ic->i_rtasklet);
@@ -1207,6 +1273,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	spin_lock_init(&ic->i_ack_lock);
 #endif
 	atomic_set(&ic->i_signaled_sends, 0);
+	spin_lock_init(&ic->i_rx_lock);
 
 	/*
 	 * rds_ib_conn_shutdown() waits for these to be emptied so they
@@ -1221,6 +1288,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	init_completion(&ic->i_last_wqe_complete);
 
 	INIT_DELAYED_WORK(&ic->i_migrate_w.work, rds_ib_migrate);
+	INIT_DELAYED_WORK(&ic->i_rx_w.dlywork, rds_ib_rx_handler);
 
 	spin_lock_irqsave(&ib_nodev_conns_lock, flags);
 	list_add_tail(&ic->ib_node, &ib_nodev_conns);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 4c764a75a742..55ca1f7a1fb9 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -164,6 +164,8 @@ struct rds_connection {
 
 	struct rds_connection	*c_base_conn;
 	unsigned int		c_route_to_base;
+
+	unsigned int		c_rdsinfo_pending;
 };
 
 #define RDS_FLAG_CONG_BITMAP	0x01
diff --git a/net/rds/send.c b/net/rds/send.c
index 399957e369f4..4376e0a459b6 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -1308,6 +1308,11 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 		goto out;
 	}
 
+	if (conn->c_rdsinfo_pending) {
+		ret = -EAGAIN;
+		goto out;
+	}
+
 	while (!rds_send_queue_rm(rs, conn, rm, rs->rs_bound_port,
 				  dport, &queued)) {
 		rds_stats_inc(s_send_queue_full);
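
Aside (not part of the patch): the core idea behind RDS_IB_RX_LIMIT above is a
budgeted drain. Each tasklet invocation polls at most a fixed number of RX
completions and defers the remainder to rds_aux_wq, so one flooded connection
cannot hold the CPU long enough to trigger an RCU stall. The userspace sketch
below models only that budgeting pattern; the rx_queue type, BUDGET constant,
and drain_budgeted() helper are illustrative names, not code from the patch.

/*
 * Illustrative userspace model of a budgeted drain. The real patch bounds
 * ib_poll_cq() work with RDS_IB_RX_LIMIT and defers leftover completions to
 * a delayed work item on rds_aux_wq.
 */
#include <stdio.h>

#define BUDGET 10000	/* analogous to RDS_IB_RX_LIMIT */

struct rx_queue {
	unsigned long pending;	/* completions still to be processed */
};

/* Process at most BUDGET entries; return nonzero if work remains. */
static int drain_budgeted(struct rx_queue *q)
{
	unsigned long n = q->pending < BUDGET ? q->pending : BUDGET;

	q->pending -= n;	/* "handle" n completions */
	return q->pending != 0;	/* caller re-schedules if nonzero */
}

int main(void)
{
	struct rx_queue q = { .pending = 35000 };
	int passes = 0;

	/* Each loop pass stands in for one tasklet/work invocation. */
	while (drain_budgeted(&q))
		passes++;
	passes++;

	printf("drained in %d passes of at most %d entries\n",
	       passes, BUDGET);
	return 0;
}

Compiled and run as-is, this prints "drained in 4 passes of at most 10000
entries": the backlog is worked off in bounded slices rather than one
unbounded pass, which is the behavior the patch introduces for the RX CQ.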