]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
rxrpc: Implement progressive transmission queue struct
authorDavid Howells <dhowells@redhat.com>
Wed, 4 Dec 2024 07:46:46 +0000 (07:46 +0000)
committerJakub Kicinski <kuba@kernel.org>
Mon, 9 Dec 2024 21:48:27 +0000 (13:48 -0800)
We need to scan the buffers in the transmission queue occasionally when
processing ACKs, but the transmission queue is currently a linked list of
transmission buffers which, when we eventually expand the Tx window to 8192
packets will be very slow to walk.

Instead, pull the fields we need to examine a lot (last sent time,
retransmitted flag) into a new struct rxrpc_txqueue and make each one hold
an array of 32 or 64 packets.

The transmission queue is then a list of these structs, each pointing to a
contiguous set of packets.  Scanning is then a lot faster as the flags and
timestamps are concentrated in the CPU dcache.

The transmission timestamps are stored as a number of microseconds from a
base ktime to reduce memory requirements.  This should be fine provided we
manage to transmit an entire buffer within an hour.

This will make implementing RACK-TLP [RFC8985] easier as it will be less
costly to scan the transmission buffers.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Link: https://patch.msgid.link/20241204074710.990092-19-dhowells@redhat.com
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/call_event.c
net/rxrpc/call_object.c
net/rxrpc/input.c
net/rxrpc/output.c
net/rxrpc/sendmsg.c
net/rxrpc/txbuf.c

index 28fa7be31ff839aebadaaa940abcc4ee573e999b..e6cf9ec940aac28eaff9723e91364daf8f4784af 100644 (file)
 
 #define rxrpc_txqueue_traces \
        EM(rxrpc_txqueue_await_reply,           "AWR") \
-       EM(rxrpc_txqueue_dequeue,               "DEQ") \
        EM(rxrpc_txqueue_end,                   "END") \
        EM(rxrpc_txqueue_queue,                 "QUE") \
        EM(rxrpc_txqueue_queue_last,            "QLS") \
        EM(rxrpc_txbuf_see_send_more,           "SEE SEND+  ")  \
        E_(rxrpc_txbuf_see_unacked,             "SEE UNACKED")
 
+#define rxrpc_tq_traces \
+       EM(rxrpc_tq_alloc,                      "ALLOC") \
+       EM(rxrpc_tq_cleaned,                    "CLEAN") \
+       EM(rxrpc_tq_decant,                     "DCNT ") \
+       EM(rxrpc_tq_decant_advance,             "DCNT>") \
+       EM(rxrpc_tq_queue,                      "QUEUE") \
+       EM(rxrpc_tq_queue_dup,                  "QUE!!") \
+       EM(rxrpc_tq_rotate,                     "ROT  ") \
+       EM(rxrpc_tq_rotate_and_free,            "ROT-F") \
+       EM(rxrpc_tq_rotate_and_keep,            "ROT-K") \
+       EM(rxrpc_tq_transmit,                   "XMIT ") \
+       E_(rxrpc_tq_transmit_advance,           "XMIT>")
+
 #define rxrpc_pmtud_reduce_traces \
        EM(rxrpc_pmtud_reduce_ack,              "Ack  ")        \
        EM(rxrpc_pmtud_reduce_icmp,             "Icmp ")        \
@@ -518,6 +530,7 @@ enum rxrpc_rtt_tx_trace             { rxrpc_rtt_tx_traces } __mode(byte);
 enum rxrpc_sack_trace          { rxrpc_sack_traces } __mode(byte);
 enum rxrpc_skb_trace           { rxrpc_skb_traces } __mode(byte);
 enum rxrpc_timer_trace         { rxrpc_timer_traces } __mode(byte);
+enum rxrpc_tq_trace            { rxrpc_tq_traces } __mode(byte);
 enum rxrpc_tx_point            { rxrpc_tx_points } __mode(byte);
 enum rxrpc_txbuf_trace         { rxrpc_txbuf_traces } __mode(byte);
 enum rxrpc_txqueue_trace       { rxrpc_txqueue_traces } __mode(byte);
@@ -554,6 +567,7 @@ rxrpc_rtt_tx_traces;
 rxrpc_sack_traces;
 rxrpc_skb_traces;
 rxrpc_timer_traces;
+rxrpc_tq_traces;
 rxrpc_tx_points;
 rxrpc_txbuf_traces;
 rxrpc_txqueue_traces;
@@ -881,7 +895,7 @@ TRACE_EVENT(rxrpc_txqueue,
                    __field(rxrpc_seq_t,                acks_hard_ack)
                    __field(rxrpc_seq_t,                tx_bottom)
                    __field(rxrpc_seq_t,                tx_top)
-                   __field(rxrpc_seq_t,                tx_prepared)
+                   __field(rxrpc_seq_t,                send_top)
                    __field(int,                        tx_winsize)
                             ),
 
@@ -891,7 +905,7 @@ TRACE_EVENT(rxrpc_txqueue,
                    __entry->acks_hard_ack = call->acks_hard_ack;
                    __entry->tx_bottom = call->tx_bottom;
                    __entry->tx_top = call->tx_top;
-                   __entry->tx_prepared = call->tx_prepared;
+                   __entry->send_top = call->send_top;
                    __entry->tx_winsize = call->tx_winsize;
                           ),
 
@@ -902,14 +916,14 @@ TRACE_EVENT(rxrpc_txqueue,
                      __entry->acks_hard_ack,
                      __entry->tx_top - __entry->tx_bottom,
                      __entry->tx_top - __entry->acks_hard_ack,
-                     __entry->tx_prepared - __entry->tx_bottom,
+                     __entry->send_top - __entry->tx_top,
                      __entry->tx_winsize)
            );
 
 TRACE_EVENT(rxrpc_transmit,
-           TP_PROTO(struct rxrpc_call *call, int space),
+           TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t send_top, int space),
 
