spin_lock_init(&conn->c_lock);
        conn->c_next_tx_seq = 1;
 
-       spin_lock_init(&conn->c_send_lock);
-       atomic_set(&conn->c_send_generation, 1);
-       atomic_set(&conn->c_senders, 0);
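+       /* rds_conn_shutdown() waits on this for RDS_IN_XMIT to clear */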
+       init_waitqueue_head(&conn->c_waitq);
        INIT_LIST_HEAD(&conn->c_send_queue);
        INIT_LIST_HEAD(&conn->c_retrans);
 
                }
                mutex_unlock(&conn->c_cm_lock);
 
-               /* verify everybody's out of rds_send_xmit() */
-               spin_lock_irq(&conn->c_send_lock);
-               spin_unlock_irq(&conn->c_send_lock);
-
-               while(atomic_read(&conn->c_senders)) {
-                       schedule_timeout(1);
-                       spin_lock_irq(&conn->c_send_lock);
-                       spin_unlock_irq(&conn->c_send_lock);
-               }
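+               /* let any rds_send_xmit() caller finish before tearing down */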
+               wait_event(conn->c_waitq,
+                          !test_bit(RDS_IN_XMIT, &conn->c_flags));
 
                conn->c_trans->conn_shutdown(conn);
                rds_conn_reset(conn);
                sizeof(cinfo->transport));
        cinfo->flags = 0;
 
-       rds_conn_info_set(cinfo->flags,
-                         spin_is_locked(&conn->c_send_lock), SENDING);
+       rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags),
+                         SENDING);
        /* XXX Future: return the state rather than these funky bits */
        rds_conn_info_set(cinfo->flags,
                          atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
 
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state.  Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
        struct rds_message *rm, *tmp;
        unsigned long flags;
 
-       spin_lock_irqsave(&conn->c_send_lock, flags);
        if (conn->c_xmit_rm) {
                rm = conn->c_xmit_rm;
                conn->c_xmit_rm = NULL;
                 * independently) but as the connection is down, there's
                 * no ongoing RDMA to/from that memory */
                rds_message_unmapped(rm);
-               spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
                rds_message_put(rm);
-       } else {
-               spin_unlock_irqrestore(&conn->c_send_lock, flags);
        }
 
        conn->c_xmit_sg = 0;
        spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
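+/*
+ * Try to claim exclusive xmit ownership of the connection.  Returns
+ * true if we set RDS_IN_XMIT, false if another task already holds it.
+ */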
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+       return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
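+/*
+ * Drop xmit ownership and wake anyone waiting for it, such as
+ * rds_conn_shutdown().
+ */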
+static void release_in_xmit(struct rds_connection *conn)
+{
+       clear_bit(RDS_IN_XMIT, &conn->c_flags);
+       smp_mb__after_clear_bit();
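+       /* make sure the cleared bit is visible before we look for waiters */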
+       /*
+        * We don't use wait_on_bit()/wake_up_bit() because the wakeup is in
+        * a hot path and waiters are very rare.  We don't want to walk the
+        * system-wide hashed waitqueue buckets in that hot path only to
+        * almost never find waiters.
+        */
+       if (waitqueue_active(&conn->c_waitq))
+               wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the conscious trade-off here to only send one message
  * down the connection at a time.
        unsigned int tmp;
        struct scatterlist *sg;
        int ret = 0;
-       int gen = 0;
        LIST_HEAD(to_be_dropped);
 
 restart:
-       if (!rds_conn_up(conn))
-               goto out;
 
        /*
         * sendmsg calls here after having queued its message on the send
         * avoids blocking the caller and trading per-connection data between
         * caches per message.
         */
-       if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+       if (!acquire_in_xmit(conn)) {
                rds_stats_inc(s_send_lock_contention);
                ret = -ENOMEM;
                goto out;
        }
-       atomic_inc(&conn->c_senders);
+
+       /*
+        * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT;
+        * we do the opposite, so either we see the connection going down and
+        * bail out, or shutdown sees RDS_IN_XMIT set and waits for us to
+        * release it.
+        */
+       if (!rds_conn_up(conn)) {
+               release_in_xmit(conn);
+               ret = 0;
+               goto out;
+       }
 
        if (conn->c_trans->xmit_prepare)
                conn->c_trans->xmit_prepare(conn);
 
-       gen = atomic_inc_return(&conn->c_send_generation);
-
        /*
         * spin trying to push headers and data down the connection until
         * the connection doesn't make forward progress.
                if (!rm) {
                        unsigned int len;
 
-                       spin_lock(&conn->c_lock);
+                       spin_lock_irqsave(&conn->c_lock, flags);
 
                        if (!list_empty(&conn->c_send_queue)) {
                                rm = list_entry(conn->c_send_queue.next,
                                list_move_tail(&rm->m_conn_item, &conn->c_retrans);
                        }
 
-                       spin_unlock(&conn->c_lock);
+                       spin_unlock_irqrestore(&conn->c_lock, flags);
 
                        if (!rm)
                                break;
                         */
                        if (rm->rdma.op_active &&
                            test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-                               spin_lock(&conn->c_lock);
+                               spin_lock_irqsave(&conn->c_lock, flags);
                                if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
                                        list_move(&rm->m_conn_item, &to_be_dropped);
-                               spin_unlock(&conn->c_lock);
+                               spin_unlock_irqrestore(&conn->c_lock, flags);
                                continue;
                        }
 
        if (conn->c_trans->xmit_complete)
                conn->c_trans->xmit_complete(conn);
 
-       /*
-        * We might be racing with another sender who queued a message but
-        * backed off on noticing that we held the c_send_lock.  If we check
-        * for queued messages after dropping the sem then either we'll
-        * see the queued message or the queuer will get the sem.  If we
-        * notice the queued message then we trigger an immediate retry.
-        *
-        * We need to be careful only to do this when we stopped processing
-        * the send queue because it was empty.  It's the only way we
-        * stop processing the loop when the transport hasn't taken
-        * responsibility for forward progress.
-        */
-       spin_unlock_irqrestore(&conn->c_send_lock, flags);
+       release_in_xmit(conn);
 
        /* Nuke any messages we decided not to retransmit. */
        if (!list_empty(&to_be_dropped)) {
                rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
        }
 
-       atomic_dec(&conn->c_senders);
-
        /*
-        * Other senders will see we have c_send_lock and exit. We
-        * need to recheck the send queue and race again for c_send_lock
-        * to make sure messages don't just sit on the send queue, if
-        * somebody hasn't already beat us into the loop.
+        * Other senders can queue a message after we last test the send queue
+        * but before we clear RDS_IN_XMIT.  In that case they'd back off and
+        * not try to send their newly queued message.  We need to check the
+        * send queue after having cleared RDS_IN_XMIT so that their message
+        * doesn't get stuck on the send queue.
         *
         * If the transport cannot continue (i.e. ret != 0), then it must
         * call us when more room is available, such as from the tx
                smp_mb();
                if (!list_empty(&conn->c_send_queue)) {
                        rds_stats_inc(s_send_lock_queue_raced);
-                       if (gen == atomic_read(&conn->c_send_generation)) {
-                               goto restart;
-                       }
+                       goto restart;
                }
        }
 out: