struct rcu_head **nocb_tail;
        atomic_long_t nocb_q_count;     /* # CBs waiting for kthread */
        atomic_long_t nocb_q_count_lazy; /*  (approximate). */
+       struct rcu_head *nocb_follower_head; /* CBs ready to invoke. */
+       struct rcu_head **nocb_follower_tail;
+       atomic_long_t nocb_follower_count; /* # CBs ready to invoke. */
+       atomic_long_t nocb_follower_count_lazy; /*  (approximate). */
        int nocb_p_count;               /* # CBs being invoked by kthread */
        int nocb_p_count_lazy;          /*  (approximate). */
        wait_queue_head_t nocb_wq;      /* For nocb kthreads to sleep on. */
        struct task_struct *nocb_kthread;
        bool nocb_defer_wakeup;         /* Defer wakeup of nocb_kthread. */
+
+       /* The following fields are used by the leader, hence own cacheline. */
+       struct rcu_head *nocb_gp_head ____cacheline_internodealigned_in_smp;
+                                       /* CBs waiting for GP. */
+       struct rcu_head **nocb_gp_tail;
+       long nocb_gp_count;
+       long nocb_gp_count_lazy;
+       bool nocb_leader_wake;          /* Is the nocb leader thread awake? */
+       struct rcu_data *nocb_next_follower;
+                                       /* Next follower in wakeup chain. */
+
+       /* The following fields are used by the follower, hence new cacheline. */
+       struct rcu_data *nocb_leader ____cacheline_internodealigned_in_smp;
+                                       /* Leader CPU takes GP-end wakeups. */
 #endif /* #ifdef CONFIG_RCU_NOCB_CPU */
 
        /* 8) RCU CPU stall data. */
 /* Sum up queue lengths for tracing. */
 static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
 {
-       *ql = atomic_long_read(&rdp->nocb_q_count) + rdp->nocb_p_count;
-       *qll = atomic_long_read(&rdp->nocb_q_count_lazy) + rdp->nocb_p_count_lazy;
+       *ql = atomic_long_read(&rdp->nocb_q_count) +
+             rdp->nocb_p_count +
+             atomic_long_read(&rdp->nocb_follower_count) +
+             rdp->nocb_p_count + rdp->nocb_gp_count;
+       *qll = atomic_long_read(&rdp->nocb_q_count_lazy) +
+              rdp->nocb_p_count_lazy +
+              atomic_long_read(&rdp->nocb_follower_count_lazy) +
+              rdp->nocb_p_count_lazy + rdp->nocb_gp_count_lazy;
 }
 #else /* #ifdef CONFIG_RCU_NOCB_CPU */
 static inline void rcu_nocb_q_lengths(struct rcu_data *rdp, long *ql, long *qll)
 
 }
 #endif /* #ifndef CONFIG_RCU_NOCB_CPU_ALL */
 
+/*
+ * Wake the kthread that leads rdp's NOCB group.  Nothing is done if the
+ * leader's ->nocb_kthread pointer is not set.  Unless force is true, the
+ * wakeup is skipped when the leader's ->nocb_leader_wake flag is already
+ * set.
+ */
+static void wake_nocb_leader(struct rcu_data *rdp, bool force)
+{
+       struct rcu_data *leader = rdp->nocb_leader;
+
+       /* No leader kthread recorded, so there is nobody to wake. */
+       if (!ACCESS_ONCE(leader->nocb_kthread))
+               return;
+
+       /* Leader is already flagged awake and the caller is not insisting. */
+       if (ACCESS_ONCE(leader->nocb_leader_wake) && !force)
+               return;
+
+       /* Prior xchg orders against prior callback enqueue. */
+       ACCESS_ONCE(leader->nocb_leader_wake) = true;
+       wake_up(&leader->nocb_wq);
+}
+
+
 /*
  * Enqueue the specified string of rcu_head structures onto the specified
  * CPU's no-CBs lists.  The CPU is specified by rdp, the head of the
        len = atomic_long_read(&rdp->nocb_q_count);
        if (old_rhpp == &rdp->nocb_head) {
                if (!irqs_disabled_flags(flags)) {
-                       wake_up(&rdp->nocb_wq); /* ... if queue was empty ... */
+                       /* ... if queue was empty ... */
+                       wake_nocb_leader(rdp, false);
                        trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
                                            TPS("WakeEmpty"));
                } else {
                }
                rdp->qlen_last_fqs_check = 0;
        } else if (len > rdp->qlen_last_fqs_check + qhimark) {
-               wake_up_process(t); /* ... or if many callbacks queued. */
+               /* ... or if many callbacks queued. */
+               wake_nocb_leader(rdp, true);
                rdp->qlen_last_fqs_check = LONG_MAX / 2;
                trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("WakeOvf"));
        } else {
        smp_mb(); /* Ensure that CB invocation happens after GP end. */
 }
 
