From 29ad8f00530affa693d538007c02198a1253e4ee Mon Sep 17 00:00:00 2001 From: Zach Brown Date: Tue, 13 Jul 2010 19:23:32 -0700 Subject: [PATCH] RDS/IB: track signaled sends RDS/IB: track signaled sends We're seeing bugs today where IB connection shutdown clears the send ring while the tasklet is processing completed sends. Implementation details cause this to dereference a null pointer. Shutdown needs to wait for send completion to stop before tearing down the connection. We can't simply wait for the ring to empty because it may contain unsignaled sends that will never be processed. This patch tracks the number of signaled sends that we've posted and waits for them to complete. It also makes sure that the tasklet has finished executing. Signed-off-by: Zach Brown Signed-off-by: Chris Mason Signed-off-by: Bang Nguyen --- net/rds/ib.h | 1 + net/rds/ib_cm.c | 10 +++++----- net/rds/ib_send.c | 42 +++++++++++++++++++++++++++++++++++++----- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/net/rds/ib.h b/net/rds/ib.h index 92119da9fcf6c..5cf8b6188e5ce 100644 --- a/net/rds/ib.h +++ b/net/rds/ib.h @@ -124,6 +124,7 @@ struct rds_ib_connection { struct rds_header *i_send_hdrs; u64 i_send_hdrs_dma; struct rds_ib_send_work *i_sends; + atomic_t i_signaled_sends; /* rx */ struct tasklet_struct i_recv_tasklet; diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c index 6325f124be5f6..4e30ee595f302 100644 --- a/net/rds/ib_cm.c +++ b/net/rds/ib_cm.c @@ -644,12 +644,11 @@ void rds_ib_conn_shutdown(struct rds_connection *conn) ic->i_cm_id, err); } - /* - * Don't wait for the send ring to be empty -- there may be completed - * non-signaled entries sitting on there. We unmap these below. - */ + /* quiesce tx and rx completion before tearing down */ wait_event(rds_ib_ring_empty_wait, - rds_ib_ring_empty(&ic->i_recv_ring)); + rds_ib_ring_empty(&ic->i_recv_ring) && + (atomic_read(&ic->i_signaled_sends) == 0)); + tasklet_kill(&ic->i_tasklet); if (ic->i_send_hdrs) ib_dma_free_coherent(dev, @@ -755,6 +754,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp) #ifndef KERNEL_HAS_ATOMIC64 spin_lock_init(&ic->i_ack_lock); #endif + atomic_set(&ic->i_signaled_sends, 0); /* * rds_ib_conn_shutdown() waits for these to be emptied so they diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c index e86c0c85b8248..b152c018713b5 100644 --- a/net/rds/ib_send.c +++ b/net/rds/ib_send.c @@ -219,6 +219,14 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic) } } +static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr) +{ + if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) && + waitqueue_active(&rds_ib_ring_empty_wait)) + wake_up(&rds_ib_ring_empty_wait); + BUG_ON(atomic_read(&ic->i_signaled_sends) < 0); +} + /* * The _oldest/_free ring operations here race cleanly with the alloc/unalloc * operations performed in the send path. As the sender allocs and potentially @@ -232,6 +240,7 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc) u32 completed; u32 oldest; u32 i = 0; + int nr_sig = 0; rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n", (unsigned long long)wc->wr_id, wc->status, wc->byte_len, @@ -255,6 +264,8 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc) struct rds_message *rm; send = &ic->i_sends[oldest]; + if (send->s_wr.send_flags & IB_SEND_SIGNALED) + nr_sig++; rm = rds_ib_send_unmap_op(ic, send, wc->status); @@ -275,6 +286,7 @@ void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc) } rds_ib_ring_free(&ic->i_send_ring, completed); + rds_ib_sub_signaled(ic, nr_sig); clear_bit(RDS_LL_SEND_FULL, &conn->c_flags); @@ -430,9 +442,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted) set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); } -static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, - struct rds_ib_send_work *send, - bool notify) +static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, + struct rds_ib_send_work *send, + bool notify) { /* * We want to delay signaling completions just enough to get @@ -442,7 +454,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic, if (ic->i_unsignaled_wrs-- == 0 || notify) { ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs; send->s_wr.send_flags |= IB_SEND_SIGNALED; + return 1; } + return 0; } /* @@ -478,6 +492,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, int bytes_sent = 0; int ret; int flow_controlled = 0; + int nr_sig = 0; BUG_ON(off % RDS_FRAG_SIZE); BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header)); @@ -635,6 +650,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, if (ic->i_flowctl && flow_controlled && i == (work_alloc-1)) send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; + if (send->s_wr.send_flags & IB_SEND_SIGNALED) + nr_sig++; + rdsdebug("send %p wr %p num_sge %u next %p\n", send, &send->s_wr, send->s_wr.num_sge, send->s_wr.next); @@ -679,6 +697,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, if (ic->i_flowctl && i < credit_alloc) rds_ib_send_add_credits(conn, credit_alloc - i); + if (nr_sig) + atomic_add(nr_sig, &ic->i_signaled_sends); + /* XXX need to worry about failed_wr and partial sends. */ failed_wr = &first->s_wr; ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); @@ -689,6 +710,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm, printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 " "returned %d\n", &conn->c_faddr, ret); rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); + rds_ib_sub_signaled(ic, nr_sig); if (prev->s_op) { ic->i_data_op = prev->s_op; prev->s_op = NULL; @@ -718,6 +740,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) u32 pos; u32 work_alloc; int ret; + int nr_sig = 0; rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); @@ -742,7 +765,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) send->s_wr.wr.atomic.compare_add = op->op_swap_add; send->s_wr.wr.atomic.swap = 0; } - rds_ib_set_wr_signal_state(ic, send, op->op_notify); + nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify); send->s_wr.num_sge = 1; send->s_wr.next = NULL; send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; @@ -768,6 +791,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr, send->s_sge[0].addr, send->s_sge[0].length); + if (nr_sig) + atomic_add(nr_sig, &ic->i_signaled_sends); + failed_wr = &send->s_wr; ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr); rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic, @@ -777,6 +803,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %u.%u.%u.%u " "returned %d\n", NIPQUAD(conn->c_faddr), ret); rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); + rds_ib_sub_signaled(ic, nr_sig); goto out; } @@ -807,6 +834,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) int sent; int ret; int num_sge; + int nr_sig = 0; /* map the op the first time we see it */ if (!op->op_mapped) { @@ -849,7 +877,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) send->s_queued = jiffies; send->s_op = NULL; - rds_ib_set_wr_signal_state(ic, send, op->op_notify); + nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify); send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; send->s_wr.wr.rdma.remote_addr = remote_addr; @@ -900,6 +928,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) work_alloc = i; } + if (nr_sig) + atomic_add(nr_sig, &ic->i_signaled_sends); + failed_wr = &first->s_wr; ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr); rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic, @@ -909,6 +940,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op) printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 " "returned %d\n", &conn->c_faddr, ret); rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); + rds_ib_sub_signaled(ic, nr_sig); goto out; } -- 2.50.1