From: Chris Mason Date: Fri, 3 Feb 2012 16:09:36 +0000 (-0500) Subject: RDS: don't test ring_empty or ring_low without locks held X-Git-Tag: v4.1.12-92~319^2^2~2^2~32 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=00956afeff06fe8961d7359774cda006ca54ad94;p=users%2Fjedix%2Flinux-maple.git RDS: don't test ring_empty or ring_low without locks held The math in the ring functions can't be trusted unless you're either the only person adding to the ring or the only person freeing from it. If there are no locks held at all you can end up hitting bogus assertions around the ring counters. This chnages the rds_ib_recv_refill code and the recv tasklet code to make sure proper locks are held before we use rds_ib_ring_empty or rds_ib_ring_low Signed-off-by: Chris Mason Signed-off-by: Bang Nguyen --- diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c index d0ab1222634f..a15da0ddf4dc 100644 --- a/net/rds/ib_recv.c +++ b/net/rds/ib_recv.c @@ -382,6 +382,8 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp) int ret = 0; int can_wait = gfp & __GFP_WAIT; int must_wake = 0; + int ring_low = 0; + int ring_empty = 0; u32 pos; /* @@ -392,6 +394,22 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp) if (!acquire_refill(conn)) return; + ring_low = rds_ib_ring_low(&ic->i_recv_ring); + ring_empty = rds_ib_ring_empty(&ic->i_recv_ring); + + /* If we ever end up with a really empty receive ring, we're + * in deep trouble, as the sender will definitely see RNR + * timeouts. */ + if (ring_empty) + rds_ib_stats_inc(s_ib_rx_ring_empty); + + /* + * if we're called from the tasklet, can_wait will be zero. We only + * want to refill if we're getting low in this case + */ + if (!ring_low && !can_wait) + goto release_out; + while ((prefill || rds_conn_up(conn)) && rds_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) { if (pos >= ic->i_recv_ring.w_nr) { @@ -427,6 +445,10 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp) } } + /* read ring_low and ring_empty before we drop our lock */ + ring_low = rds_ib_ring_low(&ic->i_recv_ring); + ring_empty = rds_ib_ring_empty(&ic->i_recv_ring); + /* We're doing flow control - update the window. */ if (ic->i_flowctl && posted) rds_ib_advertise_credits(conn, posted); @@ -434,6 +456,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp) if (ret) rds_ib_ring_unalloc(&ic->i_recv_ring, 1); +release_out: release_refill(conn); /* if we're called from the softirq handler, we'll be GFP_NOWAIT. @@ -447,9 +470,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp) * if we should requeue. */ if (rds_conn_up(conn) && - (must_wake || - (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) || - rds_ib_ring_empty(&ic->i_recv_ring))) { + (must_wake || (can_wait && ring_low) || ring_empty)) { queue_delayed_work(rds_wq, &conn->c_recv_w, 1); } if (can_wait) @@ -1020,14 +1041,7 @@ void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, } rds_ib_ring_free(&ic->i_recv_ring, 1); - /* If we ever end up with a really empty receive ring, we're - * in deep trouble, as the sender will definitely see RNR - * timeouts. */ - if (rds_ib_ring_empty(&ic->i_recv_ring)) - rds_ib_stats_inc(s_ib_rx_ring_empty); - - if (rds_ib_ring_low(&ic->i_recv_ring)) - rds_ib_recv_refill(conn, 0, GFP_NOWAIT); + rds_ib_recv_refill(conn, 0, GFP_NOWAIT); } int rds_ib_recv(struct rds_connection *conn)