]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
RDS/IB: track signaled sends
authorZach Brown <zach.brown@oracle.com>
Wed, 14 Jul 2010 02:23:32 +0000 (19:23 -0700)
committerMukesh Kacker <mukesh.kacker@oracle.com>
Tue, 7 Jul 2015 23:41:24 +0000 (16:41 -0700)
RDS/IB: track signaled sends

We're seeing bugs today where IB connection shutdown clears the send
ring while the tasklet is processing completed sends.  Implementation
details cause this to dereference a null pointer.  Shutdown needs to
wait for send completion to stop before tearing down the connection.  We
can't simply wait for the ring to empty because it may contain
unsignaled sends that will never be processed.

This patch tracks the number of signaled sends that we've posted and
waits for them to complete.  It also makes sure that the tasklet has
finished executing.

Signed-off-by: Zach Brown <zach.brown@oracle.com>
Signed-off-by: Chris Mason <chris.mason@oracle.com>
Signed-off-by: Bang Nguyen <bang.nguyen@oracle.com>
net/rds/ib.h
net/rds/ib_cm.c
net/rds/ib_send.c

index 92119da9fcf6cb71d8d49e87955fe819c52a54da..5cf8b6188e5cefe807b0e1d7c9af32254027ba05 100644 (file)
@@ -124,6 +124,7 @@ struct rds_ib_connection {
        struct rds_header       *i_send_hdrs;
        u64                     i_send_hdrs_dma;
        struct rds_ib_send_work *i_sends;
+       atomic_t                i_signaled_sends;
 
        /* rx */
        struct tasklet_struct   i_recv_tasklet;
index 6325f124be5f6236823161ec36cf457fc67fe981..4e30ee595f30207ccaa8d7a9c38db742bdcc5db0 100644 (file)
@@ -644,12 +644,11 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                                ic->i_cm_id, err);
                }
 
-               /*
-                * Don't wait for the send ring to be empty -- there may be completed
-                * non-signaled entries sitting on there. We unmap these below.
-                */
+               /* quiesce tx and rx completion before tearing down */
                wait_event(rds_ib_ring_empty_wait,
-                       rds_ib_ring_empty(&ic->i_recv_ring));
+                          rds_ib_ring_empty(&ic->i_recv_ring) &&
+                          (atomic_read(&ic->i_signaled_sends) == 0));
+               tasklet_kill(&ic->i_tasklet);
 
                if (ic->i_send_hdrs)
                        ib_dma_free_coherent(dev,
@@ -755,6 +754,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 #ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&ic->i_ack_lock);
 #endif
+       atomic_set(&ic->i_signaled_sends, 0);
 
        /*
         * rds_ib_conn_shutdown() waits for these to be emptied so they
index e86c0c85b82488c5a2919e03b0b85709b514751f..b152c018713b55e143c931ecdb8a669fdcb0e739 100644 (file)
@@ -219,6 +219,14 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
        }
 }
 
+static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
+{
+       if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
+           waitqueue_active(&rds_ib_ring_empty_wait))
+               wake_up(&rds_ib_ring_empty_wait);
+       BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
+}
+
 /*
  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
  * operations performed in the send path.  As the sender allocs and potentially
@@ -232,6 +240,7 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
        u32 completed;
        u32 oldest;
        u32 i = 0;
+       int nr_sig = 0;
 
        rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                 (unsigned long long)wc->wr_id, wc->status, wc->byte_len,
@@ -255,6 +264,8 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
                struct rds_message *rm;
 
                send = &ic->i_sends[oldest];
+               if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                       nr_sig++;
 
                rm = rds_ib_send_unmap_op(ic, send, wc->status);
 
@@ -275,6 +286,7 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
        }
 
        rds_ib_ring_free(&ic->i_send_ring, completed);
+       rds_ib_sub_signaled(ic, nr_sig);
 
        clear_bit(RDS_LL_SEND_FULL, &conn->c_flags);
 
@@ -430,9 +442,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
-static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
-                                             struct rds_ib_send_work *send,
-                                             bool notify)
+static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+                                            struct rds_ib_send_work *send,
+                                            bool notify)
 {
        /*
         * We want to delay signaling completions just enough to get
@@ -442,7 +454,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
        if (ic->i_unsignaled_wrs-- == 0 || notify) {
                ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
                send->s_wr.send_flags |= IB_SEND_SIGNALED;
+               return 1;
        }
+       return 0;
 }
 
 /*
@@ -478,6 +492,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        int bytes_sent = 0;
        int ret;
        int flow_controlled = 0;
+       int nr_sig = 0;
 
        BUG_ON(off % RDS_FRAG_SIZE);
        BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -635,6 +650,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
                        send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 
+               if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                       nr_sig++;
+
                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 
@@ -679,6 +697,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
        if (ic->i_flowctl && i < credit_alloc)
                rds_ib_send_add_credits(conn, credit_alloc - i);
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        /* XXX need to worry about failed_wr and partial sends. */
        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -689,6 +710,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                if (prev->s_op) {
                        ic->i_data_op = prev->s_op;
                        prev->s_op = NULL;
@@ -718,6 +740,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
        u32 pos;
        u32 work_alloc;
        int ret;
+       int nr_sig = 0;
 
        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
@@ -742,7 +765,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
                send->s_wr.wr.atomic.compare_add = op->op_swap_add;
                send->s_wr.wr.atomic.swap = 0;
        }
-       rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+       nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
        send->s_wr.num_sge = 1;
        send->s_wr.next = NULL;
        send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -768,6 +791,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
        rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
                 send->s_sge[0].addr, send->s_sge[0].length);
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        failed_wr = &send->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
        rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -777,6 +803,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
                printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %u.%u.%u.%u "
                       "returned %d\n", NIPQUAD(conn->c_faddr), ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }
 
@@ -807,6 +834,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
        int sent;
        int ret;
        int num_sge;
+       int nr_sig = 0;
 
        /* map the op the first time we see it */
        if (!op->op_mapped) {
@@ -849,7 +877,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                send->s_queued = jiffies;
                send->s_op = NULL;
 
-               rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+               nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
                send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
                send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -900,6 +928,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                work_alloc = i;
        }
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -909,6 +940,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
                printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }