EM(rxrpc_receive_incoming,              "INC") \
        EM(rxrpc_receive_queue,                 "QUE") \
        EM(rxrpc_receive_queue_last,            "QLS") \
-       E_(rxrpc_receive_rotate,                "ROT")
+       EM(rxrpc_receive_queue_oos,             "QUO") \
+       EM(rxrpc_receive_queue_oos_last,        "QOL") \
+       EM(rxrpc_receive_oos,                   "OOS") \
+       EM(rxrpc_receive_oos_last,              "OSL") \
+       EM(rxrpc_receive_rotate,                "ROT") \
+       E_(rxrpc_receive_rotate_last,           "RLS")
 
 #define rxrpc_recvmsg_traces \
        EM(rxrpc_recvmsg_cont,                  "CONT") \
                    __field(enum rxrpc_receive_trace,   why             )
                    __field(rxrpc_serial_t,             serial          )
                    __field(rxrpc_seq_t,                seq             )
-                   __field(rxrpc_seq_t,                hard_ack        )
-                   __field(rxrpc_seq_t,                top             )
+                   __field(u64,                        window          )
                             ),
 
            TP_fast_assign(
                    __entry->why = why;
                    __entry->serial = serial;
                    __entry->seq = seq;
-                   __entry->hard_ack = call->rx_hard_ack;
-                   __entry->top = call->rx_top;
+                   __entry->window = atomic64_read(&call->ackr_window);
                           ),
 
            TP_printk("c=%08x %s r=%08x q=%08x w=%08x-%08x",
                      __print_symbolic(__entry->why, rxrpc_receive_traces),
                      __entry->serial,
                      __entry->seq,
-                     __entry->hard_ack,
-                     __entry->top)
+                     lower_32_bits(__entry->window),
+                     upper_32_bits(__entry->window))
            );
 
 TRACE_EVENT(rxrpc_recvmsg,
                    __entry->call_serial = call->rx_serial;
                    __entry->conn_serial = call->conn->hi_serial;
                    __entry->tx_seq = call->tx_hard_ack;
-                   __entry->rx_seq = call->rx_hard_ack;
+                   __entry->rx_seq = call->rx_highest_seq;
                           ),
 
            TP_printk("c=%08x %08x:%08x r=%08x/%08x tx=%08x rx=%08x",
 
        u16             remain;
        u16             offset;         /* Offset of data */
        u16             len;            /* Length of data */
-       u8              rx_flags;       /* Received packet flags */
        u8              flags;
 #define RXRPC_RX_VERIFIED      0x01
 
        rxrpc_seq_t             tx_hard_ack;    /* Dead slot in buffer; the first transmitted but
                                                 * not hard-ACK'd packet follows this.
                                                 */
+
+       /* Transmitted data tracking. */
        rxrpc_seq_t             tx_top;         /* Highest Tx slot allocated. */
        u16                     tx_backoff;     /* Delay to insert due to Tx failure */
+       u8                      tx_winsize;     /* Maximum size of Tx window */
+
+       /* Received data tracking */
+       struct sk_buff_head     recvmsg_queue;  /* Queue of packets ready for recvmsg() */
+       struct sk_buff_head     rx_oos_queue;   /* Queue of out of sequence packets */
+
+       rxrpc_seq_t             rx_highest_seq; /* Highest sequence number received */
+       rxrpc_seq_t             rx_consumed;    /* Highest packet consumed */
+       rxrpc_serial_t          rx_serial;      /* Highest serial received for this call */
+       u8                      rx_winsize;     /* Size of Rx window */
 
        /* TCP-style slow-start congestion control [RFC5681].  Since the SMSS
         * is fixed, we keep these numbers in terms of segments (ie. DATA
        u8                      cong_cumul_acks; /* Cumulative ACK count */
        ktime_t                 cong_tstamp;    /* Last time cwnd was changed */
 
-       rxrpc_seq_t             rx_hard_ack;    /* Dead slot in buffer; the first received but not
-                                                * consumed packet follows this.
-                                                */
-       rxrpc_seq_t             rx_top;         /* Highest Rx slot allocated. */
-       rxrpc_seq_t             rx_expect_next; /* Expected next packet sequence number */
-       rxrpc_serial_t          rx_serial;      /* Highest serial received for this call */
-       u8                      rx_winsize;     /* Size of Rx window */
-       u8                      tx_winsize;     /* Maximum size of Tx window */
-
        spinlock_t              input_lock;     /* Lock for packet input to this call */
 
        /* Receive-phase ACK management (ACKs we send). */
        u8                      ackr_reason;    /* reason to ACK */
        rxrpc_serial_t          ackr_serial;    /* serial of packet being ACK'd */
-       rxrpc_seq_t             ackr_highest_seq; /* Higest sequence number received */
+       atomic64_t              ackr_window;    /* Base (in LSW) and top (in MSW) of SACK window */
        atomic_t                ackr_nr_unacked; /* Number of unacked packets */
        atomic_t                ackr_nr_consumed; /* Number of packets needing hard ACK */
+       struct {
+#define RXRPC_SACK_SIZE 256
+                /* SACK table for soft-acked packets */
+               u8              ackr_sack_table[RXRPC_SACK_SIZE];
+       } __aligned(8);
 
        /* RTT management */
        rxrpc_serial_t          rtt_serial[4];  /* Serial number of DATA or PING sent */
 
        INIT_LIST_HEAD(&call->accept_link);
        INIT_LIST_HEAD(&call->recvmsg_link);
        INIT_LIST_HEAD(&call->sock_link);
+       skb_queue_head_init(&call->recvmsg_queue);
+       skb_queue_head_init(&call->rx_oos_queue);
        init_waitqueue_head(&call->waitq);
        spin_lock_init(&call->lock);
        spin_lock_init(&call->notify_lock);
        call->tx_total_len = -1;
        call->next_rx_timo = 20 * HZ;
        call->next_req_timo = 1 * HZ;
+       atomic64_set(&call->ackr_window, 0x100000001ULL);
 
        memset(&call->sock_node, 0xed, sizeof(call->sock_node));
 
-       /* Leave space in the ring to handle a maxed-out jumbo packet */
        call->rx_winsize = rxrpc_rx_window_size;
        call->tx_winsize = 16;
-       call->rx_expect_next = 1;
 
        call->cong_cwnd = 2;
        call->cong_ssthresh = RXRPC_RXTX_BUFF_SIZE - 1;
                rxrpc_free_skb(call->rxtx_buffer[i], rxrpc_skb_cleaned);
                call->rxtx_buffer[i] = NULL;
        }