-           TP_ARGS(call, space),
+           TP_ARGS(call, send_top, space),
 
            TP_STRUCT__entry(
                    __field(unsigned int,       call)
@@ -925,12 +939,12 @@ TRACE_EVENT(rxrpc_transmit,
 
            TP_fast_assign(
                    __entry->call       = call->debug_id;
-                   __entry->seq        = call->tx_bottom;
+                   __entry->seq        = call->tx_top + 1;
                    __entry->space      = space;
                    __entry->tx_winsize = call->tx_winsize;
                    __entry->cong_cwnd  = call->cong_cwnd;
                    __entry->cong_extra = call->cong_extra;
-                   __entry->prepared   = call->tx_prepared - call->tx_bottom;
+                   __entry->prepared   = send_top - call->tx_bottom;
                    __entry->in_flight  = call->tx_top - call->acks_hard_ack;
                    __entry->pmtud_jumbo = call->peer->pmtud_jumbo;
                           ),
@@ -947,6 +961,32 @@ TRACE_EVENT(rxrpc_transmit,
                      __entry->pmtud_jumbo)
            );
 
+TRACE_EVENT(rxrpc_tx_rotate,
+           TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, rxrpc_seq_t to),
+
+           TP_ARGS(call, seq, to),
+
+           TP_STRUCT__entry(
+                   __field(unsigned int,       call)
+                   __field(rxrpc_seq_t,        seq)
+                   __field(rxrpc_seq_t,        to)
+                   __field(rxrpc_seq_t,        top)
+                            ),
+
+           TP_fast_assign(
+                   __entry->call       = call->debug_id;
+                   __entry->seq        = seq;
+                   __entry->to         = to;
+                   __entry->top        = call->tx_top;
+                          ),
+
+           TP_printk("c=%08x q=%08x-%08x-%08x",
+                     __entry->call,
+                     __entry->seq,
+                     __entry->to,
+                     __entry->top)
+           );
+
 TRACE_EVENT(rxrpc_rx_data,
            TP_PROTO(unsigned int call, rxrpc_seq_t seq,
                     rxrpc_serial_t serial, u8 flags),
@@ -1621,10 +1661,11 @@ TRACE_EVENT(rxrpc_drop_ack,
            );
 
 TRACE_EVENT(rxrpc_retransmit,
-           TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq,
-                    rxrpc_serial_t serial, ktime_t expiry),
+           TP_PROTO(struct rxrpc_call *call,
+                    struct rxrpc_send_data_req *req,
+                    struct rxrpc_txbuf *txb, ktime_t expiry),
 
-           TP_ARGS(call, seq, serial, expiry),
+           TP_ARGS(call, req, txb, expiry),
 
            TP_STRUCT__entry(
                    __field(unsigned int,       call)
@@ -1635,8 +1676,8 @@ TRACE_EVENT(rxrpc_retransmit,
 
            TP_fast_assign(
                    __entry->call = call->debug_id;
-                   __entry->seq = seq;
-                   __entry->serial = serial;
+                   __entry->seq = req->seq;
+                   __entry->serial = txb->serial;
                    __entry->expiry = expiry;
                           ),
 
@@ -1714,9 +1755,9 @@ TRACE_EVENT(rxrpc_reset_cwnd,
                    __entry->cwnd       = call->cong_cwnd;
                    __entry->extra      = call->cong_extra;
                    __entry->hard_ack   = call->acks_hard_ack;
-                   __entry->prepared   = call->tx_prepared - call->tx_bottom;
+                   __entry->prepared   = call->send_top - call->tx_bottom;
                    __entry->since_last_tx = ktime_sub(now, call->tx_last_sent);
-                   __entry->has_data   = !list_empty(&call->tx_sendmsg);
+                   __entry->has_data   = call->tx_bottom != call->tx_top;
                           ),
 
            TP_printk("c=%08x q=%08x %s cw=%u+%u pr=%u tm=%llu d=%u",
@@ -2024,6 +2065,33 @@ TRACE_EVENT(rxrpc_txbuf,
                      __entry->ref)
            );
 
+TRACE_EVENT(rxrpc_tq,
+           TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq,
+                    rxrpc_seq_t seq, enum rxrpc_tq_trace trace),
+
+           TP_ARGS(call, tq, seq, trace),
+
+           TP_STRUCT__entry(
+                   __field(unsigned int,               call_debug_id)
+                   __field(rxrpc_seq_t,                qbase)
+                   __field(rxrpc_seq_t,                seq)
+                   __field(enum rxrpc_tq_trace,        trace)
+                            ),
+
+           TP_fast_assign(
+                   __entry->call_debug_id = call->debug_id;
+                   __entry->qbase = tq ? tq->qbase : call->tx_qbase;
+                   __entry->seq = seq;
+                   __entry->trace = trace;
+                          ),
+
+           TP_printk("c=%08x bq=%08x q=%08x %s",
+                     __entry->call_debug_id,
+                     __entry->qbase,
+                     __entry->seq,
+                     __print_symbolic(__entry->trace, rxrpc_tq_traces))
+           );
+
 TRACE_EVENT(rxrpc_poke_call,
            TP_PROTO(struct rxrpc_call *call, bool busy,
                     enum rxrpc_call_poke_trace what),
index 84efa21f176cc3f374a0f61e672bc24169f725f6..bcce4862b0b75f916bfb3719f71536022ee7320f 100644 (file)
@@ -30,6 +30,7 @@ struct rxrpc_crypt {
 struct key_preparsed_payload;
 struct rxrpc_connection;
 struct rxrpc_txbuf;
+struct rxrpc_txqueue;
 
 /*
  * Mark applied to socket buffers in skb->mark.  skb->priority is used
@@ -691,13 +692,17 @@ struct rxrpc_call {
        unsigned short          rx_pkt_offset;  /* Current recvmsg packet offset */
        unsigned short          rx_pkt_len;     /* Current recvmsg packet len */
 
+       /* Sendmsg data tracking. */
+       rxrpc_seq_t             send_top;       /* Highest Tx slot filled by sendmsg. */
+       struct rxrpc_txqueue    *send_queue;    /* Queue that sendmsg is writing into */
+
        /* Transmitted data tracking. */
        spinlock_t              tx_lock;        /* Transmit queue lock */
-       struct list_head        tx_sendmsg;     /* Sendmsg prepared packets */
-       struct list_head        tx_buffer;      /* Buffer of transmissible packets */
+       struct rxrpc_txqueue    *tx_queue;      /* Start of transmission buffers */
+       struct rxrpc_txqueue    *tx_qtail;      /* End of transmission buffers */
+       rxrpc_seq_t             tx_qbase;       /* First slot in tx_queue */
        rxrpc_seq_t             tx_bottom;      /* First packet in buffer */
        rxrpc_seq_t             tx_transmitted; /* Highest packet transmitted */
-       rxrpc_seq_t             tx_prepared;    /* Highest Tx slot prepared. */
        rxrpc_seq_t             tx_top;         /* Highest Tx slot allocated. */
        u16                     tx_backoff;     /* Delay to insert due to Tx failure (ms) */
        u8                      tx_winsize;     /* Maximum size of Tx window */
@@ -815,9 +820,6 @@ struct rxrpc_send_params {
  * Buffer of data to be output as a packet.
  */
 struct rxrpc_txbuf {
-       struct list_head        call_link;      /* Link in call->tx_sendmsg/tx_buffer */
-       struct list_head        tx_link;        /* Link in live Enc queue or Tx queue */
-       ktime_t                 last_sent;      /* Time at which last transmitted */
        refcount_t              ref;
        rxrpc_seq_t             seq;            /* Sequence number of this packet */
        rxrpc_serial_t          serial;         /* Last serial number transmitted with */
@@ -849,6 +851,36 @@ static inline bool rxrpc_sending_to_client(const struct rxrpc_txbuf *txb)
        return !rxrpc_sending_to_server(txb);
 }
 
+/*
+ * Transmit queue element, including RACK [RFC8985] per-segment metadata.  The
+ * transmission timestamp is in usec from the base.
+ */
+struct rxrpc_txqueue {
+       /* Start with the members we want to prefetch. */
+       struct rxrpc_txqueue    *next;
+       ktime_t                 xmit_ts_base;
+       rxrpc_seq_t             qbase;
+
+       /* The arrays we want to pack into as few cache lines as possible. */
+       struct {
+#define RXRPC_NR_TXQUEUE BITS_PER_LONG
+#define RXRPC_TXQ_MASK (RXRPC_NR_TXQUEUE - 1)
+               struct rxrpc_txbuf *bufs[RXRPC_NR_TXQUEUE];
+               unsigned int    segment_xmit_ts[RXRPC_NR_TXQUEUE];
+       } ____cacheline_aligned;
+};
+
+/*
+ * Data transmission request.
+ */
+struct rxrpc_send_data_req {
+       ktime_t                 now;            /* Current time */
+       struct rxrpc_txqueue    *tq;            /* Tx queue segment holding first DATA */
+       rxrpc_seq_t             seq;            /* Sequence of first data */
+       int                     n;              /* Number of DATA packets to glue into jumbo */
+       bool                    did_send;       /* T if did actually send */
+};
+
 #include <trace/events/rxrpc.h>
 
 /*
@@ -905,7 +937,6 @@ void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
                        enum rxrpc_propose_ack_trace why);
 void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t,
                             enum rxrpc_propose_ack_trace);
-void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *);
 void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb);
 
 bool rxrpc_input_call_event(struct rxrpc_call *call);
@@ -1191,10 +1222,10 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
                    rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why);
 void rxrpc_send_probe_for_pmtud(struct rxrpc_call *call);
 int rxrpc_send_abort_packet(struct rxrpc_call *);
+void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req);
 void rxrpc_send_conn_abort(struct rxrpc_connection *conn);
 void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb);
 void rxrpc_send_keepalive(struct rxrpc_peer *);
-void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n);
 
 /*
  * peer_event.c
index ef47de3f41c6d8ce6bc6f2b8d2483ba6a4287b44..90e3d93956750e914825df44c52f7f33e777ded1 100644 (file)
@@ -62,57 +62,85 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call)
        set_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags);
 }
 
+/*
+ * Retransmit one or more packets.
+ */
+static void rxrpc_retransmit_data(struct rxrpc_call *call,
+                                 struct rxrpc_send_data_req *req,
+                                 ktime_t rto)
+{
+       struct rxrpc_txqueue *tq = req->tq;
+       unsigned int ix = req->seq & RXRPC_TXQ_MASK;
+       struct rxrpc_txbuf *txb = tq->bufs[ix];
+       ktime_t xmit_ts, resend_at;
+
+       _enter("%x,%x,%x,%x", tq->qbase, req->seq, ix, txb->debug_id);
+
+       xmit_ts = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[ix]);
+       resend_at = ktime_add(xmit_ts, rto);
+       trace_rxrpc_retransmit(call, req, txb,
+                              ktime_sub(resend_at, req->now));
+
+       txb->flags |= RXRPC_TXBUF_RESENT;
+       rxrpc_send_data_packet(call, req);
+       rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
+
+       req->tq         = NULL;
+       req->n          = 0;
+       req->did_send   = true;
+       req->now        = ktime_get_real();
+}
+
 /*
  * Perform retransmission of NAK'd and unack'd packets.
  */
 void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 {
+       struct rxrpc_send_data_req req = {
+               .now    = ktime_get_real(),
+       };
        struct rxrpc_ackpacket *ack = NULL;
        struct rxrpc_skb_priv *sp;
+       struct rxrpc_txqueue *tq;
        struct rxrpc_txbuf *txb;
-       rxrpc_seq_t transmitted = call->tx_transmitted;
+       rxrpc_seq_t transmitted = call->tx_transmitted, seq;
        ktime_t next_resend = KTIME_MAX, rto = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
-       ktime_t resend_at = KTIME_MAX, now, delay;
+       ktime_t resend_at = KTIME_MAX, delay;
        bool unacked = false, did_send = false;
-       unsigned int i;
+       unsigned int qix;
 
        _enter("{%d,%d}", call->acks_hard_ack, call->tx_top);
 
-       now = ktime_get_real();
-
-       if (list_empty(&call->tx_buffer))
+       if (call->tx_bottom == call->tx_top)
                goto no_resend;
 
        trace_rxrpc_resend(call, ack_skb);
-       txb = list_first_entry(&call->tx_buffer, struct rxrpc_txbuf, call_link);
+       tq = call->tx_queue;
+       seq = call->tx_bottom;
 
-       /* Scan the soft ACK table without dropping the lock and resend any
-        * explicitly NAK'd packets.
-        */
+       /* Scan the soft ACK table and resend any explicitly NAK'd packets. */
        if (ack_skb) {
                sp = rxrpc_skb(ack_skb);
                ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header);
 
-               for (i = 0; i < sp->ack.nr_acks; i++) {
-                       rxrpc_seq_t seq;
+               for (int i = 0; i < sp->ack.nr_acks; i++) {
+                       rxrpc_seq_t aseq;
 
                        if (ack->acks[i] & 1)
                                continue;
-                       seq = sp->ack.first_ack + i;
-                       if (after(txb->seq, transmitted))
-                               break;
-                       if (after(txb->seq, seq))
-                               continue; /* A new hard ACK probably came in */
-                       list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
-                               if (txb->seq == seq)
-                                       goto found_txb;
-                       }
-                       goto no_further_resend;
+                       aseq = sp->ack.first_ack + i;
+                       while (after_eq(aseq, tq->qbase + RXRPC_NR_TXQUEUE))
+                               tq = tq->next;
+                       seq = aseq;
+                       qix = seq - tq->qbase;
+                       txb = tq->bufs[qix];
+                       if (after(seq, transmitted))
+                               goto no_further_resend;
 
-               found_txb:
-                       resend_at = ktime_add(txb->last_sent, rto);
+                       resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
+                       resend_at = ktime_add(resend_at, rto);
                        if (after(txb->serial, call->acks_highest_serial)) {
-                               if (ktime_after(resend_at, now) &&
+                               if (ktime_after(resend_at, req.now) &&
                                    ktime_before(resend_at, next_resend))
                                        next_resend = resend_at;
                                continue; /* Ack point not yet reached */
@@ -120,17 +148,13 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 
                        rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked);
 
-                       trace_rxrpc_retransmit(call, txb->seq, txb->serial,
-                                              ktime_sub(resend_at, now));
-
-                       txb->flags |= RXRPC_TXBUF_RESENT;
-                       rxrpc_transmit_data(call, txb, 1);
-                       did_send = true;
-                       now = ktime_get_real();
+                       req.tq  = tq;
+                       req.seq = seq;
+                       req.n   = 1;
+                       rxrpc_retransmit_data(call, &req, rto);
 
-                       if (list_is_last(&txb->call_link, &call->tx_buffer))
+                       if (after_eq(seq, call->tx_top))
                                goto no_further_resend;
-                       txb = list_next_entry(txb, call_link);
                }
        }
 
@@ -139,35 +163,43 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
         * ACK'd or NACK'd in due course, so don't worry about it here; here we
         * need to consider retransmitting anything beyond that point.
         */
-       if (after_eq(call->acks_prev_seq, call->tx_transmitted))
+       seq = call->acks_prev_seq;
+       if (after_eq(seq, call->tx_transmitted))
                goto no_further_resend;
+       seq++;
 
-       list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
-               resend_at = ktime_add(txb->last_sent, rto);
+       while (after_eq(seq, tq->qbase + RXRPC_NR_TXQUEUE))
+               tq = tq->next;
 
-               if (before_eq(txb->seq, call->acks_prev_seq))
+       while (before_eq(seq, call->tx_transmitted)) {
+               qix = seq - tq->qbase;
+               if (qix >= RXRPC_NR_TXQUEUE) {
+                       tq = tq->next;
                        continue;
-               if (after(txb->seq, call->tx_transmitted))
-                       break; /* Not transmitted yet */
+               }
+               txb = tq->bufs[qix];
+               resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
+               resend_at = ktime_add(resend_at, rto);
 
                if (ack && ack->reason == RXRPC_ACK_PING_RESPONSE &&
                    before(txb->serial, ntohl(ack->serial)))
                        goto do_resend; /* Wasn't accounted for by a more recent ping. */
 
-               if (ktime_after(resend_at, now)) {
+               if (ktime_after(resend_at, req.now)) {
                        if (ktime_before(resend_at, next_resend))
                                next_resend = resend_at;
+                       seq++;
                        continue;
                }
 
        do_resend:
                unacked = true;
 
-               txb->flags |= RXRPC_TXBUF_RESENT;
-               rxrpc_transmit_data(call, txb, 1);
-               did_send = true;
-               rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
-               now = ktime_get_real();
+               req.tq  = tq;
+               req.seq = seq;
+               req.n   = 1;
+               rxrpc_retransmit_data(call, &req, rto);
+               seq++;
        }
 
 no_further_resend:
@@ -175,7 +207,8 @@ no_resend:
        if (resend_at < KTIME_MAX) {
                delay = rxrpc_get_rto_backoff(call->peer, did_send);
                resend_at = ktime_add(resend_at, delay);
-               trace_rxrpc_timer_set(call, resend_at - now, rxrpc_timer_trace_resend_reset);
+               trace_rxrpc_timer_set(call, resend_at - req.now,
+                                     rxrpc_timer_trace_resend_reset);
        }
        call->resend_at = resend_at;
 
@@ -186,11 +219,11 @@ no_resend:
         * that an ACK got lost somewhere.  Send a ping to find out instead of
         * retransmitting data.
         */
-       if (!did_send) {
+       if (!req.did_send) {
                ktime_t next_ping = ktime_add_us(call->acks_latest_ts,
                                                 call->peer->srtt_us >> 3);
 
-               if (ktime_sub(next_ping, now) <= 0)
+               if (ktime_sub(next_ping, req.now) <= 0)
                        rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
                                       rxrpc_propose_ack_ping_for_0_retrans);
        }
@@ -240,47 +273,68 @@ static unsigned int rxrpc_tx_window_space(struct rxrpc_call *call)
 }
 
 /*
- * Decant some if the sendmsg prepared queue into the transmission buffer.
+ * Transmit some as-yet untransmitted data.
  */
-static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
+static void rxrpc_transmit_fresh_data(struct rxrpc_call *call)
 {
        int space = rxrpc_tx_window_space(call);
 
        if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
-               if (list_empty(&call->tx_sendmsg))
+               if (call->send_top == call->tx_top)
                        return;
                rxrpc_expose_client_call(call);
        }
 
        while (space > 0) {
-               struct rxrpc_txbuf *head = NULL, *txb;
-               int count = 0, limit = min(space, 1);
-
-               if (list_empty(&call->tx_sendmsg))
+               struct rxrpc_send_data_req req = {
+                       .now    = ktime_get_real(),
+                       .seq    = call->tx_transmitted + 1,
+                       .n      = 0,
+               };
+               struct rxrpc_txqueue *tq;
+               struct rxrpc_txbuf *txb;
+               rxrpc_seq_t send_top, seq;
+               int limit = min(space, 1);
+
+               /* Order send_top before the contents of the new txbufs and
+                * txqueue pointers
+                */
+               send_top = smp_load_acquire(&call->send_top);
+               if (call->tx_top == send_top)
                        break;
 
-               trace_rxrpc_transmit(call, space);
+               trace_rxrpc_transmit(call, send_top, space);
+
+               tq = call->tx_qtail;
+               seq = call->tx_top;
+               trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant);
 
-               spin_lock(&call->tx_lock);
                do {
-                       txb = list_first_entry(&call->tx_sendmsg,
-                                              struct rxrpc_txbuf, call_link);
-                       if (!head)
-                               head = txb;
-                       list_move_tail(&txb->call_link, &call->tx_buffer);
-                       count++;
+                       int ix;
+
+                       seq++;
+                       ix = seq & RXRPC_TXQ_MASK;
+                       if (!ix) {
+                               tq = tq->next;
+                               trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant_advance);
+                       }
+                       if (!req.tq)
+                               req.tq = tq;
+                       txb = tq->bufs[ix];
+                       req.n++;
                        if (!txb->jumboable)
                                break;
-               } while (count < limit && !list_empty(&call->tx_sendmsg));
+               } while (req.n < limit && before(seq, send_top));
 
-               spin_unlock(&call->tx_lock);
-
-               call->tx_top = txb->seq;
-               if (txb->flags & RXRPC_LAST_PACKET)
+               if (txb->flags & RXRPC_LAST_PACKET) {
                        rxrpc_close_tx_phase(call);
+                       tq = NULL;
+               }
+               call->tx_qtail = tq;
+               call->tx_top = seq;
 
-               space -= count;
-               rxrpc_transmit_data(call, head, count);
+               space -= req.n;
+               rxrpc_send_data_packet(call, &req);
        }
 }
 
@@ -288,7 +342,7 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
 {
        switch (__rxrpc_call_state(call)) {
        case RXRPC_CALL_SERVER_ACK_REQUEST:
-               if (list_empty(&call->tx_sendmsg))
+               if (call->tx_bottom == READ_ONCE(call->send_top))
                        return;
                rxrpc_begin_service_reply(call);
                fallthrough;
@@ -297,11 +351,11 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
        case RXRPC_CALL_CLIENT_SEND_REQUEST:
                if (!rxrpc_tx_window_space(call))
                        return;
-               if (list_empty(&call->tx_sendmsg)) {
+               if (call->tx_bottom == READ_ONCE(call->send_top)) {
                        rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
                        return;
                }
-               rxrpc_decant_prepared_tx(call);
+               rxrpc_transmit_fresh_data(call);
                break;
        default:
                return;
@@ -503,8 +557,6 @@ out:
                    call->peer->pmtud_pending)
                        rxrpc_send_probe_for_pmtud(call);
        }
-       if (call->acks_hard_ack != call->tx_bottom)
-               rxrpc_shrink_call_tx_buffer(call);
        _leave("");
        return true;
 
index c026f16f891ebd63d215a01590f2ef1a6b82d208..a9682b31a4f9f34b4f98d0a167014f148d83740a 100644 (file)
@@ -146,8 +146,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
        INIT_LIST_HEAD(&call->recvmsg_link);
        INIT_LIST_HEAD(&call->sock_link);
        INIT_LIST_HEAD(&call->attend_link);
-       INIT_LIST_HEAD(&call->tx_sendmsg);
-       INIT_LIST_HEAD(&call->tx_buffer);
        skb_queue_head_init(&call->rx_queue);
        skb_queue_head_init(&call->recvmsg_queue);
        skb_queue_head_init(&call->rx_oos_queue);
@@ -532,9 +530,26 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 }
 
 /*
- * Clean up the Rx skb ring.
+ * Clean up the transmission buffers.
  */
-static void rxrpc_cleanup_ring(struct rxrpc_call *call)
+static void rxrpc_cleanup_tx_buffers(struct rxrpc_call *call)
+{
+       struct rxrpc_txqueue *tq, *next;
+
+       for (tq = call->tx_queue; tq; tq = next) {
+               next = tq->next;
+               for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
+                       if (tq->bufs[i])
+                               rxrpc_put_txbuf(tq->bufs[i], rxrpc_txbuf_put_cleaned);
+               trace_rxrpc_tq(call, tq, 0, rxrpc_tq_cleaned);
+               kfree(tq);
+       }
+}
+
+/*
+ * Clean up the receive buffers.
+ */
+static void rxrpc_cleanup_rx_buffers(struct rxrpc_call *call)
 {
        rxrpc_purge_queue(&call->recvmsg_queue);
        rxrpc_purge_queue(&call->rx_queue);
@@ -673,23 +688,12 @@ static void rxrpc_rcu_free_call(struct rcu_head *rcu)
 static void rxrpc_destroy_call(struct work_struct *work)
 {
        struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
-       struct rxrpc_txbuf *txb;
 
        del_timer_sync(&call->timer);
 
        rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
-       rxrpc_cleanup_ring(call);
-       while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
-                                              struct rxrpc_txbuf, call_link))) {
-               list_del(&txb->call_link);
-               rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
-       }
-       while ((txb = list_first_entry_or_null(&call->tx_buffer,
-                                              struct rxrpc_txbuf, call_link))) {
-               list_del(&txb->call_link);
-               rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
-       }
-
+       rxrpc_cleanup_tx_buffers(call);
+       rxrpc_cleanup_rx_buffers(call);
        rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
        rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
        rxrpc_deactivate_bundle(call->bundle);