+/*
+ * Leaders come here to wait for additional callbacks to show up.
+ * This function does not return until callbacks appear on the leader's
+ * own ->nocb_follower_head list.
+ *
+ * Each pass moves newly queued callbacks from every group member's
+ * ->nocb_head list to that member's ->nocb_gp_head list, waits for one
+ * grace period, and then appends them to the member's ->nocb_follower_head
+ * list, waking any follower whose list had been empty.
+ *
+ * @my_rdp: the leader's own rcu_data; followers are reached through
+ *          its ->nocb_next_follower chain.
+ */
+static void nocb_leader_wait(struct rcu_data *my_rdp)
+{
+       bool firsttime = true;
+       bool gotcbs;
+       struct rcu_data *rdp;
+       struct rcu_head **tail;
+
+wait_again:
+
+       /* Wait for callbacks to appear. */
+       if (!rcu_nocb_poll) {
+               trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
+                                   TPS("Sleep"));
+               wait_event_interruptible(my_rdp->nocb_wq,
+                                        ACCESS_ONCE(my_rdp->nocb_leader_wake));
+               /* Memory barrier handled by smp_mb() calls below and repoll. */
+       } else if (firsttime) {
+               firsttime = false; /* Don't drown trace log with "Poll"! */
+               trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
+                                   TPS("Poll"));
+       }
+
+       /*
+        * Each pass through the following loop checks a follower for CBs.
+        * We are our own first follower.  Any CBs found are moved to
+        * nocb_gp_head, where they await a grace period.
+        */
+       gotcbs = false;
+       for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
+               /* Snapshot the queue; NULL here also marks "nothing to hand off". */
+               rdp->nocb_gp_head = ACCESS_ONCE(rdp->nocb_head);
+               if (!rdp->nocb_gp_head)
+                       continue;  /* No CBs here, try next follower. */
+
+               /* Move callbacks to wait-for-GP list, which is empty. */
+               ACCESS_ONCE(rdp->nocb_head) = NULL;
+               rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
+               rdp->nocb_gp_count = atomic_long_xchg(&rdp->nocb_q_count, 0);
+               rdp->nocb_gp_count_lazy =
+                       atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
+               gotcbs = true;
+       }
+
+       /*
+        * If there were no callbacks, sleep a bit, rescan after a
+        * memory barrier, and go retry.
+        */
+       if (unlikely(!gotcbs)) {
+               if (!rcu_nocb_poll)
+                       trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
+                                           TPS("WokeEmpty"));
+               flush_signals(current);
+               schedule_timeout_interruptible(1);
+
+               /* Rescan in case we were a victim of memory ordering. */
+               my_rdp->nocb_leader_wake = false;
+               smp_mb();  /* Ensure _wake false before scan. */
+               for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower)
+                       if (ACCESS_ONCE(rdp->nocb_head)) {
+                               /* Found CB, so short-circuit next wait. */
+                               my_rdp->nocb_leader_wake = true;
+                               break;
+                       }
+               goto wait_again;
+       }
+
+       /* Wait for one grace period. */
+       rcu_nocb_wait_gp(my_rdp);
+
+       /*
+        * We left ->nocb_leader_wake set to reduce cache thrashing.
+        * We clear it now, but recheck for new callbacks while
+        * traversing our follower list.
+        */
+       my_rdp->nocb_leader_wake = false;
+       smp_mb(); /* Ensure _wake false before scan of ->nocb_head. */
+
+       /* Each pass through the following loop wakes a follower, if needed. */
+       for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
+               if (ACCESS_ONCE(rdp->nocb_head))
+                       my_rdp->nocb_leader_wake = true; /* No need to wait. */
+               if (!rdp->nocb_gp_head)
+                       continue; /* No CBs, so no need to wake follower. */
+
+               /* Append callbacks to follower's "done" list. */
+               tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail);
+               *tail = rdp->nocb_gp_head;
+               atomic_long_add(rdp->nocb_gp_count, &rdp->nocb_follower_count);
+               atomic_long_add(rdp->nocb_gp_count_lazy,
+                               &rdp->nocb_follower_count_lazy);
+               if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
+                       /*
+                        * List was empty, wake up the follower.
+                        * Memory barriers supplied by atomic_long_add().
+                        */
+                       wake_up(&rdp->nocb_wq);
+               }
+       }
+
+       /* If we (the leader) don't have CBs, go wait some more. */
+       if (!my_rdp->nocb_follower_head)
+               goto wait_again;
+}
+
+/*
+ * Followers come here to wait for additional callbacks to show up.
+ * This function does not return until callbacks appear, that is, until
+ * rdp->nocb_follower_head is observed to be non-NULL.
+ *
+ * @rdp: the follower's own rcu_data.
+ */
+static void nocb_follower_wait(struct rcu_data *rdp)
+{
+       bool firsttime = true;
+
+       for (;;) {
+               if (!rcu_nocb_poll) {
+                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
+                                           TPS("FollowerSleep"));
+                       wait_event_interruptible(rdp->nocb_wq,
+                                                ACCESS_ONCE(rdp->nocb_follower_head));
+               } else if (firsttime) {
+                       /* Don't drown trace log with "Poll"! */
+                       firsttime = false;
+                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
+                                           TPS("Poll"));
+               }
+               if (smp_load_acquire(&rdp->nocb_follower_head)) {
+                       /* ^^^ Ensure CB invocation follows _head test. */
+                       return;
+               }
+
+               /* Nothing ready after all: drop signals, nap, and retry. */
+               if (!rcu_nocb_poll)
+                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
+                                           TPS("WokeEmpty"));
+               flush_signals(current);
+               schedule_timeout_interruptible(1);
+       }
+}
+
 /*
  * Per-rcu_data kthread, but only for no-CBs CPUs.  Each kthread invokes
- * callbacks queued by the corresponding no-CBs CPU.
+ * callbacks queued by the corresponding no-CBs CPU.  However, there is
+ * an optional leader-follower relationship so that the grace-period
+ * kthreads don't have to do quite so many wakeups.
  */
 static int rcu_nocb_kthread(void *arg)
 {
        int c, cl;
-       bool firsttime = 1;
        struct rcu_head *list;
        struct rcu_head *next;
        struct rcu_head **tail;
 
        /* Each pass through this loop invokes one batch of callbacks */
        for (;;) {
-               /* If not polling, wait for next batch of callbacks. */
-               if (!rcu_nocb_poll) {
-                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
-                                           TPS("Sleep"));
-                       wait_event_interruptible(rdp->nocb_wq, rdp->nocb_head);
-                       /* Memory barrier provide by xchg() below. */
-               } else if (firsttime) {
-                       firsttime = 0;
-                       trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
-                                           TPS("Poll"));
-               }
-               list = ACCESS_ONCE(rdp->nocb_head);
-               if (!list) {
-                       if (!rcu_nocb_poll)
-                               trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
-                                                   TPS("WokeEmpty"));
-                       schedule_timeout_interruptible(1);
-                       flush_signals(current);
-                       continue;
-               }
-               firsttime = 1;
-               trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
-                                   TPS("WokeNonEmpty"));
-
-               /*
-                * Extract queued callbacks, update counts, and wait
-                * for a grace period to elapse.
-                */
-               ACCESS_ONCE(rdp->nocb_head) = NULL;
-               tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
-               c = atomic_long_xchg(&rdp->nocb_q_count, 0);
-               cl = atomic_long_xchg(&rdp->nocb_q_count_lazy, 0);
-               ACCESS_ONCE(rdp->nocb_p_count) += c;
-               ACCESS_ONCE(rdp->nocb_p_count_lazy) += cl;
-               rcu_nocb_wait_gp(rdp);
+               /* Wait for callbacks. */
+               if (rdp->nocb_leader == rdp)
+                       nocb_leader_wait(rdp);
+               else
+                       nocb_follower_wait(rdp);
+
+               /* Pull the ready-to-invoke callbacks onto local list. */
+               list = ACCESS_ONCE(rdp->nocb_follower_head);
+               BUG_ON(!list);
+               trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty");
+               ACCESS_ONCE(rdp->nocb_follower_head) = NULL;
+               tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);
+               c = atomic_long_xchg(&rdp->nocb_follower_count, 0);
+               cl = atomic_long_xchg(&rdp->nocb_follower_count_lazy, 0);
+               rdp->nocb_p_count += c;
+               rdp->nocb_p_count_lazy += cl;
 
                /* Each pass through the following loop invokes a callback. */
                trace_rcu_batch_start(rdp->rsp->name, cl, c, -1);
        if (!rcu_nocb_need_deferred_wakeup(rdp))
                return;
        ACCESS_ONCE(rdp->nocb_defer_wakeup) = false;