+       skb_queue_purge(&call->recvmsg_queue);
+       skb_queue_purge(&call->rx_oos_queue);
 }
 
 /*
 
                trace_rxrpc_disconnect_call(call);
                switch (call->completion) {
                case RXRPC_CALL_SUCCEEDED:
-                       chan->last_seq = call->rx_hard_ack;
+                       chan->last_seq = call->rx_highest_seq;
                        chan->last_type = RXRPC_PACKET_TYPE_ACK;
                        break;
                case RXRPC_CALL_LOCALLY_ABORTED:
 
        return rxrpc_end_tx_phase(call, true, "ETD");
 }
 
+static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
+                                         rxrpc_seq_t window, rxrpc_seq_t wtop)
+{
+       atomic64_set_release(&call->ackr_window, ((u64)wtop) << 32 | window);
+}
+
 /*
- * Process a DATA packet, adding the packet to the Rx ring.  The caller's
- * packet ref must be passed on or discarded.
+ * Push a DATA packet onto the Rx queue.
+ */
+static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
+                                  rxrpc_seq_t window, rxrpc_seq_t wtop,
+                                  enum rxrpc_receive_trace why)
+{
+       struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+       bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
+
+       __skb_queue_tail(&call->recvmsg_queue, skb);
+       rxrpc_input_update_ack_window(call, window, wtop);
+
+       trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
+}
+
+/*
+ * Process a DATA packet.
  */
 static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb)
 {
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+       struct sk_buff *oos;
        rxrpc_serial_t serial = sp->hdr.serial;
-       rxrpc_seq_t seq = sp->hdr.seq, hard_ack;
-       unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
+       u64 win = atomic64_read(&call->ackr_window);
+       rxrpc_seq_t window = lower_32_bits(win);
+       rxrpc_seq_t wtop = upper_32_bits(win);
+       rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
+       rxrpc_seq_t seq = sp->hdr.seq;
        bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
-       bool acked = false;
+       int ack_reason = -1;
 
        rxrpc_inc_stat(call->rxnet, stat_rx_data);
        if (sp->hdr.flags & RXRPC_REQUEST_ACK)
        if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
                rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);
 