index 96fe005c5e819600c222f4af2e654d317c47c424..cfdd23042d4c72adb6fcc55649bc371551eecc4e 100644 (file)
@@ -214,24 +214,71 @@ void rxrpc_congestion_degrade(struct rxrpc_call *call)
 static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
                                   struct rxrpc_ack_summary *summary)
 {
-       struct rxrpc_txbuf *txb;
+       struct rxrpc_txqueue *tq = call->tx_queue;
+       rxrpc_seq_t seq = call->tx_bottom + 1;
        bool rot_last = false;
 
-       list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
-               if (before_eq(txb->seq, call->acks_hard_ack))
-                       continue;
-               if (txb->flags & RXRPC_LAST_PACKET) {
+       _enter("%x,%x,%x", call->tx_bottom, call->acks_hard_ack, to);
+
+       trace_rxrpc_tx_rotate(call, seq, to);
+       trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate);
+
+       /* We may have a left over fully-consumed buffer at the front that we
+        * couldn't drop before (rotate_and_keep below).
+        */
+       if (seq == call->tx_qbase + RXRPC_NR_TXQUEUE) {
+               call->tx_qbase += RXRPC_NR_TXQUEUE;
+               call->tx_queue = tq->next;
+               trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+               kfree(tq);
+               tq = call->tx_queue;
+       }
+
+       do {
+               unsigned int ix = seq - call->tx_qbase;
+
+               _debug("tq=%x seq=%x i=%d f=%x", tq->qbase, seq, ix, tq->bufs[ix]->flags);
+               if (tq->bufs[ix]->flags & RXRPC_LAST_PACKET) {
                        set_bit(RXRPC_CALL_TX_LAST, &call->flags);
                        rot_last = true;
                }
-               if (txb->seq == to)
-                       break;
-       }
+               rxrpc_put_txbuf(tq->bufs[ix], rxrpc_txbuf_put_rotated);
+               tq->bufs[ix] = NULL;
+
+               WRITE_ONCE(call->tx_bottom, seq);
+               WRITE_ONCE(call->acks_hard_ack, seq);
+               trace_rxrpc_txqueue(call, (rot_last ?
+                                          rxrpc_txqueue_rotate_last :
+                                          rxrpc_txqueue_rotate));
 
-       if (rot_last)
+               seq++;
+               if (!(seq & RXRPC_TXQ_MASK)) {
+                       prefetch(tq->next);
+                       if (tq != call->tx_qtail) {
+                               call->tx_qbase += RXRPC_NR_TXQUEUE;
+                               call->tx_queue = tq->next;
+                               trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+                               kfree(tq);
+                               tq = call->tx_queue;
+                       } else {
+                               trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_keep);
+                               tq = NULL;
+                               break;
+                       }
+               }
+
+       } while (before_eq(seq, to));
+
+       if (rot_last) {
                set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);
