]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
RDS: IB: skip rx/tx work when destroying connection
authorWengang Wang <wen.gang.wang@oracle.com>
Thu, 11 Aug 2016 21:16:45 +0000 (14:16 -0700)
committerSantosh Shilimkar <santosh.shilimkar@oracle.com>
Thu, 11 Aug 2016 22:11:13 +0000 (15:11 -0700)
Orabug: 24395789
quickref: 24314773

There is a race between rds connection destruction (rds_ib_conn_shutdown) path
and the IRQ path (rds_ib_cq_comp_handler_recv). The IRQ path can schedule the
takelet (i_rtasklet) again (to receive data) in between of the removal of the
tasklet from list and the destruction of the connection in destuction path. When
the tasklet run, it would then access on stale (destroied) data.
A seen case is it was accessing ic->i_rcq which is set to NULL by destuction
path.

Fix:
We add a flag to rds_ib_connection structure indicating the connection is
under detroying when set. The flag is set after we reap on the receive CQ i_rcq
and before start to destroy the CQ in rds_ib_conn_shutdown(). We also flush the
rds_ib_rx running in rds_aux_wq worker thread before starting the destroy. So
that all existing run of rds_ib_rx (in tasklet path and workder thread path)
won't access distroyed receive CQ. And newly queued job (tasklet or worker) will
exist on seeing the flag set before accessing the (maybe destroied) receive CQ.
The flag is unset on new connection completions to allow access on re-created
receive CQ. This patch also takes care of rds_ib_cq_comp_handler_send (the IRQ
handler for send). And we do a final reap after destroying the QP to take care
of the flushing errors to release resouce.

Signed-off-by: Wengang Wang <wen.gang.wang@oracle.com>
Reviewed-by: Shamir Rabinovitch <shamir.rabinovitch@oracle.com>
Reviewed-by: Rama Nichanamatlu <rama.nichanamatlu@oracle.com>
Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
net/rds/ib.h
net/rds/ib_cm.c

index 0202c1c8f4794ea3060d6db57a6fd9ff46109177..ffb1f3d7e8ef9299229a5d8a61f90af0808e91bc 100644 (file)
@@ -246,6 +246,7 @@ struct rds_ib_connection {
        spinlock_t              i_rx_lock;
        unsigned int            i_rx_wait_for_handler;
        atomic_t                i_worker_has_rx;
+       atomic_t                i_destroying;
 };
 
 /* This assumes that atomic_t is at least 32 bits */
index 5a6c8a7b68321d2911bd5173d2e95882cfb2bcc1..cc99023a8c4b9c5c06aa23d2b99b8ef3cd3a4c81 100644 (file)
@@ -267,6 +267,8 @@ void rds_ib_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_even
                }
        }
 
+       atomic_set(&ic->i_destroying, 0);
+
        if (conn->c_version < RDS_PROTOCOL_VERSION) {
                if (conn->c_version != RDS_PROTOCOL_COMPAT_VERSION) {
                        printk(KERN_NOTICE "RDS/IB: Connection to"
@@ -428,7 +430,7 @@ static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
 static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
                    struct ib_wc *wcs,
                    struct rds_ib_ack_state *ack_state,
-                   unsigned int rx)
+                   unsigned int rx, int no_break)
 {
        int nr;
        int i;
@@ -458,28 +460,33 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
                                rds_ib_recv_cqe_handler(ic, wc, ack_state);
                }
 
-               if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT)
-                       break;
+               if (rx && ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
+                       if (no_break)
+                               ic->i_rx_poll_cq = 0;
+                       else
+                               break;
+               }
 
        }
 }
 
-void rds_ib_tasklet_fn_send(unsigned long data)
+static void rds_ib_tx(struct rds_ib_connection *ic, int skip_state)
 {
-       struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
        struct rds_connection *conn = ic->conn;
        struct rds_ib_ack_state ack_state;
 
        memset(&ack_state, 0, sizeof(ack_state));
        rds_ib_stats_inc(s_ib_tasklet_call);
 
-       /* if send cq has been destroyed, ignore incoming cq event */
-       if (!ic->i_scq)
+       /* if connection is under destroying or is destroyed,
+        * ignore incoming cq event
+        */
+       if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state)
                return;
 
-       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
+       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state);
        ib_req_notify_cq(ic->i_scq, IB_CQ_NEXT_COMP);
