struct net_device       *dev;
        struct gro_list         gro_hash[GRO_HASH_BUCKETS];
        struct sk_buff          *skb;
+       struct list_head        rx_list; /* Pending GRO_NORMAL skbs */
+       int                     rx_count; /* length of rx_list */
        struct hrtimer          timer;
        struct list_head        dev_list;
        struct hlist_node       napi_hash_node;
 extern int             dev_weight_tx_bias;
 extern int             dev_rx_weight;
 extern int             dev_tx_weight;
+extern int             gro_normal_batch;
 
 bool netdev_has_upper_dev(struct net_device *dev, struct net_device *upper_dev);
 struct net_device *netdev_upper_get_next_dev_rcu(struct net_device *dev,
 
 int dev_weight_tx_bias __read_mostly = 1;  /* bias for output_queue quota */
 int dev_rx_weight __read_mostly = 64;
 int dev_tx_weight __read_mostly = 64;
+/* Maximum number of GRO_NORMAL skbs to batch up for list-RX */
+int gro_normal_batch __read_mostly = 8;
 
 /* Called with irq disabled */
 static inline void ____napi_schedule(struct softnet_data *sd,
 }
 EXPORT_SYMBOL(napi_get_frags);
 
+/* Pass the currently batched GRO_NORMAL SKBs up to the stack. */
+static void gro_normal_list(struct napi_struct *napi)
+{
+       if (!napi->rx_count)
+               return;
+       netif_receive_skb_list_internal(&napi->rx_list);
+       INIT_LIST_HEAD(&napi->rx_list);
+       napi->rx_count = 0;
+}
+
+/* Queue one GRO_NORMAL SKB up for list processing.  If batch size exceeded,
+ * pass the whole batch up to the stack.
+ */
+static void gro_normal_one(struct napi_struct *napi, struct sk_buff *skb)
+{
+       list_add_tail(&skb->list, &napi->rx_list);
+       if (++napi->rx_count >= gro_normal_batch)
+               gro_normal_list(napi);
+}
+
 static gro_result_t napi_frags_finish(struct napi_struct *napi,
                                      struct sk_buff *skb,
                                      gro_result_t ret)
        case GRO_HELD:
                __skb_push(skb, ETH_HLEN);
                skb->protocol = eth_type_trans(skb, skb->dev);
-               if (ret == GRO_NORMAL && netif_receive_skb_internal(skb))
-                       ret = GRO_DROP;
+               if (ret == GRO_NORMAL)
+                       gro_normal_one(napi, skb);
                break;
 
        case GRO_DROP:
                                 NAPIF_STATE_IN_BUSY_POLL)))
                return false;
 
+       gro_normal_list(n);
+
        if (n->gro_bitmask) {
                unsigned long timeout = 0;
 
         * Ideally, a new ndo_busy_poll_stop() could avoid another round.
         */
        rc = napi->poll(napi, BUSY_POLL_BUDGET);
+       /* We can't gro_normal_list() here, because napi->poll() might have
+        * rearmed the napi (napi_complete_done()) in which case it could
+        * already be running on another CPU.
+        */
        trace_napi_poll(napi, rc, BUSY_POLL_BUDGET);
        netpoll_poll_unlock(have_poll_lock);
-       if (rc == BUSY_POLL_BUDGET)
+       if (rc == BUSY_POLL_BUDGET) {
+               /* As the whole budget was spent, we still own the napi so can
+                * safely handle the rx_list.
+                */
+               gro_normal_list(napi);
                __napi_schedule(napi);
+       }
        local_bh_enable();
 }
 
                }
                work = napi_poll(napi, BUSY_POLL_BUDGET);
                trace_napi_poll(napi, work, BUSY_POLL_BUDGET);
+               gro_normal_list(napi);
 count:
                if (work > 0)
                        __NET_ADD_STATS(dev_net(napi->dev),
        napi->timer.function = napi_watchdog;
        init_gro_hash(napi);
        napi->skb = NULL;
+       INIT_LIST_HEAD(&napi->rx_list);
+       napi->rx_count = 0;
        napi->poll = poll;
        if (weight > NAPI_POLL_WEIGHT)
                netdev_err_once(dev, "%s() called with weight %d\n", __func__,
                goto out_unlock;
        }
 
+       gro_normal_list(n);
+
        if (n->gro_bitmask) {
                /* flush too old packets
                 * If HZ < 1000, flush all packets.