+               if (tq) {
+                       trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+                       kfree(tq);
+                       call->tx_queue = NULL;
+               }
+       }
 
-       _enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);
+       _debug("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);
 
        if (call->acks_lowest_nak == call->acks_hard_ack) {
                call->acks_lowest_nak = to;
@@ -240,11 +287,6 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
                call->acks_lowest_nak = to;
        }
 
-       smp_store_release(&call->acks_hard_ack, to);
-
-       trace_rxrpc_txqueue(call, (rot_last ?
-                                  rxrpc_txqueue_rotate_last :
-                                  rxrpc_txqueue_rotate));
        wake_up(&call->waitq);
        return rot_last;
 }
index 400c3389d4925bbb08a2c7a643d8de74024b2857..c2044d5932374e046a168381070662e760852d12 100644 (file)
@@ -375,10 +375,10 @@ int rxrpc_send_abort_packet(struct rxrpc_call *call)
 /*
  * Prepare a (sub)packet for transmission.
  */
-static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc_txbuf *txb,
-                                          rxrpc_serial_t serial,
-                                          int subpkt, int nr_subpkts,
-                                          ktime_t now)
+static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
+                                          struct rxrpc_send_data_req *req,
+                                          struct rxrpc_txbuf *txb,
+                                          rxrpc_serial_t serial, int subpkt)
 {
        struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
        struct rxrpc_jumbo_header *jumbo = (void *)(whdr + 1) - sizeof(*jumbo);
@@ -386,7 +386,7 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
        struct rxrpc_connection *conn = call->conn;
        struct kvec *kv = &call->local->kvec[subpkt];
        size_t len = txb->pkt_len;
-       bool last, more;
+       bool last;
        u8 flags;
 
        _enter("%x,%zd", txb->seq, len);
@@ -401,14 +401,11 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
        flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;
        last = txb->flags & RXRPC_LAST_PACKET;
 
-       if (subpkt < nr_subpkts - 1) {
+       if (subpkt < req->n - 1) {
                len = RXRPC_JUMBO_DATALEN;
                goto dont_set_request_ack;
        }
 
-       more = (!list_is_last(&txb->call_link, &call->tx_buffer) ||
-               !list_empty(&call->tx_sendmsg));
-
        /* If our RTT cache needs working on, request an ACK.  Also request
         * ACKs if a DATA packet appears to have been lost.
         *
@@ -430,7 +427,7 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
                why = rxrpc_reqack_more_rtt;
        else if (ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), ktime_get_real()))
                why = rxrpc_reqack_old_rtt;
-       else if (!last && !more)
+       else if (!last && !after(READ_ONCE(call->send_top), txb->seq))
                why = rxrpc_reqack_app_stall;
        else
                goto dont_set_request_ack;
@@ -439,13 +436,13 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
        trace_rxrpc_req_ack(call->debug_id, txb->seq, why);
        if (why != rxrpc_reqack_no_srv_last) {
                flags |= RXRPC_REQUEST_ACK;
-               rxrpc_begin_rtt_probe(call, serial, now, rxrpc_rtt_tx_data);
-               call->peer->rtt_last_req = now;
+               rxrpc_begin_rtt_probe(call, serial, req->now, rxrpc_rtt_tx_data);
+               call->peer->rtt_last_req = req->now;
        }
 dont_set_request_ack:
 
        /* The jumbo header overlays the wire header in the txbuf. */
-       if (subpkt < nr_subpkts - 1)
+       if (subpkt < req->n - 1)
                flags |= RXRPC_JUMBO_PACKET;
        else
                flags &= ~RXRPC_JUMBO_PACKET;
@@ -469,62 +466,100 @@ dont_set_request_ack:
        return len;
 }
 
+/*
+ * Prepare a transmission queue object for initial transmission.  Returns the
+ * number of microseconds since the transmission queue base timestamp.
+ */
+static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq,
+                                         struct rxrpc_send_data_req *req)
+{
+       if (!tq)
+               return 0;
+       if (tq->xmit_ts_base == KTIME_MIN) {
+               tq->xmit_ts_base = req->now;
+               return 0;
+       }
+       return ktime_to_us(ktime_sub(req->now, tq->xmit_ts_base));
+}
+
 /*
  * Prepare a (jumbo) packet for transmission.
  */