-       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0);
+       poll_cq(ic, ic->i_scq, ic->i_send_wc, &ack_state, 0, skip_state);
 
        if (rds_conn_up(conn) &&
           (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
@@ -487,13 +494,30 @@ void rds_ib_tasklet_fn_send(unsigned long data)
                rds_send_xmit(ic->conn);
 }
 
+static void rds_ib_final_tx(struct rds_ib_connection *ic)
+{
+       if (ic->i_scq)
+               rds_ib_tx(ic, 1);
+}
+
+void rds_ib_tasklet_fn_send(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
+
+       rds_ib_tx(ic, 0);
+}
+
 /*
  * Note: rds_ib_rx(): don't call with irqs disabled.
  * It calls rds_send_drop_acked() which calls other
  * routines that reach into rds_rdma_free_op()
  * where irqs_disabled() warning is asserted!
+ *
+ * skip_state is set true only when/before we are going to destroy the CQs to
+ * have a final reap on the receive CQ. In this case, i_destroying is expected
+ * positive to avoid other threads poll the same CQ in parallel.
  */
-static void rds_ib_rx(struct rds_ib_connection *ic)
+static void rds_ib_rx(struct rds_ib_connection *ic, int skip_state)
 {
        struct rds_connection *conn = ic->conn;
        struct rds_ib_ack_state ack_state;
@@ -503,12 +527,15 @@ static void rds_ib_rx(struct rds_ib_connection *ic)
 
        rds_ib_stats_inc(s_ib_tasklet_call);
 
+       if (unlikely(atomic_read(&ic->i_destroying)) && !skip_state)
+               return;
+
        memset(&ack_state, 0, sizeof(ack_state));
 
        ic->i_rx_poll_cq = 0;
-       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
+       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state);
        ib_req_notify_cq(ic->i_rcq, IB_CQ_SOLICITED);
-       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1);
+       poll_cq(ic, ic->i_rcq, ic->i_recv_wc, &ack_state, 1, skip_state);
 
        if (ack_state.ack_next_valid)
                rds_ib_set_ack(ic, ack_state.ack_next, ack_state.ack_required);
@@ -526,6 +553,7 @@ static void rds_ib_rx(struct rds_ib_connection *ic)
                                queue_delayed_work(rds_wq,
                                        &rds_ibdev->srq->s_refill_w, 0);
 
+       /* if skip_state is true, the following won't happen */
        if (ic->i_rx_poll_cq >= RDS_IB_RX_LIMIT) {
                ic->i_rx_w.ic = ic;
                /* Delay 10 msecs until the RX worker starts reaping again */
@@ -535,6 +563,12 @@ static void rds_ib_rx(struct rds_ib_connection *ic)
        }
 }
 
+static void rds_ib_final_rx(struct rds_ib_connection *ic)
+{
+       if (ic->i_rcq)
+               rds_ib_rx(ic, 1);
+}
+
 void rds_ib_tasklet_fn_recv(unsigned long data)
 {
        struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
@@ -542,7 +576,7 @@ void rds_ib_tasklet_fn_recv(unsigned long data)
        spin_lock_bh(&ic->i_rx_lock);
        if (ic->i_rx_wait_for_handler)
                goto out;
-       rds_ib_rx(ic);
+       rds_ib_rx(ic, 0);
 out:
        spin_unlock_bh(&ic->i_rx_lock);
 }
@@ -555,7 +589,7 @@ static void rds_ib_rx_handler(struct work_struct *_work)
 
        spin_lock_bh(&ic->i_rx_lock);
        ic->i_rx_wait_for_handler = 0;
-       rds_ib_rx(ic);
+       rds_ib_rx(ic, 0);
        spin_unlock_bh(&ic->i_rx_lock);
 }
 
@@ -1236,6 +1270,19 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
        if (ic->i_cm_id) {
                struct ib_device *dev = ic->i_cm_id->device;
 
+               /* ic->i_cm_id being non-NULL is not enough to say that the
+                * connection was fully completed before this shutdown call.
+                * i_cm_id can be created when attemping to connect. There is
+                * the possibility that other resrouces like i_rcq and i_scq is
+                * not setup yet when the shutdown come (again).
+                * we use ic->i_destroying to tell if it was fully up.
+                */
+
+               if (atomic_read(&ic->i_destroying)) {
+                       /* not fully up yet, only i_cm_id stands. */
+                       goto destroy;
+               }
+
                rdsdebug("disconnecting cm %p\n", ic->i_cm_id);
                err = rdma_disconnect(ic->i_cm_id);
                if (err) {
@@ -1263,18 +1310,32 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                        /* Try to reap pending RX completions every 5 secs */
                        if (!rds_ib_ring_empty(&ic->i_recv_ring)) {
                                spin_lock_bh(&ic->i_rx_lock);
-                               rds_ib_rx(ic);
+                               rds_ib_rx(ic, 0);
                                spin_unlock_bh(&ic->i_rx_lock);
                        }
                }
 
+               atomic_inc(&ic->i_destroying);
+               smp_mb__after_atomic_inc();
+
                tasklet_kill(&ic->i_stasklet);
                tasklet_kill(&ic->i_rtasklet);
+               flush_delayed_work(&ic->i_rx_w.work);
 
                /* first destroy the ib state that generates callbacks */
                if (ic->i_cm_id->qp)
                        rdma_destroy_qp(ic->i_cm_id);
 
+               /* Now there should be no threads (tasklet path or kworker path)
+                * really will access the CQs on seeing i_destroying positive.
+                * do the final reap on send/recv CQs. The final reaps take of
+                * flush errors of pending requests.
+                */
+               rds_ib_final_tx(ic);
+               rds_ib_final_rx(ic);
+
+destroy:
+
                if (ic->i_rcq) {
                        if (ic->rds_ibdev)
                                ibdev_put_vector(ic->rds_ibdev, ic->i_rcq_vector);