-       wake_up(&rdp->nocb_wq);
+       wake_nocb_leader(rdp, false);
        trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("DeferredWakeEmpty"));
 }
 
 {
        rdp->nocb_tail = &rdp->nocb_head;
        init_waitqueue_head(&rdp->nocb_wq);
+       rdp->nocb_follower_tail = &rdp->nocb_follower_head;
 }
 
-/* Create a kthread for each RCU flavor for each no-CBs CPU. */
+/* How many follower CPU IDs per leader?  Default of -1 for sqrt(nr_cpu_ids). */
+static int rcu_nocb_leader_stride = -1;
+module_param(rcu_nocb_leader_stride, int, 0444);
+
+/*
+ * Create a kthread for each RCU flavor for each no-CBs CPU.
+ * Also initialize leader-follower relationships.
+ */
 static void __init rcu_spawn_nocb_kthreads(struct rcu_state *rsp)
 {
        int cpu;
+       int ls = rcu_nocb_leader_stride;
+       int nl = 0;  /* Next leader. */
        struct rcu_data *rdp;
+       struct rcu_data *rdp_leader = NULL;  /* Suppress misguided gcc warn. */
+       struct rcu_data *rdp_prev = NULL;
        struct task_struct *t;
 
        if (rcu_nocb_mask == NULL)
                return;
+       if (ls == -1) {
+               ls = int_sqrt(nr_cpu_ids);
+               rcu_nocb_leader_stride = ls;
+       }
+
+       /*
+        * Each pass through this loop sets up one rcu_data structure and
+        * spawns one rcu_nocb_kthread().
+        */
        for_each_cpu(cpu, rcu_nocb_mask) {
                rdp = per_cpu_ptr(rsp->rda, cpu);
+               if (rdp->cpu >= nl) {
+                       /* New leader, set up for followers & next leader. */
+                       nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
+                       rdp->nocb_leader = rdp;
+                       rdp_leader = rdp;
+               } else {
+                       /* Another follower, link to previous leader. */
+                       rdp->nocb_leader = rdp_leader;
+                       rdp_prev->nocb_next_follower = rdp;
+               }
+               rdp_prev = rdp;
+
+               /* Spawn the kthread for this CPU. */
                t = kthread_run(rcu_nocb_kthread, rdp,
                                "rcuo%c/%d", rsp->abbr, cpu);
                BUG_ON(IS_ERR(t));