-static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *head, int n)
+static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
 {
-       struct rxrpc_txbuf *txb = head;
+       struct rxrpc_txqueue *tq = req->tq;
        rxrpc_serial_t serial;
-       ktime_t now = ktime_get_real();
+       unsigned int xmit_ts;
+       rxrpc_seq_t seq = req->seq;
        size_t len = 0;
 
+       trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit);
+
        /* Each transmission of a Tx packet needs a new serial number */
-       serial = rxrpc_get_next_serials(call->conn, n);
+       serial = rxrpc_get_next_serials(call->conn, req->n);
 
-       for (int i = 0; i < n; i++) {
-               txb->last_sent = now;
-               len += rxrpc_prepare_data_subpacket(call, txb, serial, i, n, now);
-               serial++;
-               txb = list_next_entry(txb, call_link);
-       }
+       call->tx_last_sent = req->now;
+       xmit_ts = rxrpc_prepare_txqueue(tq, req);
+       prefetch(tq->next);
 
-       /* Set timeouts */
-       if (call->peer->rtt_count > 1) {
-               ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);
+       for (int i = 0;;) {
+               int ix = seq & RXRPC_TXQ_MASK;
+               struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK];
 
-               call->ack_lost_at = ktime_add(now, delay);
-               trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_lost_ack);
+               _debug("prep[%u] tq=%x q=%x", i, tq->qbase, seq);
+               tq->segment_xmit_ts[ix] = xmit_ts;
+               len += rxrpc_prepare_data_subpacket(call, req, txb, serial, i);
+               serial++;
+               seq++;
+               i++;
+               if (i >= req->n)
+                       break;
+               if (!(seq & RXRPC_TXQ_MASK)) {
+                       tq = tq->next;
+                       trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit_advance);
+                       xmit_ts = rxrpc_prepare_txqueue(tq, req);
+               }
        }
 
