};
        u16 rx_offset;
        struct xdp_rxq_info xdp_rxq;
+       spinlock_t tx_lock;     /* serializes XDP TX when rings are shared */
        struct xsk_buff_pool *xsk_pool;
        u16 ring_idx;           /* {rx,tx,xdp}_ring back reference idx */
        u16 rx_buf_len;
 #define IXGBE_MAX_FCOE_INDICES         8
 #define MAX_RX_QUEUES                  (IXGBE_MAX_FDIR_INDICES + 1)
 #define MAX_TX_QUEUES                  (IXGBE_MAX_FDIR_INDICES + 1)
-#define MAX_XDP_QUEUES                 (IXGBE_MAX_FDIR_INDICES + 1)
+#define IXGBE_MAX_XDP_QS               (IXGBE_MAX_FDIR_INDICES + 1)
 #define IXGBE_MAX_L2A_QUEUES           4
 #define IXGBE_BAD_L2A_QUEUE            3
 #define IXGBE_MAX_MACVLANS             63
 
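+/* Enabled when there are more CPUs than XDP TX queues, i.e. when several
+ * CPUs may have to share one XDP TX ring and must serialize access to it.
+ */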
+DECLARE_STATIC_KEY_FALSE(ixgbe_xdp_locking_key);
+
 struct ixgbe_ring_feature {
        u16 limit;      /* upper limit on feature indices */
        u16 indices;    /* current value of indices */
 
        /* XDP */
        int num_xdp_queues;
-       struct ixgbe_ring *xdp_ring[MAX_XDP_QUEUES];
+       struct ixgbe_ring *xdp_ring[IXGBE_MAX_XDP_QS];
        unsigned long *af_xdp_zc_qps; /* tracks AF_XDP ZC enabled rings */
 
        /* TX */
 #endif /* CONFIG_IXGBE_IPSEC */
 };
 
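+/* Map a CPU to an XDP TX queue index: one ring per CPU when enough rings
+ * exist, otherwise fold the CPUs onto the available rings with a modulo.
+ */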
+static inline int ixgbe_determine_xdp_q_idx(int cpu)
+{
+       if (static_key_enabled(&ixgbe_xdp_locking_key))
+               return cpu % IXGBE_MAX_XDP_QS;
+       else
+               return cpu;
+}
+
+static inline
+struct ixgbe_ring *ixgbe_determine_xdp_ring(struct ixgbe_adapter *adapter)
+{
+       int index = ixgbe_determine_xdp_q_idx(smp_processor_id());
+
+       return adapter->xdp_ring[index];
+}
+
 static inline u8 ixgbe_max_rss_indices(struct ixgbe_adapter *adapter)
 {
        switch (adapter->hw.mac.type) {
 
 
 static int ixgbe_xdp_queues(struct ixgbe_adapter *adapter)
 {
-       return adapter->xdp_prog ? nr_cpu_ids : 0;
+       int queues;
+
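+       /* never request more XDP TX queues than the driver supports */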
+       queues = min_t(int, IXGBE_MAX_XDP_QS, nr_cpu_ids);
+       return adapter->xdp_prog ? queues : 0;
 }
 
 #define IXGBE_RSS_64Q_MASK     0x3F
                ring->count = adapter->tx_ring_count;
                ring->queue_index = xdp_idx;
                set_ring_xdp(ring);
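+               /* only contended when several CPUs share this XDP ring */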
+               spin_lock_init(&ring->tx_lock);
 
                /* assign ring to adapter */
                WRITE_ONCE(adapter->xdp_ring[xdp_idx], ring);
        adapter->q_vector[v_idx] = NULL;
        __netif_napi_del(&q_vector->napi);
 
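+       /* turn the XDP ring-sharing key back off as the vectors go away */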
+       if (static_key_enabled(&ixgbe_xdp_locking_key))
+               static_branch_dec(&ixgbe_xdp_locking_key);
+
        /*
         * after a call to __netif_napi_del() napi may still be used and
         * ixgbe_get_stats64() might access the rings on this vector,
 
 MODULE_DESCRIPTION("Intel(R) 10 Gigabit PCI Express Network Driver");
 MODULE_LICENSE("GPL v2");
 
+DEFINE_STATIC_KEY_FALSE(ixgbe_xdp_locking_key);
+EXPORT_SYMBOL(ixgbe_xdp_locking_key);
+
 static struct workqueue_struct *ixgbe_wq;
 
 static bool ixgbe_check_cfg_remove(struct ixgbe_hw *hw, struct pci_dev *pdev);
 {
        int err, result = IXGBE_XDP_PASS;
        struct bpf_prog *xdp_prog;
+       struct ixgbe_ring *ring;
        struct xdp_frame *xdpf;
        u32 act;
 
                xdpf = xdp_convert_buff_to_frame(xdp);
                if (unlikely(!xdpf))
                        goto out_failure;
-               result = ixgbe_xmit_xdp_ring(adapter, xdpf);
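+               /* take the per-ring lock only when CPUs share TX rings */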
+               ring = ixgbe_determine_xdp_ring(adapter);
+               if (static_branch_unlikely(&ixgbe_xdp_locking_key))
+                       spin_lock(&ring->tx_lock);
+               result = ixgbe_xmit_xdp_ring(ring, xdpf);
+               if (static_branch_unlikely(&ixgbe_xdp_locking_key))
+                       spin_unlock(&ring->tx_lock);
                if (result == IXGBE_XDP_CONSUMED)
                        goto out_failure;
                break;
                xdp_do_flush_map();
 
        if (xdp_xmit & IXGBE_XDP_TX) {
-               struct ixgbe_ring *ring = adapter->xdp_ring[smp_processor_id()];
+               struct ixgbe_ring *ring = ixgbe_determine_xdp_ring(adapter);
 
-               /* Force memory writes to complete before letting h/w
-                * know there are new descriptors to fetch.
-                */
-               wmb();
-               writel(ring->next_to_use, ring->tail);
+               ixgbe_xdp_ring_update_tail_locked(ring);
        }
 
        u64_stats_update_begin(&rx_ring->syncp);
        if (ixgbe_init_rss_key(adapter))
                return -ENOMEM;
 
-       adapter->af_xdp_zc_qps = bitmap_zalloc(MAX_XDP_QUEUES, GFP_KERNEL);
+       adapter->af_xdp_zc_qps = bitmap_zalloc(IXGBE_MAX_XDP_QS, GFP_KERNEL);
        if (!adapter->af_xdp_zc_qps)
                return -ENOMEM;
 
 }
 
 #endif
-int ixgbe_xmit_xdp_ring(struct ixgbe_adapter *adapter,
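+/* the caller picks the destination ring and holds ring->tx_lock if shared */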
+int ixgbe_xmit_xdp_ring(struct ixgbe_ring *ring,
                        struct xdp_frame *xdpf)
 {
-       struct ixgbe_ring *ring = adapter->xdp_ring[smp_processor_id()];
        struct ixgbe_tx_buffer *tx_buffer;
        union ixgbe_adv_tx_desc *tx_desc;
        u32 len, cmd_type;
                        return -EINVAL;
        }
 
-       if (nr_cpu_ids > MAX_XDP_QUEUES)
+       /* If there are more than twice as many CPUs as XDP queues, give up
+        * and return -ENOMEM as before. If the CPU count merely exceeds the
+        * queue count, enable the locking key so CPUs can share TX rings.
+        */
+       if (nr_cpu_ids > IXGBE_MAX_XDP_QS * 2)
                return -ENOMEM;
+       else if (nr_cpu_ids > IXGBE_MAX_XDP_QS)
+               static_branch_inc(&ixgbe_xdp_locking_key);
 
        old_prog = xchg(&adapter->xdp_prog, prog);
        need_reset = (!!prog != !!old_prog);
        writel(ring->next_to_use, ring->tail);
 }
 
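+/* bump the tail pointer, serializing against other CPUs sharing the ring */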
+void ixgbe_xdp_ring_update_tail_locked(struct ixgbe_ring *ring)
+{
+       if (static_branch_unlikely(&ixgbe_xdp_locking_key))
+               spin_lock(&ring->tx_lock);
+       ixgbe_xdp_ring_update_tail(ring);
+       if (static_branch_unlikely(&ixgbe_xdp_locking_key))
+               spin_unlock(&ring->tx_lock);
+}
+
 static int ixgbe_xdp_xmit(struct net_device *dev, int n,
                          struct xdp_frame **frames, u32 flags)
 {
        /* During program transitions its possible adapter->xdp_prog is assigned
         * but ring has not been configured yet. In this case simply abort xmit.
         */
-       ring = adapter->xdp_prog ? adapter->xdp_ring[smp_processor_id()] : NULL;
+       ring = adapter->xdp_prog ? ixgbe_determine_xdp_ring(adapter) : NULL;
        if (unlikely(!ring))
                return -ENXIO;
 
        if (unlikely(test_bit(__IXGBE_TX_DISABLED, &ring->state)))
                return -ENXIO;
 
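+       /* hold the ring lock across the whole batch when rings are shared */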
+       if (static_branch_unlikely(&ixgbe_xdp_locking_key))
+               spin_lock(&ring->tx_lock);
+
        for (i = 0; i < n; i++) {
                struct xdp_frame *xdpf = frames[i];
                int err;
 
-               err = ixgbe_xmit_xdp_ring(adapter, xdpf);
+               err = ixgbe_xmit_xdp_ring(ring, xdpf);
                if (err != IXGBE_XDP_TX)
                        break;
                nxmit++;
        if (unlikely(flags & XDP_XMIT_FLUSH))
                ixgbe_xdp_ring_update_tail(ring);
 
+       if (static_branch_unlikely(&ixgbe_xdp_locking_key))
+               spin_unlock(&ring->tx_lock);
+
        return nxmit;
 }
 
 
 #define IXGBE_TXD_CMD (IXGBE_TXD_CMD_EOP | \
                       IXGBE_TXD_CMD_RS)
 
-int ixgbe_xmit_xdp_ring(struct ixgbe_adapter *adapter,
+int ixgbe_xmit_xdp_ring(struct ixgbe_ring *ring,
                        struct xdp_frame *xdpf);
 bool ixgbe_cleanup_headers(struct ixgbe_ring *rx_ring,
                           union ixgbe_adv_rx_desc *rx_desc,
 void ixgbe_rx_skb(struct ixgbe_q_vector *q_vector,
                  struct sk_buff *skb);
 void ixgbe_xdp_ring_update_tail(struct ixgbe_ring *ring);
+void ixgbe_xdp_ring_update_tail_locked(struct ixgbe_ring *ring);
 void ixgbe_irq_rearm_queues(struct ixgbe_adapter *adapter, u64 qmask);
 
 void ixgbe_txrx_ring_disable(struct ixgbe_adapter *adapter, int ring);
 
 {
        int err, result = IXGBE_XDP_PASS;
        struct bpf_prog *xdp_prog;
+       struct ixgbe_ring *ring;
        struct xdp_frame *xdpf;
        u32 act;
 
                xdpf = xdp_convert_buff_to_frame(xdp);
                if (unlikely(!xdpf))
                        goto out_failure;
-               result = ixgbe_xmit_xdp_ring(adapter, xdpf);
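+               /* lock only if several CPUs were folded onto this TX ring */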
+               ring = ixgbe_determine_xdp_ring(adapter);
+               if (static_branch_unlikely(&ixgbe_xdp_locking_key))
+                       spin_lock(&ring->tx_lock);
+               result = ixgbe_xmit_xdp_ring(ring, xdpf);
+               if (static_branch_unlikely(&ixgbe_xdp_locking_key))
+                       spin_unlock(&ring->tx_lock);
                if (result == IXGBE_XDP_CONSUMED)
                        goto out_failure;
                break;
                xdp_do_flush_map();
 
        if (xdp_xmit & IXGBE_XDP_TX) {
-               struct ixgbe_ring *ring = adapter->xdp_ring[smp_processor_id()];
+               struct ixgbe_ring *ring = ixgbe_determine_xdp_ring(adapter);
 
-               /* Force memory writes to complete before letting h/w
-                * know there are new descriptors to fetch.
-                */
-               wmb();
-               writel(ring->next_to_use, ring->tail);
+               ixgbe_xdp_ring_update_tail_locked(ring);
        }
 
        u64_stats_update_begin(&rx_ring->syncp);