-       hard_ack = READ_ONCE(call->rx_hard_ack);
-
-       _proto("Rx DATA %%%u { #%x l=%u }", serial, seq, last);
-
        if (last) {
-               if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
-                   seq != call->rx_top) {
+               if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
+                   seq + 1 != wtop) {
                        rxrpc_proto_abort("LSN", call, seq);
-                       goto out;
+                       goto err_free;
                }
        } else {
                if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
-                   after_eq(seq, call->rx_top)) {
+                   after_eq(seq, wtop)) {
+                       pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
+                               call->debug_id, seq, window, wtop, wlimit);
                        rxrpc_proto_abort("LSA", call, seq);
-                       goto out;
+                       goto err_free;
                }
        }
 
+       if (after(seq, call->rx_highest_seq))
+               call->rx_highest_seq = seq;
+
        trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);
 
-       if (before_eq(seq, hard_ack)) {
-               rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
-                              rxrpc_propose_ack_input_data);
-               goto out;
+       if (before(seq, window)) {
+               ack_reason = RXRPC_ACK_DUPLICATE;
+               goto send_ack;
        }
-
-       if (call->rxtx_buffer[ix]) {
-               rxrpc_send_ACK(call, RXRPC_ACK_DUPLICATE, serial,
-                              rxrpc_propose_ack_input_data);
-               goto out;
+       if (after(seq, wlimit)) {
+               ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
+               goto send_ack;
        }
 
-       if (after(seq, hard_ack + call->rx_winsize)) {
-               rxrpc_send_ACK(call, RXRPC_ACK_EXCEEDS_WINDOW, serial,
-                              rxrpc_propose_ack_input_data);
-               goto out;
-       }
+       /* Queue the packet. */
+       if (seq == window) {
+               rxrpc_seq_t reset_from;
+               bool reset_sack = false;
 
-       if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
-               rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, serial,
-                              rxrpc_propose_ack_input_data);
-               acked = true;
-       }
+               if (sp->hdr.flags & RXRPC_REQUEST_ACK)
+                       ack_reason = RXRPC_ACK_REQUESTED;
+               /* Send an immediate ACK if we fill in a hole */
+               else if (!skb_queue_empty(&call->rx_oos_queue))
+                       ack_reason = RXRPC_ACK_DELAY;
 
-       if (after(seq, call->ackr_highest_seq))
-               call->ackr_highest_seq = seq;
+               window++;
+               if (after(window, wtop))
+                       wtop = window;
 