+       /* Set timeouts */
        if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) {
                ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo));
 
-               call->expect_rx_by = ktime_add(now, delay);
+               call->expect_rx_by = ktime_add(req->now, delay);
                trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
        }
+       if (call->resend_at == KTIME_MAX) {
+               ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);
+
+               call->resend_at = ktime_add(req->now, delay);
+               trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_resend);
+       }
 
-       rxrpc_set_keepalive(call, now);
+       rxrpc_set_keepalive(call, req->now);
        return len;
 }
 
 /*
- * send a packet through the transport endpoint
+ * Send one or more packets through the transport endpoint
  */
-static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n)
+void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
 {
        struct rxrpc_connection *conn = call->conn;
        enum rxrpc_tx_point frag;
+       struct rxrpc_txqueue *tq = req->tq;
+       struct rxrpc_txbuf *txb;
        struct msghdr msg;
+       rxrpc_seq_t seq = req->seq;
        size_t len;
        bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags);
        int ret;
 
-       _enter("%x,{%d}", txb->seq, txb->pkt_len);
+       _enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1);
 
-       len = rxrpc_prepare_data_packet(call, txb, n);
+       len = rxrpc_prepare_data_packet(call, req);
+       txb = tq->bufs[seq & RXRPC_TXQ_MASK];
 
