}
 
                /*
-                * Don't wait for the send ring to be empty -- there may be completed
-                * non-signaled entries sitting on there. We unmap these below.
+                * We want to wait for tx and rx completion to finish
+                * before we tear down the connection, but we have to be
+                * careful not to get stuck waiting on a send ring that
+                * only has unsignaled sends in it.  We've shutdown new
+                * sends before getting here so by waiting for signaled
+                * sends to complete we're ensured that there will be no
+                * more tx processing.
                 */
                wait_event(rds_ib_ring_empty_wait,
-                       rds_ib_ring_empty(&ic->i_recv_ring));
+                          rds_ib_ring_empty(&ic->i_recv_ring) &&
+                          (atomic_read(&ic->i_signaled_sends) == 0));
+               tasklet_kill(&ic->i_recv_tasklet);
 
                if (ic->i_send_hdrs)
                        ib_dma_free_coherent(dev,
 #ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&ic->i_ack_lock);
 #endif
+       atomic_set(&ic->i_signaled_sends, 0);
 
        /*
         * rds_ib_conn_shutdown() waits for these to be emptied so they
 
        }
 }
 
+/*
+ * The only fast path caller always has a non-zero nr, so we don't
+ * bother testing nr before performing the atomic sub.
+ */
+static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
+{
+       if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
+           waitqueue_active(&rds_ib_ring_empty_wait))
+               wake_up(&rds_ib_ring_empty_wait);
+       BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
+}
+
 /*
  * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
  * operations performed in the send path.  As the sender allocs and potentially
        u32 oldest;
        u32 i = 0;
        int ret;
+       int nr_sig = 0;
 
        rdsdebug("cq %p conn %p\n", cq, conn);
        rds_ib_stats_inc(s_ib_tx_cq_call);
 
                for (i = 0; i < completed; i++) {
                        send = &ic->i_sends[oldest];
+                       if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                               nr_sig++;
 
                        rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
                }
 
                rds_ib_ring_free(&ic->i_send_ring, completed);
+               rds_ib_sub_signaled(ic, nr_sig);
+               nr_sig = 0;
 
                if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
                    test_bit(0, &conn->c_map_queued))
                set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
-static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
-                                             struct rds_ib_send_work *send,
-                                             bool notify)
+static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+                                            struct rds_ib_send_work *send,
+                                            bool notify)
 {
        /*
         * We want to delay signaling completions just enough to get
        if (ic->i_unsignaled_wrs-- == 0 || notify) {
                ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
                send->s_wr.send_flags |= IB_SEND_SIGNALED;
+               return 1;
        }
+       return 0;
 }
 
 /*
        int bytes_sent = 0;
        int ret;
        int flow_controlled = 0;
+       int nr_sig = 0;
 
        BUG_ON(off % RDS_FRAG_SIZE);
        BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
                if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
                        send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
 
+               if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                       nr_sig++;
+
                rdsdebug("send %p wr %p num_sge %u next %p\n", send,
                         &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
 
        if (ic->i_flowctl && i < credit_alloc)
                rds_ib_send_add_credits(conn, credit_alloc - i);
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        /* XXX need to worry about failed_wr and partial sends. */
        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
                printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                if (prev->s_op) {
                        ic->i_data_op = prev->s_op;
                        prev->s_op = NULL;
        u32 pos;
        u32 work_alloc;
        int ret;
+       int nr_sig = 0;
 
        rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
                send->s_wr.wr.atomic.compare_add = op->op_swap_add;
                send->s_wr.wr.atomic.swap = 0;
        }
-       rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+       nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
        send->s_wr.num_sge = 1;
        send->s_wr.next = NULL;
        send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
        rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
                 send->s_sge[0].addr, send->s_sge[0].length);
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        failed_wr = &send->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
        rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
                printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }
 
        int sent;
        int ret;
        int num_sge;
+       int nr_sig = 0;
 
        /* map the op the first time we see it */
        if (!op->op_mapped) {
                send->s_queued = jiffies;
                send->s_op = NULL;
 
-               rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+               nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 
                send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
                send->s_wr.wr.rdma.remote_addr = remote_addr;
                work_alloc = i;
        }
 
+       if (nr_sig)
+               atomic_add(nr_sig, &ic->i_signaled_sends);
+
        failed_wr = &first->s_wr;
        ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
        rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
                printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
                       "returned %d\n", &conn->c_faddr, ret);
                rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_sub_signaled(ic, nr_sig);
                goto out;
        }