From: Santosh Shilimkar Date: Thu, 29 Sep 2016 15:55:49 +0000 (-0700) Subject: Revert "RDS: IB: skip rx/tx work when destroying connection" X-Git-Tag: v4.1.12-92~67^2~18 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=01c056c7568d18e960aa9891b86a00303dfd4968;p=users%2Fjedix%2Flinux-maple.git Revert "RDS: IB: skip rx/tx work when destroying connection" This reverts commit cb9e35124534e0803606ac58b03390d3f3d83dad. Orabug: 24746103 --- diff --git a/net/rds/ib.h b/net/rds/ib.h index ffb1f3d7e8ef9..0202c1c8f4794 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -246,7 +246,6 @@ struct rds_ib_connection { spinlock_t i_rx_lock; unsigned int i_rx_wait_for_handler; atomic_t i_worker_has_rx; - atomic_t i_destroying; }; /* This assumes that atomic_t is at least 32 bits */ diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 16ca391b2ba86..36f6a7d7f568c 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -267,8 +267,6 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even } } - atomic_set(&ic->i_destroying, 0); - if (conn->c_version < RDS_PROTOCOL_VERSION) { if (conn->c_version != RDS_PROTOCOL_COMPAT_VERSION) { printk(KERN_NOTICE "RDS/IB: Connection to" @@ -430,7 +428,7 @@ static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context) static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq, struct ib_wc *wcs, struct rds_ib_ack_state *ack_state, - unsigned int rx, int no_break) + unsigned int rx) { int nr; int i; @@ -460,33 +458,28 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq, rds_ib_recv_cqe_handler(ic, wc, ack_state); } - if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) { - if (no_break) - ic->i_rx_poll_cq = 0; - else - break; - } + if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) + break; } } -static void rds_ib_tx(struct rds_ib_connection *ic, int skip_state) +void rds_ib_tasklet_fn_send(unsigned long data) { + struct rds_ib_connection *ic = (struct rds_ib_connection *) data; struct rds_connection *conn = ic->conn; struct rds_ib_ack_state ack_state; memset(&ack_state, 0, sizeof(ack_state)); rds_ib_stats_inc(s_ib_tasklet_call); - /* if connection is under destroying or is destroyed, - * ignore incoming cq event - */ - if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state) + /* if send cq has been destroyed, ignore incoming cq event */ + if (!ic->i_scq) return; - poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state); + poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0); ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP); - poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state); + poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0); if (rds_conn_up(conn) && (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) || @@ -494,30 +487,13 @@ static void rds_ib_tx(struct rds_ib_connection *ic, int skip_state) rds_send_xmit(ic->conn); } -static void rds_ib_final_tx(struct rds_ib_connection *ic) -{ - if (ic->i_scq) - rds_ib_tx(ic, 1); -} - -void rds_ib_tasklet_fn_send(unsigned long data) -{ - struct rds_ib_connection *ic = (struct rds_ib_connection *) data; - - rds_ib_tx(ic, 0); -} - /* * Note: rds_ib_rx(): don't call with irqs disabled. * It calls rds_send_drop_acked() which calls other * routines that reach into rds_rdma_free_op() * where irqs_disabled() warning is asserted! - * - * skip_state is set true only when/before we are going to destroy the CQs to - * have a final reap on the receive CQ. In this case, i_destroying is expected - * positive to avoid other threads poll the same CQ in parallel. */ -static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state) +static void rds_ib_rx(struct rds_ib_connection *ic) { struct rds_connection *conn = ic->conn; struct rds_ib_ack_state ack_state; @@ -527,15 +503,12 @@ static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state) rds_ib_stats_inc(s_ib_tasklet_call); - if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state) - return; - memset(&ack_state, 0, sizeof(ack_state)); ic->i_rx_poll_cq = 0; - poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state); + poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1); ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED); - poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state); + poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1); if (ack_state.ack_next_valid) rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required); @@ -553,7 +526,6 @@ static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state) queue_delayed_work(rds_wq, &rds_ibdev->srq->s_refill_w, 0); - /* if skip_state is true, the following won't happen */ if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) { ic->i_rx_w.ic = ic; /* Delay 10 msecs until the RX worker starts reaping again */ @@ -563,12 +535,6 @@ static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state) } } -static void rds_ib_final_rx(struct rds_ib_connection *ic) -{ - if (ic->i_rcq) - rds_ib_rx(ic, 1); -} - void rds_ib_tasklet_fn_recv(unsigned long data) { struct rds_ib_connection *ic = (struct rds_ib_connection *) data; @@ -576,7 +542,7 @@ void rds_ib_tasklet_fn_recv(unsigned long data) spin_lock_bh(&ic->i_rx_lock); if (ic->i_rx_wait_for_handler) goto out; - rds_ib_rx(ic, 0); + rds_ib_rx(ic); out: spin_unlock_bh(&ic->i_rx_lock); } @@ -589,7 +555,7 @@ static void rds_ib_rx_handler(struct work_struct *_work) spin_lock_bh(&ic->i_rx_lock); ic->i_rx_wait_for_handler = 0; - rds_ib_rx(ic, 0); + rds_ib_rx(ic); spin_unlock_bh(&ic->i_rx_lock); } @@ -1270,19 +1236,6 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) if (ic->i_cm_id) { struct ib_device *dev = ic->i_cm_id->device; - /* ic->i_cm_id being non-NULL is not enough to say that the - * connection was fully completed before this shutdown call. - * i_cm_id can be created when attemping to connect. There is - * the possibility that other resrouces like i_rcq and i_scq is - * not setup yet when the shutdown come (again). - * we use ic->i_destroying to tell if it was fully up. - */ - - if (atomic_read(&ic->i_destroying)) { - /* not fully up yet, only i_cm_id stands. */ - goto destroy; - } - rdsdebug("disconnecting cm %p\n", ic->i_cm_id); err = rdma_disconnect(ic->i_cm_id); if (err) { @@ -1310,32 +1263,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) /* Try to reap pending RX completions every 5 secs */ if (!rds_ib_ring_empty(&ic->i_recv_ring)) { spin_lock_bh(&ic->i_rx_lock); - rds_ib_rx(ic, 0); + rds_ib_rx(ic); spin_unlock_bh(&ic->i_rx_lock); } } - atomic_inc(&ic->i_destroying); - smp_mb__after_atomic(); - tasklet_kill(&ic->i_stasklet); tasklet_kill(&ic->i_rtasklet); - flush_delayed_work(&ic->i_rx_w.work); /* first destroy the ib state that generates callbacks */ if (ic->i_cm_id->qp) rdma_destroy_qp(ic->i_cm_id); - /* Now there should be no threads (tasklet path or kworker path) - * really will access the CQs on seeing i_destroying positive. - * do the final reap on send/recv CQs. The final reaps take of - * flush errors of pending requests. - */ - rds_ib_final_tx(ic); - rds_ib_final_rx(ic); - -destroy: - if (ic->i_rcq) { if (ic->rds_ibdev) ibdev_put_vector(ic->rds_ibdev, ic->i_rcq_vector);