}
 EXPORT_SYMBOL(xsk_tx_peek_desc);
 
+static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, struct xdp_desc *descs,
+                                       u32 max_entries)
+{
+       u32 nb_pkts = 0;
+
+       while (nb_pkts < max_entries && xsk_tx_peek_desc(pool, &descs[nb_pkts]))
+               nb_pkts++;
+
+       xsk_tx_release(pool);
+       return nb_pkts;
+}
+
+u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, struct xdp_desc *descs,
+                                  u32 max_entries)
+{
+       struct xdp_sock *xs;
+       u32 nb_pkts;
+
+       rcu_read_lock();
+       if (!list_is_singular(&pool->xsk_tx_list)) {
+               /* Fallback to the non-batched version */
+               rcu_read_unlock();
+               return xsk_tx_peek_release_fallback(pool, descs, max_entries);
+       }
+
+       xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list);
+       if (!xs) {
+               nb_pkts = 0;
+               goto out;
+       }
+
+       nb_pkts = xskq_cons_peek_desc_batch(xs->tx, descs, pool, max_entries);
+       if (!nb_pkts) {
+               xs->tx->queue_empty_descs++;
+               goto out;
+       }
+
+       /* This is the backpressure mechanism for the Tx path. Try to
+        * reserve space in the completion queue for all packets, but
+        * if there are fewer slots available, just process that many
+        * packets. This avoids having to implement any buffering in
+        * the Tx path.
+        */
+       nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, descs, nb_pkts);
+       if (!nb_pkts)
+               goto out;
+
+       xskq_cons_release_n(xs->tx, nb_pkts);
+       __xskq_cons_release(xs->tx);
+       xs->sk.sk_write_space(&xs->sk);
+
+out:
+       rcu_read_unlock();
+       return nb_pkts;
+}
+EXPORT_SYMBOL(xsk_tx_peek_release_desc_batch);
+
 static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
 {
        struct net_device *dev = xs->dev;
 
        return false;
 }
 
+static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
+                                           struct xdp_desc *descs,
+                                           struct xsk_buff_pool *pool, u32 max)
+{
+       u32 cached_cons = q->cached_cons, nb_entries = 0;
+
+       while (cached_cons != q->cached_prod && nb_entries < max) {
+               struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
+               u32 idx = cached_cons & q->ring_mask;
+
+               descs[nb_entries] = ring->desc[idx];
+               if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
+                       /* Skip the entry */
+                       cached_cons++;
+                       continue;
+               }
+
+               nb_entries++;
+               cached_cons++;
+       }
+
+       return nb_entries;
+}
+
 /* Functions for consumers */
 
 static inline void __xskq_cons_release(struct xsk_queue *q)
        __xskq_cons_peek(q);
 }
 
-static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+static inline u32 xskq_cons_nb_entries(struct xsk_queue *q, u32 max)
 {
        u32 entries = q->cached_prod - q->cached_cons;
 
-       if (entries >= cnt)
-               return true;
+       if (entries >= max)
+               return max;
 
        __xskq_cons_peek(q);
        entries = q->cached_prod - q->cached_cons;
 
-       return entries >= cnt;
+       return entries >= max ? max : entries;
+}
+
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+{
+       return xskq_cons_nb_entries(q, cnt) >= cnt ? true : false;
 }
 
 static inline bool xskq_cons_peek_addr_unchecked(struct xsk_queue *q, u64 *addr)
        return xskq_cons_read_desc(q, desc, pool);
 }
 
+static inline u32 xskq_cons_peek_desc_batch(struct xsk_queue *q, struct xdp_desc *descs,
+                                           struct xsk_buff_pool *pool, u32 max)
+{
+       u32 entries = xskq_cons_nb_entries(q, max);
+
+       return xskq_cons_read_desc_batch(q, descs, pool, entries);
+}
+
+/* To improve performance in the xskq_cons_release functions, only update local state here.
+ * Reflect this to global state when we get new entries from the ring in
+ * xskq_cons_get_entries() and whenever Rx or Tx processing are completed in the NAPI loop.
+ */
 static inline void xskq_cons_release(struct xsk_queue *q)
 {
-       /* To improve performance, only update local state here.
-        * Reflect this to global state when we get new entries
-        * from the ring in xskq_cons_get_entries() and whenever
-        * Rx or Tx processing are completed in the NAPI loop.
-        */
        q->cached_cons++;
 }
 
+static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
+{
+       q->cached_cons += cnt;
+}
+
 static inline bool xskq_cons_is_full(struct xsk_queue *q)
 {
        /* No barriers needed since data is not accessed */
 
 /* Functions for producers */
 
-static inline bool xskq_prod_is_full(struct xsk_queue *q)
+static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
 {
        u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
 
-       if (free_entries)
-               return false;
+       if (free_entries >= max)
+               return max;
 
        /* Refresh the local tail pointer */
        q->cached_cons = READ_ONCE(q->ring->consumer);
        free_entries = q->nentries - (q->cached_prod - q->cached_cons);
 
-       return !free_entries;
+       return free_entries >= max ? max : free_entries;
+}
+
+static inline bool xskq_prod_is_full(struct xsk_queue *q)
+{
+       return xskq_prod_nb_free(q, 1) ? false : true;
 }
 
 static inline int xskq_prod_reserve(struct xsk_queue *q)
        return 0;
 }
 
+static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
+                                              u32 max)
+{
+       struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+       u32 nb_entries, i, cached_prod;
+
+       nb_entries = xskq_prod_nb_free(q, max);
+
+       /* A, matches D */
+       cached_prod = q->cached_prod;
+       for (i = 0; i < nb_entries; i++)
+               ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
+       q->cached_prod = cached_prod;
+
+       return nb_entries;
+}
+
 static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
                                         u64 addr, u32 len)
 {