-       iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, n, len);
+       iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, req->n, len);
 
        msg.msg_name    = &call->peer->srx.transport;
        msg.msg_namelen = call->peer->srx.transport_len;
@@ -535,7 +570,7 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
        /* Send the packet with the don't fragment bit set unless we think it's
         * too big or if this is a retransmission.
         */
-       if (txb->seq == call->tx_transmitted + 1 &&
+       if (seq == call->tx_transmitted + 1 &&
            len >= sizeof(struct rxrpc_wire_header) + call->peer->max_data) {
                rxrpc_local_dont_fragment(conn->local, false);
                frag = rxrpc_tx_point_call_data_frag;
@@ -548,8 +583,8 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
         * retransmission algorithm doesn't try to resend what we haven't sent
         * yet.
         */
-       if (txb->seq == call->tx_transmitted + 1)
-               call->tx_transmitted = txb->seq + n - 1;
+       if (seq == call->tx_transmitted + 1)
+               call->tx_transmitted = seq + req->n - 1;
 
        if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
                static int lose;
@@ -586,19 +621,21 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
        rxrpc_tx_backoff(call, ret);
 
        if (ret < 0) {
-               /* Cancel the call if the initial transmission fails,
-                * particularly if that's due to network routing issues that
-                * aren't going away anytime soon.  The layer above can arrange
-                * the retransmission.
+               /* Cancel the call if the initial transmission fails or if we
+                * hit due to network routing issues that aren't going away
+                * anytime soon.  The layer above can arrange the
+                * retransmission.
                 */
-               if (new_call)
+               if (new_call ||
+                   ret == -ENETUNREACH ||
+                   ret == -EHOSTUNREACH ||
+                   ret == -ECONNREFUSED)
                        rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
                                                  RX_USER_ABORT, ret);
        }
 
 done:
        _leave(" = %d [%u]", ret, call->peer->max_data);
-       return ret;
 }
 
 /*
@@ -773,41 +810,3 @@ void rxrpc_send_keepalive(struct rxrpc_peer *peer)
        peer->last_tx_at = ktime_get_seconds();
        _leave("");
 }
-
-/*
- * Schedule an instant Tx resend.
- */
-static inline void rxrpc_instant_resend(struct rxrpc_call *call,
-                                       struct rxrpc_txbuf *txb)
-{
-       if (!__rxrpc_call_is_complete(call))
-               kdebug("resend");
-}
-
-/*
- * Transmit a packet, possibly gluing several subpackets together.
- */
-void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n)
-{
-       int ret;
-
-       ret = rxrpc_send_data_packet(call, txb, n);
-       if (ret < 0) {
-               switch (ret) {
-               case -ENETUNREACH:
-               case -EHOSTUNREACH:
-               case -ECONNREFUSED:
-                       rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
-                                                 0, ret);
-                       break;
-               default:
-                       _debug("need instant resend %d", ret);
-                       rxrpc_instant_resend(call, txb);
-               }
-       } else {
-               ktime_t delay = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
-
-               call->resend_at = ktime_add(ktime_get_real(), delay);
-               trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_resend_tx);
-       }
-}
index 467c9402882e67fe0188468bef107ce83501cf21..85b35b11755ddbde5c64918e6d527b38502f7117 100644 (file)
@@ -98,7 +98,7 @@ static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
 
        if (_tx_win)
                *_tx_win = tx_bottom;
