www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
Revert "RDS: IB: skip rx/tx work when destroying connection"
author Santosh Shilimkar <santosh.shilimkar@oracle.com>
Thu, 29 Sep 2016 15:55:49 +0000 (08:55 -0700)
committer Santosh Shilimkar <santosh.shilimkar@oracle.com>
Wed, 12 Oct 2016 16:48:36 +0000 (09:48 -0700)
This reverts commit cb9e35124534e0803606ac58b03390d3f3d83dad.

Orabug: 24746103
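For context: the reverted commit had guarded the send/recv completion paths with an i_destroying atomic flag, so the CQ handlers would skip work once shutdown had begun, and it added a "final reap" of each CQ after the QP was destroyed. The revert returns to quiescing via tasklet_kill() plus the send handler's NULL-CQ check. Below is a minimal userspace sketch of the two guard styles; the names (struct conn, cq_alive, handler_guarded) are illustrative stand-ins, not the kernel's identifiers, and C11 atomics stand in for the kernel's atomic_t.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for struct rds_ib_connection; only the
 * fields relevant to the teardown race are modeled. */
struct conn {
	atomic_int destroying;	/* guard added by the reverted commit */
	bool cq_alive;		/* stands in for ic->i_scq being non-NULL */
};

/* Completion handler as it looked with the reverted patch applied:
 * bail out as soon as shutdown has flagged the connection. */
static void handler_guarded(struct conn *c)
{
	if (atomic_load(&c->destroying))
		return;		/* skip rx/tx work during teardown */
	/* ... poll the CQ and process completions ... */
}

/* Handler as restored by the revert: rely on the CQ pointer check
 * instead of a dedicated flag. */
static void handler_reverted(struct conn *c)
{
	if (!c->cq_alive)
		return;		/* CQ already destroyed, nothing to poll */
	/* ... poll the CQ and process completions ... */
}

int main(void)
{
	struct conn c = { .cq_alive = true };

	atomic_store(&c.destroying, 1);	/* shutdown path flags teardown */
	handler_guarded(&c);		/* returns immediately */
	handler_reverted(&c);		/* would still poll: cq_alive is true */
	printf("sketch ran\n");
	return 0;
}

On the shutdown side, the reverted patch raised the flag (atomic_inc followed by a memory barrier) before tasklet_kill(), then ran one final reap of each CQ to drain flush errors of pending requests; the revert drops all of that, as the diff below shows.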

net/rds/ib.h
net/rds/ib_cm.c

diff --git a/net/rds/ib.h b/net/rds/ib.h
index ffb1f3d7e8ef9299229a5d8a61f90af0808e91bc..0202c1c8f4794ea3060d6db57a6fd9ff46109177 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -246,7 +246,6 @@ struct rds_ib_connection {
        spinlock_t              i_rx_lock;
        unsigned int            i_rx_wait_for_handler;
        atomic_t                i_worker_has_rx;
-       atomic_t                i_destroying;
 };
 
 /* This assumes that atomic_t is at least 32 bits */
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 16ca391b2ba865cbb0985c93eabcf18c81f464bf..36f6a7d7f568cfec9dd0340da4ae8eff0c554fa8 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -267,8 +267,6 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
                }
        }
 
-       atomic_set(&ic->i_destroying, 0);
-
        if (conn->c_version < RDS_PROTOCOL_VERSION) {
                if (conn->c_version != RDS_PROTOCOL_COMPAT_VERSION) {
                        printk(KERN_NOTICE "RDS/IB: Connection to"
@@ -430,7 +428,7 @@ static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
 static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
                    struct ib_wc *wcs,
                    struct rds_ib_ack_state *ack_state,
-                   unsigned int rx, int no_break)
+                   unsigned int rx)
 {
        int nr;
        int i;
@@ -460,33 +458,28 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
                                rds_ib_recv_cqe_handler(ic, wc, ack_state);
                }
 
-               if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
-                       if (no_break)
-                               ic->i_rx_poll_cq = 0;
-                       else
-                               break;
-               }
+               if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
+                       break;
 
        }
 }
 
-static void rds_ib_tx(struct rds_ib_connection *ic, int skip_state)
+void rds_ib_tasklet_fn_send(unsigned long data)
 {
+       struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
        struct rds_connection *conn = ic->conn;
        struct rds_ib_ack_state ack_state;
 
        memset(&ack_state, 0, sizeof(ack_state));
        rds_ib_stats_inc(s_ib_tasklet_call);
 
-       /* if connection is under destroying or is destroyed,
-        * ignore incoming cq event
-        */
-       if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state)
+       /* if send cq has been destroyed, ignore incoming cq event */
+       if (!ic->i_scq)
                return;
 
-       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state);
+       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
        ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP);
-       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state);
+       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
 
        if (rds_conn_up(conn) &&
           (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
@@ -494,30 +487,13 @@ static void rds_ib_tx(struct rds_ib_connection *ic, int skip_state)
                rds_send_xmit(ic->conn);
 }
 
-static void rds_ib_final_tx(struct rds_ib_connection *ic)
-{
-       if (ic->i_scq)
-               rds_ib_tx(ic, 1);
-}
-
-void rds_ib_tasklet_fn_send(unsigned long data)
-{
-       struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
-
-       rds_ib_tx(ic, 0);
-}
-
 /*
  * Note: rds_ib_rx(): don't call with irqs disabled.
  * It calls rds_send_drop_acked() which calls other
  * routines that reach into rds_rdma_free_op()
  * where irqs_disabled() warning is asserted!
- *
- * skip_state is set true only when/before we are going to destroy the CQs to
- * have a final reap on the receive CQ. In this case, i_destroying is expected
- * positive to avoid other threads poll the same CQ in parallel.
  */
-static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state)
+static void rds_ib_rx(struct rds_ib_connection *ic)
 {
        struct rds_connection *conn = ic->conn;
        struct rds_ib_ack_state ack_state;
@@ -527,15 +503,12 @@ static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state)
 
        rds_ib_stats_inc(s_ib_tasklet_call);
 
-       if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state)
-               return;
-
        memset(&ack_state, 0, sizeof(ack_state));
 
        ic->i_rx_poll_cq = 0;
-       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state);
+       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
        ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
-       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state);
+       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
 
        if (ack_state.ack_next_valid)
                rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
@@ -553,7 +526,6 @@ static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state)
                                queue_delayed_work(rds_wq,
                                        &rds_ibdev->srq->s_refill_w, 0);
 
-       /* if skip_state is true, the following won't happen */
        if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
                ic->i_rx_w.ic = ic;
                /* Delay 10 msecs until the RX worker starts reaping again */
@@ -563,12 +535,6 @@ static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state)
        }
 }
 
-static void rds_ib_final_rx(struct rds_ib_connection *ic)
-{
-       if (ic->i_rcq)
-               rds_ib_rx(ic, 1);
-}
-
 void rds_ib_tasklet_fn_recv(unsigned long data)
 {
        struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
@@ -576,7 +542,7 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
        spin_lock_bh(&ic->i_rx_lock);
        if (ic->i_rx_wait_for_handler)
                goto out;
-       rds_ib_rx(ic, 0);
+       rds_ib_rx(ic);
 out:
        spin_unlock_bh(&ic->i_rx_lock);
 }
@@ -589,7 +555,7 @@ static void rds_ib_rx_handler(struct work_struct *_work)
 
        spin_lock_bh(&ic->i_rx_lock);
        ic->i_rx_wait_for_handler = 0;
-       rds_ib_rx(ic, 0);
+       rds_ib_rx(ic);
        spin_unlock_bh(&ic->i_rx_lock);
 }
 
@@ -1270,19 +1236,6 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
        if (ic->i_cm_id) {
                struct ib_device *dev = ic->i_cm_id->device;
 
-               /* ic->i_cm_id being non-NULL is not enough to say that the
-                * connection was fully completed before this shutdown call.
-                * i_cm_id can be created when attemping to connect. There is
-                * the possibility that other resrouces like i_rcq and i_scq is
-                * not setup yet when the shutdown come (again).
-                * we use ic->i_destroying to tell if it was fully up.
-                */
-
-               if (atomic_read(&ic->i_destroying)) {
-                       /* not fully up yet, only i_cm_id stands. */
-                       goto destroy;
-               }
-
                rdsdebug("disconnecting cm %p\n", ic->i_cm_id);
                err = rdma_disconnect(ic->i_cm_id);
                if (err) {
@@ -1310,32 +1263,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                        /* Try to reap pending RX completions every 5 secs */
                        if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
                                spin_lock_bh(&ic->i_rx_lock);
-                               rds_ib_rx(ic, 0);
+                               rds_ib_rx(ic);
                                spin_unlock_bh(&ic->i_rx_lock);
                        }
                }
 
-               atomic_inc(&ic->i_destroying);
-               smp_mb__after_atomic();
-
                tasklet_kill(&ic->i_stasklet);
                tasklet_kill(&ic->i_rtasklet);
-               flush_delayed_work(&ic->i_rx_w.work);
 
                /* first destroy the ib state that generates callbacks */
                if (ic->i_cm_id->qp)
                        rdma_destroy_qp(ic->i_cm_id);
 
-               /* Now there should be no threads (tasklet path or kworker path)
-                * really will access the CQs on seeing i_destroying positive.
-                * do the final reap on send/recv CQs. The final reaps take of
-                * flush errors of pending requests.
-                */
-               rds_ib_final_tx(ic);
-               rds_ib_final_rx(ic);
-
-destroy:
-
                if (ic->i_rcq) {
                        if (ic->rds_ibdev)
                                ibdev_put_vector(ic->rds_ibdev, ic->i_rcq_vector);