EM(rxrpc_call_queue_requeue,            "QUE requeue ") \
        EM(rxrpc_call_queue_resend,             "QUE resend  ") \
        EM(rxrpc_call_queue_timer,              "QUE timer   ") \
+       EM(rxrpc_call_queue_tx_data,            "QUE tx-data ") \
        EM(rxrpc_call_see_accept,               "SEE accept  ") \
        EM(rxrpc_call_see_activate_client,      "SEE act-clnt") \
        EM(rxrpc_call_see_connect_failed,       "SEE con-fail") \
                    __field(rxrpc_seq_t,                acks_hard_ack   )
                    __field(rxrpc_seq_t,                tx_bottom       )
                    __field(rxrpc_seq_t,                tx_top          )
+                   __field(rxrpc_seq_t,                tx_prepared     )
                    __field(int,                        tx_winsize      )
                             ),
 
                    __entry->acks_hard_ack = call->acks_hard_ack;
                    __entry->tx_bottom = call->tx_bottom;
                    __entry->tx_top = call->tx_top;
+                   __entry->tx_prepared = call->tx_prepared;
                    __entry->tx_winsize = call->tx_winsize;
                           ),
 
-           TP_printk("c=%08x %s f=%08x h=%08x n=%u/%u/%u",
+           TP_printk("c=%08x %s f=%08x h=%08x n=%u/%u/%u/%u",
                      __entry->call,
                      __print_symbolic(__entry->why, rxrpc_txqueue_traces),
                      __entry->tx_bottom,
                      __entry->acks_hard_ack,
                      __entry->tx_top - __entry->tx_bottom,
                      __entry->tx_top - __entry->acks_hard_ack,
+                     __entry->tx_prepared - __entry->tx_bottom,
                      __entry->tx_winsize)
            );
 
 
 
        /* Transmitted data tracking. */
        spinlock_t              tx_lock;        /* Transmit queue lock */
+       struct list_head        tx_sendmsg;     /* Sendmsg prepared packets */
        struct list_head        tx_buffer;      /* Buffer of transmissible packets */
        rxrpc_seq_t             tx_bottom;      /* First packet in buffer */
        rxrpc_seq_t             tx_transmitted; /* Highest packet transmitted */
+       rxrpc_seq_t             tx_prepared;    /* Highest Tx slot prepared. */
        rxrpc_seq_t             tx_top;         /* Highest Tx slot allocated. */
        u16                     tx_backoff;     /* Delay to insert due to Tx failure */
        u8                      tx_winsize;     /* Maximum size of Tx window */
  */
 struct rxrpc_txbuf {
        struct rcu_head         rcu;
-       struct list_head        call_link;      /* Link in call->tx_queue */
+       struct list_head        call_link;      /* Link in call->tx_sendmsg/tx_buffer */
        struct list_head        tx_link;        /* Link in live Enc queue or Tx queue */
        struct rxrpc_call       *call;          /* Call to which belongs */
        ktime_t                 last_sent;      /* Time at which last transmitted */
 int rxrpc_send_data_packet(struct rxrpc_call *, struct rxrpc_txbuf *);
 void rxrpc_reject_packets(struct rxrpc_local *);
 void rxrpc_send_keepalive(struct rxrpc_peer *);
+void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb);
 
 /*
  * peer_event.c
 
        _leave("");
 }
 
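+/*
+ * Check whether the Tx window, as bounded by the call's Tx window size and
+ * the congestion window, still has space for another packet beyond tx_top.
+ */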
+static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
+{
+       unsigned int winsize = min_t(unsigned int, call->tx_winsize,
+                                    call->cong_cwnd + call->cong_extra);
+       rxrpc_seq_t window = call->acks_hard_ack, wtop = window + winsize;
+       rxrpc_seq_t tx_top = call->tx_top;
+       int space;
+
+       space = wtop - tx_top;
+       return space > 0;
+}
+
+/*
+ * Decant some of the sendmsg prepared queue into the transmission buffer.
+ */
+static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
+{
+       struct rxrpc_txbuf *txb;
+
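+       /* Expose a client call to the world before transmitting its first
+        * packet, if that hasn't been done yet.
+        */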
+       if (rxrpc_is_client_call(call) &&
+           !test_bit(RXRPC_CALL_EXPOSED, &call->flags))
+               rxrpc_expose_client_call(call);
+
+       while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
+                                              struct rxrpc_txbuf, call_link))) {
+               spin_lock(&call->tx_lock);
+               list_del(&txb->call_link);
+               spin_unlock(&call->tx_lock);
+
+               call->tx_top = txb->seq;
+               list_add_tail(&txb->call_link, &call->tx_buffer);
+
+               rxrpc_transmit_one(call, txb);
+
+               // TODO: Drain the transmission buffers.  Do this somewhere better
+               if (after(call->acks_hard_ack, call->tx_bottom + 16))
+                       rxrpc_shrink_call_tx_buffer(call);
+
+               if (!rxrpc_tx_window_has_space(call))
+                       break;
+       }
+}
+
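+/*
+ * Transmit some as-yet untransmitted data, provided the call state permits
+ * it and there is space in the Tx window.
+ */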
+static void rxrpc_transmit_some_data(struct rxrpc_call *call)
+{
+       switch (call->state) {
+       case RXRPC_CALL_SERVER_ACK_REQUEST:
+               if (list_empty(&call->tx_sendmsg))
+                       return;
+               fallthrough;
+
+       case RXRPC_CALL_SERVER_SEND_REPLY:
+       case RXRPC_CALL_SERVER_AWAIT_ACK:
+       case RXRPC_CALL_CLIENT_SEND_REQUEST:
+       case RXRPC_CALL_CLIENT_AWAIT_REPLY:
+               if (!rxrpc_tx_window_has_space(call))
+                       return;
+               if (list_empty(&call->tx_sendmsg))
+                       return;
+               rxrpc_decant_prepared_tx(call);
+               break;
+       default:
+               return;
+       }
+}
+
 /*
  * Handle retransmission and deferred ACK/abort generation.
  */
               call->debug_id, rxrpc_call_states[call->state], call->events);
 
 recheck_state:
+       if (call->acks_hard_ack != call->tx_bottom)
+               rxrpc_shrink_call_tx_buffer(call);
+
        /* Limit the number of times we do this before returning to the manager */
-       iterations++;
-       if (iterations > 5)
-               goto requeue;
+       if (!rxrpc_tx_window_has_space(call) ||
+           list_empty(&call->tx_sendmsg)) {
+               iterations++;
+               if (iterations > 5)
+                       goto requeue;
+       }
 
        if (test_and_clear_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
                rxrpc_send_abort_packet(call);
                goto recheck_state;
        }
 
-       if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom)
-               rxrpc_shrink_call_tx_buffer(call);
-
        if (call->state == RXRPC_CALL_COMPLETE) {
                del_timer_sync(&call->timer);
                goto out;
                set_bit(RXRPC_CALL_EV_RESEND, &call->events);
        }
 
+       rxrpc_transmit_some_data(call);
+
        /* Process events */
        if (test_and_clear_bit(RXRPC_CALL_EV_EXPIRED, &call->events)) {
                if (test_bit(RXRPC_CALL_RX_HEARD, &call->flags) &&
 
        INIT_LIST_HEAD(&call->recvmsg_link);
        INIT_LIST_HEAD(&call->sock_link);
        INIT_LIST_HEAD(&call->attend_link);
+       INIT_LIST_HEAD(&call->tx_sendmsg);
        INIT_LIST_HEAD(&call->tx_buffer);
        skb_queue_head_init(&call->recvmsg_queue);
        skb_queue_head_init(&call->rx_oos_queue);
        del_timer_sync(&call->timer);
 
        rxrpc_cleanup_ring(call);
+       while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
+                                              struct rxrpc_txbuf, call_link))) {
+               list_del(&txb->call_link);
+               rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
+       }
        while ((txb = list_first_entry_or_null(&call->tx_buffer,
                                               struct rxrpc_txbuf, call_link))) {
                list_del(&txb->call_link);
 
 
        trace_rxrpc_tx_data(call, txb->seq, serial, txb->wire.flags,
                            test_bit(RXRPC_TXBUF_RESENT, &txb->flags), false);
+
+       /* Track what we've attempted to transmit at least once so that the
+        * retransmission algorithm doesn't try to resend what we haven't sent
+        * yet.  However, this can race as we can receive an ACK before we get
+        * to this point.  On the other hand, we won't get an ACK mentioning
+        * this packet unless the far side received it (though it could have
+        * discarded it anyway and NAK'd it).
+        */
        cmpxchg(&call->tx_transmitted, txb->seq - 1, txb->seq);
 
        /* send the packet with the don't fragment bit set if we currently
        peer->last_tx_at = ktime_get_seconds();
        _leave("");
 }
+
+/*
+ * Schedule an instant Tx resend.
+ */
+static inline void rxrpc_instant_resend(struct rxrpc_call *call,
+                                       struct rxrpc_txbuf *txb)
+{
+       if (call->state < RXRPC_CALL_COMPLETE)
+               kdebug("resend");
+}
+
+/*
+ * Transmit one packet.
+ */
+void rxrpc_transmit_one(struct rxrpc_call *call, struct rxrpc_txbuf *txb)
+{
+       int ret;
+
+       ret = rxrpc_send_data_packet(call, txb);
+       if (ret < 0) {
+               switch (ret) {
+               case -ENETUNREACH:
+               case -EHOSTUNREACH:
+               case -ECONNREFUSED:
+                       rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
+                                                 0, ret);
+                       break;
+               default:
+                       _debug("need instant resend %d", ret);
+                       rxrpc_instant_resend(call, txb);
+               }
+       } else {
+               unsigned long now = jiffies;
+               unsigned long resend_at = now + call->peer->rto_j;
+
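+               /* Arm the resend timer to fire an RTO from now, pulling the
+                * call timer forward if it would otherwise fire later.
+                */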
+               WRITE_ONCE(call->resend_at, resend_at);
+               rxrpc_reduce_call_timer(call, resend_at, now,
+                                       rxrpc_timer_set_for_send);
+       }
+}
 
  */
 static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
 {
-       unsigned int win_size;
-       rxrpc_seq_t tx_win = smp_load_acquire(&call->acks_hard_ack);
-
-       /* If we haven't transmitted anything for >1RTT, we should reset the
-        * congestion management state.
-        */
-       if (ktime_before(ktime_add_us(call->tx_last_sent,
-                                     call->peer->srtt_us >> 3),
-                        ktime_get_real())) {
-               if (RXRPC_TX_SMSS > 2190)
-                       win_size = 2;
-               else if (RXRPC_TX_SMSS > 1095)
-                       win_size = 3;
-               else
-                       win_size = 4;
-               win_size += call->cong_extra;
-       } else {
-               win_size = min_t(unsigned int, call->tx_winsize,
-                                call->cong_cwnd + call->cong_extra);
-       }
-
        if (_tx_win)
-               *_tx_win = tx_win;
-       return call->tx_top - tx_win < win_size;
+               *_tx_win = call->tx_bottom;
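+       /* Allow sendmsg to prepare at most 256 packets ahead of the bottom
+        * of the Tx buffer.
+        */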
+       return call->tx_prepared - call->tx_bottom < 256;
 }
 
 /*
                if (signal_pending(current))
                        return sock_intr_errno(*timeo);
 
-               if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
-                       rxrpc_shrink_call_tx_buffer(call);
-                       continue;
-               }
-
                trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
                *timeo = schedule_timeout(*timeo);
        }
                    tx_win == tx_start && signal_pending(current))
                        return -EINTR;
 
-               if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
-                       rxrpc_shrink_call_tx_buffer(call);
-                       continue;
-               }
-
                if (tx_win != tx_start) {
                        timeout = rtt;
                        tx_start = tx_win;
                if (call->state >= RXRPC_CALL_COMPLETE)
                        return call->error;
 
-               if (READ_ONCE(call->acks_hard_ack) != call->tx_bottom) {
-                       rxrpc_shrink_call_tx_buffer(call);
-                       continue;
-               }
-
                trace_rxrpc_txqueue(call, rxrpc_txqueue_wait);
                *timeo = schedule_timeout(*timeo);
        }
        unsigned long now;
        rxrpc_seq_t seq = txb->seq;
        bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags);
-       int ret;
 
        rxrpc_inc_stat(call->rxnet, stat_tx_data);
 
-       ASSERTCMP(seq, ==, call->tx_top + 1);
+       ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
 
        /* We have to set the timestamp before queueing as the retransmit
         * algorithm can see the packet as soon as we queue it.
         */
        txb->last_sent = ktime_get_real();
 
-       /* Add the packet to the call's output buffer */
-       rxrpc_get_txbuf(txb, rxrpc_txbuf_get_buffer);
-       spin_lock(&call->tx_lock);
-       list_add_tail(&txb->call_link, &call->tx_buffer);
-       call->tx_top = seq;
-       spin_unlock(&call->tx_lock);
-
        if (last)
                trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
        else
                trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
 
+       /* Add the packet to the call's sendmsg-prepared output queue */
+       spin_lock(&call->tx_lock);
+       list_add_tail(&txb->call_link, &call->tx_sendmsg);
+       call->tx_prepared = seq;
+       spin_unlock(&call->tx_lock);
+
        if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
                _debug("________awaiting reply/ACK__________");
                write_lock_bh(&call->state_lock);
                write_unlock_bh(&call->state_lock);
        }
 
-       if (seq == 1 && rxrpc_is_client_call(call))
-               rxrpc_expose_client_call(call);
-
-       ret = rxrpc_send_data_packet(call, txb);
-       if (ret < 0) {
-               switch (ret) {
-               case -ENETUNREACH:
-               case -EHOSTUNREACH:
-               case -ECONNREFUSED:
-                       rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
-                                                 0, ret);
-                       goto out;
-               }
-       } else {
-               unsigned long now = jiffies;
-               unsigned long resend_at = now + call->peer->rto_j;
-
-               WRITE_ONCE(call->resend_at, resend_at);
-               rxrpc_reduce_call_timer(call, resend_at, now,
-                                       rxrpc_timer_set_for_send);
-       }
-
-out:
-       rxrpc_put_txbuf(txb, rxrpc_txbuf_put_trans);
+       /* Kick the call's work item to move the packet onto the transmission
+        * queue and send it.
+        */
+       rxrpc_queue_call(call, rxrpc_call_queue_tx_data);
 }
 
 /*
 
                txb->offset             = 0;
                txb->flags              = 0;
                txb->ack_why            = 0;
-               txb->seq                = call->tx_top + 1;
+               txb->seq                = call->tx_prepared + 1;
                txb->wire.epoch         = htonl(call->conn->proto.epoch);
                txb->wire.cid           = htonl(call->cid);
                txb->wire.callNumber    = htonl(call->call_id);
 {
        struct rxrpc_txbuf *txb;
        rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
+       bool wake = false;
 
        _enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
 
                if (txb->seq != call->tx_bottom + 1)
                        rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
                ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
-               call->tx_bottom++;
+               smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
                list_del_rcu(&txb->call_link);
 
                trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
                spin_unlock(&call->tx_lock);
 
                rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
+               if (after(call->acks_hard_ack, call->tx_bottom + 128))
+                       wake = true;
        }
 
        spin_unlock(&call->tx_lock);
+
+       if (wake)
+               wake_up(&call->waitq);
 }