-       return call->tx_prepared - tx_bottom < 256;
+       return call->send_top - tx_bottom < 256;
 }
 
 /*
@@ -242,36 +242,74 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
                               struct rxrpc_txbuf *txb,
                               rxrpc_notify_end_tx_t notify_end_tx)
 {
+       struct rxrpc_txqueue *sq = call->send_queue;
        rxrpc_seq_t seq = txb->seq;
        bool poke, last = txb->flags & RXRPC_LAST_PACKET;
-
+       int ix = seq & RXRPC_TXQ_MASK;
        rxrpc_inc_stat(call->rxnet, stat_tx_data);
 
-       ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
-
-       /* We have to set the timestamp before queueing as the retransmit
-        * algorithm can see the packet as soon as we queue it.
-        */
-       txb->last_sent = ktime_get_real();
+       ASSERTCMP(txb->seq, ==, call->send_top + 1);
 
        if (last)
                trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
        else
                trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
 
+       if (WARN_ON_ONCE(sq->bufs[ix]))
+               trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue_dup);
+       else
+               trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue);
+
        /* Add the packet to the call's output buffer */
        spin_lock(&call->tx_lock);
-       poke = list_empty(&call->tx_sendmsg);
-       list_add_tail(&txb->call_link, &call->tx_sendmsg);
-       call->tx_prepared = seq;
-       if (last)
+       poke = (READ_ONCE(call->tx_bottom) == call->send_top);
+       sq->bufs[ix] = txb;
+       /* Order send_top after the queue->next pointer and txb content. */
+       smp_store_release(&call->send_top, seq);
+       if (last) {
                rxrpc_notify_end_tx(rx, call, notify_end_tx);
+               call->send_queue = NULL;
+       }
        spin_unlock(&call->tx_lock);
 
        if (poke)
                rxrpc_poke_call(call, rxrpc_call_poke_start);
 }
 
+/*
+ * Allocate a new txqueue unit and add it to the transmission queue.
+ */
+static int rxrpc_alloc_txqueue(struct sock *sk, struct rxrpc_call *call)
+{
+       struct rxrpc_txqueue *tq;
+
+       tq = kzalloc(sizeof(*tq), sk->sk_allocation);
+       if (!tq)
+               return -ENOMEM;
+
+       tq->xmit_ts_base = KTIME_MIN;
+       for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
+               tq->segment_xmit_ts[i] = UINT_MAX;
+
+       if (call->send_queue) {
+               tq->qbase = call->send_top + 1;
+               call->send_queue->next = tq;
+               call->send_queue = tq;
+       } else if (WARN_ON(call->tx_queue)) {
+               kfree(tq);
+               return -ENOMEM;
+       } else {
+               tq->qbase = 0;
+               call->tx_qbase = 0;
+               call->send_queue = tq;
+               call->tx_qtail = tq;
+               call->tx_queue = tq;
+       }
+
+       trace_rxrpc_tq(call, tq, call->send_top, rxrpc_tq_alloc);
+       return 0;
+}
+
 /*
  * send data through a socket
  * - must be called in process context
@@ -346,6 +384,13 @@ reload:
                        if (!rxrpc_check_tx_space(call, NULL))
                                goto wait_for_space;
 
+                       /* See if we need to begin/extend the Tx queue. */
+                       if (!call->send_queue || !((call->send_top + 1) & RXRPC_TXQ_MASK)) {
+                               ret = rxrpc_alloc_txqueue(sk, call);
+                               if (ret < 0)
+                                       goto maybe_error;
+                       }
+
                        /* Work out the maximum size of a packet.  Assume that
                         * the security header is going to be in the padded
                         * region (enc blocksize), but the trailer is not.
index 0cc8f49d69a9e455d7cdbeb0fd8932e8fbe0878e..067223c8c35f2b1f51c4945345501ee872c2b549 100644 (file)
@@ -43,17 +43,14 @@ struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_
 
        whdr = buf + hoff;
 
-       INIT_LIST_HEAD(&txb->call_link);
-       INIT_LIST_HEAD(&txb->tx_link);
        refcount_set(&txb->ref, 1);
-       txb->last_sent          = KTIME_MIN;
        txb->call_debug_id      = call->debug_id;
        txb->debug_id           = atomic_inc_return(&rxrpc_txbuf_debug_ids);
        txb->alloc_size         = data_size;
        txb->space              = data_size;
        txb->offset             = sizeof(*whdr);
        txb->flags              = call->conn->out_clientflag;
-       txb->seq                = call->tx_prepared + 1;
+       txb->seq                = call->send_top + 1;
        txb->nr_kvec            = 1;
        txb->kvec[0].iov_base   = whdr;
        txb->kvec[0].iov_len    = sizeof(*whdr);
@@ -114,8 +111,6 @@ struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_s
        filler  = buf + sizeof(*whdr) + sizeof(*ack) + 1;
        trailer = buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;
 
-       INIT_LIST_HEAD(&txb->call_link);
-       INIT_LIST_HEAD(&txb->tx_link);
        refcount_set(&txb->ref, 1);
        txb->call_debug_id      = call->debug_id;
        txb->debug_id           = atomic_inc_return(&rxrpc_txbuf_debug_ids);
@@ -200,37 +195,3 @@ void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
                        rxrpc_free_txbuf(txb);
        }
 }
-
-/*
- * Shrink the transmit buffer.
- */
-void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
-{
-       struct rxrpc_txbuf *txb;
-       rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
-       bool wake = false;
-
-       _enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
-
-       while ((txb = list_first_entry_or_null(&call->tx_buffer,
-                                              struct rxrpc_txbuf, call_link))) {
-               hard_ack = call->acks_hard_ack;
-               if (before(hard_ack, txb->seq))
-                       break;
-
-               if (txb->seq != call->tx_bottom + 1)
-                       rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
-               ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
-               WRITE_ONCE(call->tx_bottom, call->tx_bottom + 1);
-               list_del_rcu(&txb->call_link);
-
-               trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
-
-               rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
-               if (after(call->acks_hard_ack, call->tx_bottom + 128))
-                       wake = true;
-       }
-
-       if (wake)
-               wake_up(&call->waitq);
-}