struct xsk_queue *tx ____cacheline_aligned_in_smp;
struct list_head tx_list;
+ /* record the number of tx descriptors sent by this xsk and
+ * when it exceeds MAX_PER_SOCKET_BUDGET, an opportunity needs
+ * to be given to other xsks for sending tx descriptors, thereby
+ * preventing other XSKs from being starved.
+ */
+ u32 tx_budget_spent;
+
/* Protects generic receive. */
spinlock_t rx_lock;
#include "xsk.h"
#define TX_BATCH_SIZE 32
+#define MAX_PER_SOCKET_BUDGET (TX_BATCH_SIZE)
static DEFINE_PER_CPU(struct list_head, xskmap_flush_list);
bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc)
{
+ bool budget_exhausted = false;
struct xdp_sock *xs;
rcu_read_lock();
+again:
list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list) {
+ if (xs->tx_budget_spent >= MAX_PER_SOCKET_BUDGET) {
+ budget_exhausted = true;
+ continue;
+ }
+
if (!xskq_cons_peek_desc(xs->tx, desc, pool)) {
if (xskq_has_descs(xs->tx))
xskq_cons_release(xs->tx);
continue;
}
+ xs->tx_budget_spent++;
+
/* This is the backpressure mechanism for the Tx path.
* Reserve space in the completion queue and only proceed
* if there is space in it. This avoids having to implement
return true;
}
+ if (budget_exhausted) {
+ list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list)
+ xs->tx_budget_spent = 0;
+
+ budget_exhausted = false;
+ goto again;
+ }
+
out:
rcu_read_unlock();
return false;