 #include <linux/slab.h>
 #include <linux/sched.h>
 #include <linux/sched/mm.h>
+#include <linux/smpboot.h>
 #include <linux/mutex.h>
 #include <linux/rwsem.h>
 #include <linux/string.h>
        return &net->dev_index_head[ifindex & (NETDEV_HASHENTRIES - 1)];
 }
 
+#ifndef CONFIG_PREEMPT_RT
+
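+/*
+ * Booting with "thread_backlog_napi" on the kernel command line moves
+ * backlog processing from NET_RX softirq context into per-CPU kthreads.
+ */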
+static DEFINE_STATIC_KEY_FALSE(use_backlog_threads_key);
+
+static int __init setup_backlog_napi_threads(char *arg)
+{
+       static_branch_enable(&use_backlog_threads_key);
+       return 0;
+}
+early_param("thread_backlog_napi", setup_backlog_napi_threads);
+
+static bool use_backlog_threads(void)
+{
+       return static_branch_unlikely(&use_backlog_threads_key);
+}
+
+#else
+
+static bool use_backlog_threads(void)
+{
+       return true;
+}
+
+#endif
+
 static inline void rps_lock_irqsave(struct softnet_data *sd,
                                    unsigned long *flags)
 {
 /*************************************************************************
  *                     Receiver routines
  *************************************************************************/
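+/* Per-CPU task_struct of the backlog NAPI kthread, managed via smpboot. */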
+static DEFINE_PER_CPU(struct task_struct *, backlog_napi);
 
 unsigned int sysctl_skb_defer_max __read_mostly = 64;
 int weight_p __read_mostly = 64;           /* old backlog weight */
                 */
                thread = READ_ONCE(napi->thread);
                if (thread) {
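+                       /* NAPIs owned by this CPU's backlog thread are polled locally below. */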
+                       if (use_backlog_threads() && thread == raw_cpu_read(backlog_napi))
+                               goto use_local_napi;
+
                        set_bit(NAPI_STATE_SCHED_THREADED, &napi->state);
                        wake_up_process(thread);
                        return;
                }
        }
 
+use_local_napi:
        list_add_tail(&napi->poll_list, &sd->poll_list);
        WRITE_ONCE(napi->list_owner, smp_processor_id());
        /* If not called from net_rx_action()
 
 #ifdef CONFIG_RPS
        if (sd != mysd) {
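+               /* Wake the remote CPU's backlog NAPI thread instead of queueing an RPS IPI. */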
+               if (use_backlog_threads()) {
+                       __napi_schedule_irqoff(&sd->backlog);
+                       return;
+               }
+
                sd->rps_ipi_next = mysd->rps_ipi_list;
                mysd->rps_ipi_list = sd;
 
 #ifdef CONFIG_RPS
        struct softnet_data *remsd = sd->rps_ipi_list;
 
-       if (remsd) {
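+       /* The RPS IPI list stays empty when backlog threads are in use. */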
+       if (!use_backlog_threads() && remsd) {
                sd->rps_ipi_list = NULL;
 
                local_irq_enable();
 static bool sd_has_rps_ipi_waiting(struct softnet_data *sd)
 {
 #ifdef CONFIG_RPS
-       return sd->rps_ipi_list != NULL;
+       return !use_backlog_threads() && sd->rps_ipi_list;
 #else
        return false;
 #endif
                         * We can use a plain write instead of clear_bit(),
                         * and we dont need an smp_mb() memory barrier.
                         */
-                       napi->state = 0;
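+                       /* Keep NAPIF_STATE_THREADED so the backlog stays bound to its kthread. */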
+                       napi->state &= NAPIF_STATE_THREADED;
                        again = false;
                } else {
                        skb_queue_splice_tail_init(&sd->input_pkt_queue,
        return -1;
 }
 
-static int napi_threaded_poll(void *data)
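+/* Poll loop shared by per-NAPI kthreads and the per-CPU backlog threads. */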
+static void napi_threaded_poll_loop(struct napi_struct *napi)
 {
-       struct napi_struct *napi = data;
        struct softnet_data *sd;
-       void *have;
-
-       while (!napi_thread_wait(napi)) {
-               unsigned long last_qs = jiffies;
+       unsigned long last_qs = jiffies;
 
-               for (;;) {
-                       bool repoll = false;
+       for (;;) {
+               bool repoll = false;
+               void *have;
 
-                       local_bh_disable();
-                       sd = this_cpu_ptr(&softnet_data);
-                       sd->in_napi_threaded_poll = true;
+               local_bh_disable();
+               sd = this_cpu_ptr(&softnet_data);
+               sd->in_napi_threaded_poll = true;
 
-                       have = netpoll_poll_lock(napi);
-                       __napi_poll(napi, &repoll);
-                       netpoll_poll_unlock(have);
+               have = netpoll_poll_lock(napi);
+               __napi_poll(napi, &repoll);
+               netpoll_poll_unlock(have);
 
-                       sd->in_napi_threaded_poll = false;
-                       barrier();
+               sd->in_napi_threaded_poll = false;
+               barrier();
 
-                       if (sd_has_rps_ipi_waiting(sd)) {
-                               local_irq_disable();
-                               net_rps_action_and_irq_enable(sd);
-                       }
-                       skb_defer_free_flush(sd);
-                       local_bh_enable();
+               if (sd_has_rps_ipi_waiting(sd)) {
+                       local_irq_disable();
+                       net_rps_action_and_irq_enable(sd);
+               }
+               skb_defer_free_flush(sd);
+               local_bh_enable();
 
-                       if (!repoll)
-                               break;
+               if (!repoll)
+                       break;
 
-                       rcu_softirq_qs_periodic(last_qs);
-                       cond_resched();
-               }
+               rcu_softirq_qs_periodic(last_qs);
+               cond_resched();
        }
+}
+
+static int napi_threaded_poll(void *data)
+{
+       struct napi_struct *napi = data;
+
+       while (!napi_thread_wait(napi))
+               napi_threaded_poll_loop(napi);
+
        return 0;
 }
 
 
                list_del_init(&napi->poll_list);
                if (napi->poll == process_backlog)
-                       napi->state = 0;
+                       napi->state &= NAPIF_STATE_THREADED;
                else
                        ____napi_schedule(sd, napi);
        }
        raise_softirq_irqoff(NET_TX_SOFTIRQ);
        local_irq_enable();
 
+       if (!use_backlog_threads()) {
 #ifdef CONFIG_RPS
-       remsd = oldsd->rps_ipi_list;
-       oldsd->rps_ipi_list = NULL;
+               remsd = oldsd->rps_ipi_list;
+               oldsd->rps_ipi_list = NULL;
 #endif
-       /* send out pending IPI's on offline CPU */
-       net_rps_send_ipi(remsd);
+               /* send out pending IPI's on offline CPU */
+               net_rps_send_ipi(remsd);
+       }
 
        /* Process offline CPU's input_pkt_queue */
        while ((skb = __skb_dequeue(&oldsd->process_queue))) {
        return 0;
 }
 
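+/* smpboot callback: the backlog thread should run while its NAPI is scheduled. */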
+static int backlog_napi_should_run(unsigned int cpu)
+{
+       struct softnet_data *sd = per_cpu_ptr(&softnet_data, cpu);
+       struct napi_struct *napi = &sd->backlog;
+
+       return test_bit(NAPI_STATE_SCHED_THREADED, &napi->state);
+}
+
+static void run_backlog_napi(unsigned int cpu)
+{
+       struct softnet_data *sd = per_cpu_ptr(&softnet_data, cpu);
+
+       napi_threaded_poll_loop(&sd->backlog);
+}
+
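+/* smpboot setup: bind sd->backlog to its per-CPU kthread and mark it threaded. */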
+static void backlog_napi_setup(unsigned int cpu)
+{
+       struct softnet_data *sd = per_cpu_ptr(&softnet_data, cpu);
+       struct napi_struct *napi = &sd->backlog;
+
+       napi->thread = this_cpu_read(backlog_napi);
+       set_bit(NAPI_STATE_THREADED, &napi->state);
+}
+
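+/* One "backlog_napi/<cpu>" kthread per CPU, hotplug-aware via smpboot. */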
+static struct smp_hotplug_thread backlog_threads = {
+       .store                  = &backlog_napi,
+       .thread_should_run      = backlog_napi_should_run,
+       .thread_fn              = run_backlog_napi,
+       .thread_comm            = "backlog_napi/%u",
+       .setup                  = backlog_napi_setup,
+};
+
 /*
  *       This is called single threaded during boot, so no need
  *       to take the rtnl semaphore.
                init_gro_hash(&sd->backlog);
                sd->backlog.poll = process_backlog;
                sd->backlog.weight = weight_p;
+               INIT_LIST_HEAD(&sd->backlog.poll_list);
 
                if (net_page_pool_create(i))
                        goto out;
        }
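+       /* Spawn the per-CPU backlog threads once every backlog NAPI is set up. */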
+       if (use_backlog_threads())
+               smpboot_register_percpu_thread(&backlog_threads);
 
        dev_boot_phase = 0;