From dfbdf6c626ee246a50b3250d9b2df7baddc85119 Mon Sep 17 00:00:00 2001 From: Wengang Wang Date: Thu, 11 Aug 2016 14:16:45 -0700 Subject: [PATCH] RDS: IB: skip rx/tx work when destroying connection Orabug: 24395789 quickref: 24314773 There is a race between rds connection destruction (rds_ib_conn_shutdown) path and the IRQ path (rds_ib_cq_comp_handler_recv). The IRQ path can schedule the takelet (i_rtasklet) again (to receive data) in between of the removal of the tasklet from list and the destruction of the connection in destuction path. When the tasklet run, it would then access on stale (destroied) data. A seen case is it was accessing ic->i_rcq which is set to NULL by destuction path. Fix: We add a flag to rds_ib_connection structure indicating the connection is under detroying when set. The flag is set after we reap on the receive CQ i_rcq and before start to destroy the CQ in rds_ib_conn_shutdown(). We also flush the rds_ib_rx running in rds_aux_wq worker thread before starting the destroy. So that all existing run of rds_ib_rx (in tasklet path and workder thread path) won't access distroyed receive CQ. And newly queued job (tasklet or worker) will exist on seeing the flag set before accessing the (maybe destroied) receive CQ. The flag is unset on new connection completions to allow access on re-created receive CQ. This patch also takes care of rds_ib_cq_comp_handler_send (the IRQ handler for send). And we do a final reap after destroying the QP to take care of the flushing errors to release resouce. Signed-off-by: Wengang Wang Reviewed-by: Shamir Rabinovitch Reviewed-by: Rama Nichanamatlu Acked-by: Santosh Shilimkar --- net/rds/ib.h | 1 + net/rds/ib_cm.c | 91 +++++++++++++++++++++++++++++++++++++++++-------- 2 files changed, 77 insertions(+), 15 deletions(-) diff --git a/net/rds/ib.h b/net/rds/ib.h index 0202c1c8f479..ffb1f3d7e8ef 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -246,6 +246,7 @@ struct rds_ib_connection { spinlock_t i_rx_lock; unsigned int i_rx_wait_for_handler; atomic_t i_worker_has_rx; + atomic_t i_destroying; }; /* This assumes that atomic_t is at least 32 bits */ diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 5a6c8a7b6832..cc99023a8c4b 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -267,6 +267,8 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even } } + atomic_set(&ic->i_destroying, 0); + if (conn->c_version < RDS_PROTOCOL_VERSION) { if (conn->c_version != RDS_PROTOCOL_COMPAT_VERSION) { printk(KERN_NOTICE "RDS/IB: Connection to" @@ -428,7 +430,7 @@ static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context) static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq, struct ib_wc *wcs, struct rds_ib_ack_state *ack_state, - unsigned int rx) + unsigned int rx, int no_break) { int nr; int i; @@ -458,28 +460,33 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq, rds_ib_recv_cqe_handler(ic, wc, ack_state); } - if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) - break; + if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) { + if (no_break) + ic->i_rx_poll_cq = 0; + else + break; + } } } -void rds_ib_tasklet_fn_send(unsigned long data) +static void rds_ib_tx(struct rds_ib_connection *ic, int skip_state) { - struct rds_ib_connection *ic = (struct rds_ib_connection *) data; struct rds_connection *conn = ic->conn; struct rds_ib_ack_state ack_state; memset(&ack_state, 0, sizeof(ack_state)); rds_ib_stats_inc(s_ib_tasklet_call); - /* if send cq has been destroyed, ignore incoming cq event */ - if (!ic->i_scq) + /* if connection is under destroying or is destroyed, + * ignore incoming cq event + */ + if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state) return; - poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0); + poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state); ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP); - poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0); + poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state); if (rds_conn_up(conn) && (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) || @@ -487,13 +494,30 @@ void rds_ib_tasklet_fn_send(unsigned long data) rds_send_xmit(ic->conn); } +static void rds_ib_final_tx(struct rds_ib_connection *ic) +{ + if (ic->i_scq) + rds_ib_tx(ic, 1); +} + +void rds_ib_tasklet_fn_send(unsigned long data) +{ + struct rds_ib_connection *ic = (struct rds_ib_connection *) data; + + rds_ib_tx(ic, 0); +} + /* * Note: rds_ib_rx(): don't call with irqs disabled. * It calls rds_send_drop_acked() which calls other * routines that reach into rds_rdma_free_op() * where irqs_disabled() warning is asserted! + * + * skip_state is set true only when/before we are going to destroy the CQs to + * have a final reap on the receive CQ. In this case, i_destroying is expected + * positive to avoid other threads poll the same CQ in parallel. */ -static void rds_ib_rx(struct rds_ib_connection *ic) +static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state) { struct rds_connection *conn = ic->conn; struct rds_ib_ack_state ack_state; @@ -503,12 +527,15 @@ static void rds_ib_rx(struct rds_ib_connection *ic) rds_ib_stats_inc(s_ib_tasklet_call); + if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state) + return; + memset(&ack_state, 0, sizeof(ack_state)); ic->i_rx_poll_cq = 0; - poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1); + poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state); ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED); - poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1); + poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state); if (ack_state.ack_next_valid) rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required); @@ -526,6 +553,7 @@ static void rds_ib_rx(struct rds_ib_connection *ic) queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0); + /* if skip_state is true, the following won't happen */ if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) { ic->i_rx_w.ic = ic; /* Delay 10 msecs until the RX worker starts reaping again */ @@ -535,6 +563,12 @@ static void rds_ib_rx(struct rds_ib_connection *ic) } } +static void rds_ib_final_rx(struct rds_ib_connection *ic) +{ + if (ic->i_rcq) + rds_ib_rx(ic, 1); +} + void rds_ib_tasklet_fn_recv(unsigned long data) { struct rds_ib_connection *ic = (struct rds_ib_connection *) data; @@ -542,7 +576,7 @@ void rds_ib_tasklet_fn_recv(unsigned long data) spin_lock_bh(&ic->i_rx_lock); if (ic->i_rx_wait_for_handler) goto out; - rds_ib_rx(ic); + rds_ib_rx(ic, 0); out: spin_unlock_bh(&ic->i_rx_lock); } @@ -555,7 +589,7 @@ static void rds_ib_rx_handler(struct work_struct *_work) spin_lock_bh(&ic->i_rx_lock); ic->i_rx_wait_for_handler = 0; - rds_ib_rx(ic); + rds_ib_rx(ic, 0); spin_unlock_bh(&ic->i_rx_lock); } @@ -1236,6 +1270,19 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) if (ic->i_cm_id) { struct ib_device *dev = ic->i_cm_id->device; + /* ic->i_cm_id being non-NULL is not enough to say that the + * connection was fully completed before this shutdown call. + * i_cm_id can be created when attemping to connect. There is + * the possibility that other resrouces like i_rcq and i_scq is + * not setup yet when the shutdown come (again). + * we use ic->i_destroying to tell if it was fully up. + */ + + if (atomic_read(&ic->i_destroying)) { + /* not fully up yet, only i_cm_id stands. */ + goto destroy; + } + rdsdebug("disconnecting cm %p\n", ic->i_cm_id); err = rdma_disconnect(ic->i_cm_id); if (err) { @@ -1263,18 +1310,32 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) /* Try to reap pending RX completions every 5 secs */ if (!rds_ib_ring_empty(&ic->i_recv_ring)) { spin_lock_bh(&ic->i_rx_lock); - rds_ib_rx(ic); + rds_ib_rx(ic, 0); spin_unlock_bh(&ic->i_rx_lock); } } + atomic_inc(&ic->i_destroying); + smp_mb__after_atomic_inc(); + tasklet_kill(&ic->i_stasklet); tasklet_kill(&ic->i_rtasklet); + flush_delayed_work(&ic->i_rx_w.work); /* first destroy the ib state that generates callbacks */ if (ic->i_cm_id->qp) rdma_destroy_qp(ic->i_cm_id); + /* Now there should be no threads (tasklet path or kworker path) + * really will access the CQs on seeing i_destroying positive. + * do the final reap on send/recv CQs. The final reaps take of + * flush errors of pending requests. + */ + rds_ib_final_tx(ic); + rds_ib_final_rx(ic); + +destroy: + if (ic->i_rcq) { if (ic->rds_ibdev) ibdev_put_vector(ic->rds_ibdev, ic->i_rcq_vector); -- 2.49.0