]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
RDS: don't test ring_empty or ring_low without locks held
authorChris Mason <chris.mason@oracle.com>
Fri, 3 Feb 2012 16:09:36 +0000 (11:09 -0500)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 7 Jul 2015 23:41:33 +0000 (16:41 -0700)
The math in the ring functions can't be trusted unless you're either the only
person adding to the ring or the only person freeing from it.  If there are no
locks held at all you can end up hitting bogus assertions around the ring counters.

This chnages the rds_ib_recv_refill code and the recv tasklet code to make sure
proper locks are held before we use rds_ib_ring_empty or rds_ib_ring_low

Signed-off-by: Chris Mason <chris.mason@oracle.com>
Signed-off-by: Bang Nguyen <bang.nguyen@oracle.com>
net/rds/ib_recv.c

index d0ab1222634ffc12d83dd0458f13ae0dd5f2d366..a15da0ddf4dc944d57d240d384230497763d7e5c 100644 (file)
@@ -382,6 +382,8 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
        int ret = 0;
        int can_wait = gfp & __GFP_WAIT;
        int must_wake = 0;      
+       int ring_low = 0;
+       int ring_empty = 0;
        u32 pos;
 
        /*
@@ -392,6 +394,22 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
        if (!acquire_refill(conn))
                return;
 
+       ring_low = rds_ib_ring_low(&ic->i_recv_ring);
+       ring_empty = rds_ib_ring_empty(&ic->i_recv_ring);
+
+       /* If we ever end up with a really empty receive ring, we're
+        * in deep trouble, as the sender will definitely see RNR
+        * timeouts. */
+       if (ring_empty)
+               rds_ib_stats_inc(s_ib_rx_ring_empty);
+
+       /*
+        * if we're called from the tasklet, can_wait will be zero.  We only
+        * want to refill if we're getting low in this case
+        */
+       if (!ring_low && !can_wait)
+               goto release_out;
+
        while ((prefill || rds_conn_up(conn))
                        && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
                if (pos >= ic->i_recv_ring.w_nr) {
@@ -427,6 +445,10 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
                }
        }
 
+       /* read ring_low and ring_empty before we drop our lock */
+       ring_low = rds_ib_ring_low(&ic->i_recv_ring);
+       ring_empty = rds_ib_ring_empty(&ic->i_recv_ring);
+
        /* We're doing flow control - update the window. */
        if (ic->i_flowctl && posted)
                rds_ib_advertise_credits(conn, posted);
@@ -434,6 +456,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
        if (ret)
                rds_ib_ring_unalloc(&ic->i_recv_ring, 1);
 
+release_out:
        release_refill(conn);
 
        /* if we're called from the softirq handler, we'll be GFP_NOWAIT.
@@ -447,9 +470,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
         * if we should requeue.
         */
        if (rds_conn_up(conn) &&
-          (must_wake ||
-          (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
-          rds_ib_ring_empty(&ic->i_recv_ring))) {
+          (must_wake || (can_wait && ring_low) || ring_empty)) {
                queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
        }
        if (can_wait)
@@ -1020,14 +1041,7 @@ void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
        }
        rds_ib_ring_free(&ic->i_recv_ring, 1);
 
-       /* If we ever end up with a really empty receive ring, we're
-        * in deep trouble, as the sender will definitely see RNR
-        * timeouts. */
-       if (rds_ib_ring_empty(&ic->i_recv_ring))
-               rds_ib_stats_inc(s_ib_rx_ring_empty);
-
-       if (rds_ib_ring_low(&ic->i_recv_ring))
-               rds_ib_recv_refill(conn, 0, GFP_NOWAIT);
+       rds_ib_recv_refill(conn, 0, GFP_NOWAIT);
 }
 
 int rds_ib_recv(struct rds_connection *conn)