-       /* Queue the packet.  We use a couple of memory barriers here as need
-        * to make sure that rx_top is perceived to be set after the buffer
-        * pointer and that the buffer pointer is set after the annotation and
-        * the skb data.
-        *
-        * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
-        * and also rxrpc_fill_out_ack().
-        */
-       call->rxtx_annotations[ix] = 1;
-       smp_wmb();
-       call->rxtx_buffer[ix] = skb;
-       if (after(seq, call->rx_top)) {
-               smp_store_release(&call->rx_top, seq);
-       } else if (before(seq, call->rx_top)) {
-               /* Send an immediate ACK if we fill in a hole */
-               if (!acked) {
-                       rxrpc_send_ACK(call, RXRPC_ACK_DELAY, serial,
-                                      rxrpc_propose_ack_input_data_hole);
-                       acked = true;
+               spin_lock(&call->recvmsg_queue.lock);
+               rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
+               skb = NULL;
+
+               while ((oos = skb_peek(&call->rx_oos_queue))) {
+                       struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
+
+                       if (after(osp->hdr.seq, window))
+                               break;
+
+                       __skb_unlink(oos, &call->rx_oos_queue);
+                       last = osp->hdr.flags & RXRPC_LAST_PACKET;
+                       seq = osp->hdr.seq;
+                       if (!reset_sack) {
+                               reset_from = seq;
+                               reset_sack = true;
+                       }
+
+                       window++;
+                       rxrpc_input_queue_data(call, oos, window, wtop,
+                                                rxrpc_receive_queue_oos);
                }
-       }
 
-       /* From this point on, we're not allowed to touch the packet any longer
-        * as its ref now belongs to the Rx ring.
-        */
-       skb = NULL;
-       sp = NULL;
+               spin_unlock(&call->recvmsg_queue.lock);
 
-       if (last) {
-               set_bit(RXRPC_CALL_RX_LAST, &call->flags);
-               trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
+               if (reset_sack) {
+                       do {
+                               call->ackr_sack_table[reset_from % RXRPC_SACK_SIZE] = 0;
+                       } while (reset_from++, before(reset_from, window));
+               }
        } else {
-               trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
-       }
+               bool keep = false;
+
+               ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;
+
+               if (!call->ackr_sack_table[seq % RXRPC_SACK_SIZE]) {
+                       call->ackr_sack_table[seq % RXRPC_SACK_SIZE] = 1;
+                       keep = true;
+               }
+
+               if (after(seq + 1, wtop)) {
+                       wtop = seq + 1;
+                       rxrpc_input_update_ack_window(call, window, wtop);
+               }
+
+               if (!keep) {
+                       ack_reason = RXRPC_ACK_DUPLICATE;
+                       goto send_ack;
+               }
+
+               skb_queue_walk(&call->rx_oos_queue, oos) {
+                       struct rxrpc_skb_priv *osp = rxrpc_skb(oos);
 
-       if (after_eq(seq, call->rx_expect_next)) {
-               if (after(seq, call->rx_expect_next)) {
-                       _net("OOS %u > %u", seq, call->rx_expect_next);
-                       rxrpc_send_ACK(call, RXRPC_ACK_OUT_OF_SEQUENCE, serial,
-                                      rxrpc_propose_ack_input_data);
-                       acked = true;
+                       if (after(osp->hdr.seq, seq)) {
+                               __skb_queue_before(&call->rx_oos_queue, oos, skb);
+                               goto oos_queued;
+                       }
                }
-               call->rx_expect_next = seq + 1;
+
+               __skb_queue_tail(&call->rx_oos_queue, skb);
+       oos_queued:
+               trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
+                                   sp->hdr.serial, sp->hdr.seq);
+               skb = NULL;
        }
 
-out:
-       if (!acked &&
-           atomic_inc_return(&call->ackr_nr_unacked) > 2)
-               rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
+send_ack:
+       if (ack_reason < 0 &&
+           atomic_inc_return(&call->ackr_nr_unacked) > 2 &&
+           test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
+               ack_reason = RXRPC_ACK_IDLE;
+       } else if (ack_reason >= 0) {
+               set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags);
+       }
+
+       if (ack_reason >= 0)
+               rxrpc_send_ACK(call, ack_reason, serial,
                               rxrpc_propose_ack_input_data);
        else
                rxrpc_propose_delay_ACK(call, serial,
                                        rxrpc_propose_ack_input_data);
 
-       trace_rxrpc_notify_socket(call->debug_id, serial);
-       rxrpc_notify_socket(call);
-
+err_free:
        rxrpc_free_skb(skb, rxrpc_skb_freed);
-       _leave(" [queued]");
 }
 
 /*
        rxrpc_serial_t serial = sp->hdr.serial;
        rxrpc_seq_t seq0 = sp->hdr.seq;
 
-       _enter("{%u,%u},{%u,%u}",
-              call->rx_hard_ack, call->rx_top, skb->len, seq0);
+       _enter("{%llx,%x},{%u,%x}",
+              atomic64_read(&call->ackr_window), call->rx_highest_seq,
+              skb->len, seq0);
 
        _proto("Rx DATA %%%u { #%u f=%02x }",
               sp->hdr.serial, seq0, sp->hdr.flags);
 
  * limit is hit, we should generate an EXCEEDS_WINDOW ACK and discard further
  * packets.
  */
-unsigned int rxrpc_rx_window_size = RXRPC_INIT_RX_WINDOW_SIZE;
-#if (RXRPC_RXTX_BUFF_SIZE - 1) < RXRPC_INIT_RX_WINDOW_SIZE
-#error Need to reduce RXRPC_INIT_RX_WINDOW_SIZE
-#endif
+unsigned int rxrpc_rx_window_size = 255;
 
 /*
  * Maximum Rx MTU size.  This indicates to the sender the size of jumbo packet
 
  */
 static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
                                 struct rxrpc_call *call,
-                                struct rxrpc_txbuf *txb,
-                                rxrpc_seq_t *_hard_ack,
-                                rxrpc_seq_t *_top)
+                                struct rxrpc_txbuf *txb)
 {
        struct rxrpc_ackinfo ackinfo;
-       unsigned int tmp;
-       rxrpc_seq_t hard_ack, top, seq;
-       int ix;
+       unsigned int qsize;
+       rxrpc_seq_t window, wtop, wrap_point, ix, first;
+       int rsize;
+       u64 wtmp;
        u32 mtu, jmax;
        u8 *ackp = txb->acks;
+       u8 sack_buffer[sizeof(call->ackr_sack_table)] __aligned(8);
 
-       tmp = atomic_xchg(&call->ackr_nr_unacked, 0);
-       tmp |= atomic_xchg(&call->ackr_nr_consumed, 0);
-       if (!tmp && (txb->ack.reason == RXRPC_ACK_DELAY ||
-                    txb->ack.reason == RXRPC_ACK_IDLE)) {
-               rxrpc_inc_stat(call->rxnet, stat_tx_ack_skip);
-               return 0;
-       }
-
+       atomic_set(&call->ackr_nr_unacked, 0);
+       atomic_set(&call->ackr_nr_consumed, 0);
        rxrpc_inc_stat(call->rxnet, stat_tx_ack_fill);
 
        /* Barrier against rxrpc_input_data(). */
-       hard_ack = READ_ONCE(call->rx_hard_ack);
-       top = smp_load_acquire(&call->rx_top);
-       *_hard_ack = hard_ack;
-       *_top = top;
-
-       txb->ack.firstPacket    = htonl(hard_ack + 1);
-       txb->ack.previousPacket = htonl(call->ackr_highest_seq);
-       txb->ack.nAcks          = top - hard_ack;
-
-       if (txb->ack.nAcks) {
-               seq = hard_ack + 1;
-               do {
-                       ix = seq & RXRPC_RXTX_BUFF_MASK;
-                       if (call->rxtx_buffer[ix])
-                               *ackp++ = RXRPC_ACK_TYPE_ACK;
-                       else
-                               *ackp++ = RXRPC_ACK_TYPE_NACK;
-                       seq++;
-               } while (before_eq(seq, top));
+retry:
+       wtmp   = atomic64_read_acquire(&call->ackr_window);
+       window = lower_32_bits(wtmp);
+       wtop   = upper_32_bits(wtmp);
+       txb->ack.firstPacket = htonl(window);
+       txb->ack.nAcks = 0;
+
+       if (after(wtop, window)) {
+               /* Try to copy the SACK ring locklessly.  We can use the copy,
+                * only if the now-current top of the window didn't go past the
+                * previously read base - otherwise we can't know whether we
+                * have old data or new data.
+                */
+               memcpy(sack_buffer, call->ackr_sack_table, sizeof(sack_buffer));
+               wrap_point = window + RXRPC_SACK_SIZE - 1;
+               wtmp   = atomic64_read_acquire(&call->ackr_window);
+               window = lower_32_bits(wtmp);
+               wtop   = upper_32_bits(wtmp);
+               if (after(wtop, wrap_point)) {
+                       cond_resched();
+                       goto retry;
+               }
+
+               /* The buffer is maintained as a ring with an invariant mapping
+                * between bit position and sequence number, so we'll probably
+                * need to rotate it.
+                */
+               txb->ack.nAcks = wtop - window;
+               ix = window % RXRPC_SACK_SIZE;
+               first = sizeof(sack_buffer) - ix;
+
+               if (ix + txb->ack.nAcks <= RXRPC_SACK_SIZE) {
+                       memcpy(txb->acks, sack_buffer + ix, txb->ack.nAcks);
+               } else {
+                       memcpy(txb->acks, sack_buffer + ix, first);
+                       memcpy(txb->acks + first, sack_buffer,
+                              txb->ack.nAcks - first);
+               }
+
+               ackp += txb->ack.nAcks;
+       } else if (before(wtop, window)) {
+               pr_warn("ack window backward %x %x\n", window, wtop);
        } else if (txb->ack.reason == RXRPC_ACK_DELAY) {
                txb->ack.reason = RXRPC_ACK_IDLE;
        }
        mtu = conn->params.peer->if_mtu;
        mtu -= conn->params.peer->hdrsize;
        jmax = rxrpc_rx_jumbo_max;
+       qsize = (window - 1) - call->rx_consumed;
+       rsize = max_t(int, call->rx_winsize - qsize, 0);
        ackinfo.rxMTU           = htonl(rxrpc_rx_mtu);
        ackinfo.maxMTU          = htonl(mtu);
-       ackinfo.rwind           = htonl(call->rx_winsize);
+       ackinfo.rwind           = htonl(rsize);
        ackinfo.jumbo_max       = htonl(jmax);
 
        *ackp++ = 0;
        *ackp++ = 0;
        *ackp++ = 0;
        memcpy(ackp, &ackinfo, sizeof(ackinfo));
-       return top - hard_ack + 3 + sizeof(ackinfo);
+       return txb->ack.nAcks + 3 + sizeof(ackinfo);
 }
 
 /*
        struct msghdr msg;
        struct kvec iov[1];
        rxrpc_serial_t serial;
-       rxrpc_seq_t hard_ack, top;
        size_t len, n;
        int ret, rtt_slot = -1;
 
                clear_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags);
 
        spin_lock_bh(&call->lock);
-       n = rxrpc_fill_out_ack(conn, call, txb, &hard_ack, &top);
+       n = rxrpc_fill_out_ack(conn, call, txb);
        spin_unlock_bh(&call->lock);
        if (n == 0) {
                kfree(pkt);
 
        rxrpc_inc_stat(call->rxnet, stat_tx_ack_send);
 
+       /* Grab the highest received seq as late as possible */
+       txb->ack.previousPacket = htonl(call->rx_highest_seq);
+
        iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
        ret = do_udp_sendmsg(conn->params.local->socket, &msg, len);
        call->peer->last_tx_at = ktime_get_seconds();
 
        struct rxrpc_call *call;
        struct rxrpc_net *rxnet = rxrpc_net(seq_file_net(seq));
        unsigned long timeout = 0;
-       rxrpc_seq_t tx_hard_ack, rx_hard_ack;
+       rxrpc_seq_t tx_hard_ack;
        char lbuff[50], rbuff[50];
+       u64 wtmp;
 
        if (v == &rxnet->calls) {
                seq_puts(seq,
        }
 
        tx_hard_ack = READ_ONCE(call->tx_hard_ack);
-       rx_hard_ack = READ_ONCE(call->rx_hard_ack);
+       wtmp   = atomic64_read_acquire(&call->ackr_window);
        seq_printf(seq,
                   "UDP   %-47.47s %-47.47s %4x %08x %08x %s %3u"
                   " %-8.8s %08x %08x %08x %02x %08x %02x %08x %06lx\n",
                   call->abort_code,
                   call->debug_id,
                   tx_hard_ack, READ_ONCE(call->tx_top) - tx_hard_ack,
-                  rx_hard_ack, READ_ONCE(call->rx_top) - rx_hard_ack,
+                  lower_32_bits(wtmp), upper_32_bits(wtmp) - lower_32_bits(wtmp),
                   call->rx_serial,
                   timeout);
 
 
                break;
        }
 
-       trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
+       trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
+                            lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
                             call->rx_pkt_offset, call->rx_pkt_len, ret);
        return ret;
 }
  */
 static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
 {
+       rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);
+
        _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
 
-       trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
-       ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
+       trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
 
        if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
                rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
        rxrpc_serial_t serial;
-       rxrpc_seq_t hard_ack, top;
-       bool last = false;
-       int ix;
+       rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
+       bool last;
+       int acked;
 
        _enter("%d", call->debug_id);
 
-       hard_ack = call->rx_hard_ack;
-       top = smp_load_acquire(&call->rx_top);
-       ASSERT(before(hard_ack, top));
-
-       hard_ack++;
-       ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
-       skb = call->rxtx_buffer[ix];
+further_rotation:
+       skb = skb_dequeue(&call->recvmsg_queue);
        rxrpc_see_skb(skb, rxrpc_skb_rotated);
-       sp = rxrpc_skb(skb);
 
+       sp = rxrpc_skb(skb);
+       tseq   = sp->hdr.seq;
        serial = sp->hdr.serial;
+       last   = sp->hdr.flags & RXRPC_LAST_PACKET;
 
-       if (sp->hdr.flags & RXRPC_LAST_PACKET)
-               last = true;
-
-       call->rxtx_buffer[ix] = NULL;
-       call->rxtx_annotations[ix] = 0;
        /* Barrier against rxrpc_input_data(). */
-       smp_store_release(&call->rx_hard_ack, hard_ack);
+       if (after(tseq, call->rx_consumed))
+               smp_store_release(&call->rx_consumed, tseq);
 
        rxrpc_free_skb(skb, rxrpc_skb_freed);
 
-       trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
+       trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
+                           serial, call->rx_consumed);
        if (last) {
                rxrpc_end_rx_phase(call, serial);
-       } else {
-               /* Check to see if there's an ACK that needs sending. */
-               if (atomic_inc_return(&call->ackr_nr_consumed) > 2 &&
-                   !test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
-                       rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
-                                      rxrpc_propose_ack_rotate_rx);
-                       rxrpc_transmit_ack_packets(call->peer->local);
-               }
+               return;
+       }
+
+       /* The next packet on the queue might entirely overlap with the one we
+        * just consumed; if so, rotate that away also.
+        */
+       skb = skb_peek(&call->recvmsg_queue);
+       if (skb) {
+               sp = rxrpc_skb(skb);
+               if (sp->hdr.seq != call->rx_consumed &&
+                   after_eq(call->rx_consumed, sp->hdr.seq))
+                       goto further_rotation;
+       }
+
+       /* Check to see if there's an ACK that needs sending. */
+       acked = atomic_add_return(call->rx_consumed - old_consumed,
+                                 &call->ackr_nr_consumed);
+       if (acked > 2 &&
+           !test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
+               rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
+                              rxrpc_propose_ack_rotate_rx);
+               rxrpc_transmit_ack_packets(call->peer->local);
        }
 }
 
 {
        struct rxrpc_skb_priv *sp;
        struct sk_buff *skb;
-       rxrpc_serial_t serial;
-       rxrpc_seq_t hard_ack, top, seq;
+       rxrpc_seq_t seq = 0;
        size_t remain;
        unsigned int rx_pkt_offset, rx_pkt_len;
-       int ix, copy, ret = -EAGAIN, ret2;
+       int copy, ret = -EAGAIN, ret2;
 
        rx_pkt_offset = call->rx_pkt_offset;
        rx_pkt_len = call->rx_pkt_len;
 
        if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
-               seq = call->rx_hard_ack;
+               seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
                ret = 1;
                goto done;
        }
 
-       /* Barriers against rxrpc_input_data(). */
-       hard_ack = call->rx_hard_ack;
-       seq = hard_ack + 1;
-
-       while (top = smp_load_acquire(&call->rx_top),
-              before_eq(seq, top)
-              ) {
-               ix = seq & RXRPC_RXTX_BUFF_MASK;
-               skb = call->rxtx_buffer[ix];
-               if (!skb) {
-                       trace_rxrpc_recvdata(call, rxrpc_recvmsg_hole, seq,
-                                            rx_pkt_offset, rx_pkt_len, 0);
-                       rxrpc_transmit_ack_packets(call->peer->local);
-                       break;
-               }
-               smp_rmb();
+       /* No one else can be removing stuff from the queue, so we shouldn't
+        * need the Rx lock to walk it.
+        */
+       skb = skb_peek(&call->recvmsg_queue);
+       while (skb) {
                rxrpc_see_skb(skb, rxrpc_skb_seen);
                sp = rxrpc_skb(skb);
+               seq = sp->hdr.seq;
 
-               if (!(flags & MSG_PEEK)) {
-                       serial = sp->hdr.serial;
-                       trace_rxrpc_receive(call, rxrpc_receive_front,
-                                           serial, seq);
+               if (after_eq(call->rx_consumed, seq)) {
+                       kdebug("obsolete %x %x", call->rx_consumed, seq);
+                       goto skip_obsolete;
                }
 
+               if (!(flags & MSG_PEEK))
+                       trace_rxrpc_receive(call, rxrpc_receive_front,
+                                           sp->hdr.serial, seq);
+
                if (msg)
                        sock_recv_timestamp(msg, sock->sk, skb);
 
                                ret = ret2;
                                goto out;
                        }
+                       rxrpc_transmit_ack_packets(call->peer->local);
                } else {
                        trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
                                             rx_pkt_offset, rx_pkt_len, 0);
                        break;
                }
 
+       skip_obsolete:
                /* The whole packet has been transferred. */
                if (sp->hdr.flags & RXRPC_LAST_PACKET)
                        ret = 1;
                rx_pkt_offset = 0;
                rx_pkt_len = 0;
 
+               skb = skb_peek_next(skb, &call->recvmsg_queue);
+
                if (!(flags & MSG_PEEK))
                        rxrpc_rotate_rx_window(call);
-
-               seq++;
        }
 
 out:
                        ret = 0;
 
                rxrpc_transmit_ack_packets(call->peer->local);
-               if (after(call->rx_top, call->rx_hard_ack) &&
-                   call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
+               if (!skb_queue_empty(&call->recvmsg_queue))
                        rxrpc_notify_socket(call);
                break;
        default:
 
        abort_code = RXKADPACKETSHORT;
        if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header) + sizeof(*response),
                          ticket, ticket_len) < 0)
                goto protocol_error_free;
 
        ret = rxkad_decrypt_ticket(conn, server_key, skb, ticket, ticket_len,
                                   &session_key, &expiry, _abort_code);
 
 static const unsigned int four = 4;
 static const unsigned int max_backlog = RXRPC_BACKLOG_MAX - 1;
 static const unsigned int n_65535 = 65535;
-static const unsigned int n_max_acks = RXRPC_RXTX_BUFF_SIZE - 1;
+static const unsigned int n_max_acks = 255;
 static const unsigned long one_jiffy = 1;
 static const unsigned long max_jiffies = MAX_JIFFY_OFFSET;