#define KFREE_DRAIN_JIFFIES (HZ / 50)
 #define KFREE_N_BATCHES 2
 
+/*
+ * This macro defines how many entries the "records" array
+ * will contain. It is based on the fact that the size of
+ * kfree_rcu_bulk_data structure becomes exactly one page.
+ */
+#define KFREE_BULK_MAX_ENTR ((PAGE_SIZE / sizeof(void *)) - 3)
+
+/**
+ * struct kfree_rcu_bulk_data - single block to store kfree_rcu() pointers
+ * @nr_records: Number of active pointers in the array
+ * @records: Array of the kfree_rcu() pointers
+ * @next: Next bulk object in the block chain
+ * @head_free_debug: For debug, when CONFIG_DEBUG_OBJECTS_RCU_HEAD is set
+ */
+struct kfree_rcu_bulk_data {
+       unsigned long nr_records;
+       void *records[KFREE_BULK_MAX_ENTR];
+       struct kfree_rcu_bulk_data *next;
+       struct rcu_head *head_free_debug;
+};
+
 /**
  * struct kfree_rcu_cpu_work - single batch of kfree_rcu() requests
  * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
  * @head_free: List of kfree_rcu() objects waiting for a grace period
+ * @bhead_free: Bulk-List of kfree_rcu() objects waiting for a grace period
  * @krcp: Pointer to @kfree_rcu_cpu structure
  */
 
 struct kfree_rcu_cpu_work {
        struct rcu_work rcu_work;
        struct rcu_head *head_free;
+       struct kfree_rcu_bulk_data *bhead_free;
        struct kfree_rcu_cpu *krcp;
 };
 
 /**
  * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
  * @head: List of kfree_rcu() objects not yet waiting for a grace period
+ * @bhead: Bulk-List of kfree_rcu() objects not yet waiting for a grace period
+ * @bcached: Keeps at most one object for later reuse when build chain blocks
  * @krw_arr: Array of batches of kfree_rcu() objects waiting for a grace period
  * @lock: Synchronize access to this structure
  * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
  */
 struct kfree_rcu_cpu {
        struct rcu_head *head;
+       struct kfree_rcu_bulk_data *bhead;
+       struct kfree_rcu_bulk_data *bcached;
        struct kfree_rcu_cpu_work krw_arr[KFREE_N_BATCHES];
        spinlock_t lock;
        struct delayed_work monitor_work;
 
 static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
 
+static __always_inline void
+debug_rcu_head_unqueue_bulk(struct rcu_head *head)
+{
+#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
+       for (; head; head = head->next)
+               debug_rcu_head_unqueue(head);
+#endif
+}
+
 /*
  * This function is invoked in workqueue context after a grace period.
- * It frees all the objects queued on ->head_free.
+ * It frees all the objects queued on ->bhead_free or ->head_free.
  */
 static void kfree_rcu_work(struct work_struct *work)
 {
        unsigned long flags;
        struct rcu_head *head, *next;
+       struct kfree_rcu_bulk_data *bhead, *bnext;
        struct kfree_rcu_cpu *krcp;
        struct kfree_rcu_cpu_work *krwp;
 
        spin_lock_irqsave(&krcp->lock, flags);
        head = krwp->head_free;
        krwp->head_free = NULL;
+       bhead = krwp->bhead_free;
+       krwp->bhead_free = NULL;
        spin_unlock_irqrestore(&krcp->lock, flags);
 
-       // List "head" is now private, so traverse locklessly.
+       /* "bhead" is now private, so traverse locklessly. */
+       for (; bhead; bhead = bnext) {
+               bnext = bhead->next;
+
+               debug_rcu_head_unqueue_bulk(bhead->head_free_debug);
+
+               rcu_lock_acquire(&rcu_callback_map);
+               kfree_bulk(bhead->nr_records, bhead->records);
+               rcu_lock_release(&rcu_callback_map);
+
+               if (cmpxchg(&krcp->bcached, NULL, bhead))
+                       free_page((unsigned long) bhead);
+
+               cond_resched_tasks_rcu_qs();
+       }
+
+       /*
+        * Emergency case only. It can happen under low memory
+        * condition when an allocation gets failed, so the "bulk"
+        * path can not be temporary maintained.
+        */
        for (; head; head = next) {
                unsigned long offset = (unsigned long)head->func;
 
                next = head->next;
-               // Potentially optimize with kfree_bulk in future.
                debug_rcu_head_unqueue(head);
                rcu_lock_acquire(&rcu_callback_map);
                trace_rcu_invoke_kfree_callback(rcu_state.name, head, offset);
 
-               if (!WARN_ON_ONCE(!__is_kfree_rcu_offset(offset))) {
-                       /* Could be optimized with kfree_bulk() in future. */
+               if (!WARN_ON_ONCE(!__is_kfree_rcu_offset(offset)))
                        kfree((void *)head - offset);
-               }
 
                rcu_lock_release(&rcu_callback_map);
                cond_resched_tasks_rcu_qs();
  */
 static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
 {
+       struct kfree_rcu_cpu_work *krwp;
+       bool queued = false;
        int i;
-       struct kfree_rcu_cpu_work *krwp = NULL;
 
        lockdep_assert_held(&krcp->lock);
-       for (i = 0; i < KFREE_N_BATCHES; i++)
-               if (!krcp->krw_arr[i].head_free) {
-                       krwp = &(krcp->krw_arr[i]);
-                       break;
-               }
 
-       // If a previous RCU batch is in progress, we cannot immediately
-       // queue another one, so return false to tell caller to retry.
-       if (!krwp)
-               return false;
+       for (i = 0; i < KFREE_N_BATCHES; i++) {
+               krwp = &(krcp->krw_arr[i]);
 
-       krwp->head_free = krcp->head;
-       krcp->head = NULL;
-       INIT_RCU_WORK(&krwp->rcu_work, kfree_rcu_work);
-       queue_rcu_work(system_wq, &krwp->rcu_work);
-       return true;
+               /*
+                * Try to detach bhead or head and attach it over any
+                * available corresponding free channel. It can be that
+                * a previous RCU batch is in progress, it means that
+                * immediately to queue another one is not possible so
+                * return false to tell caller to retry.
+                */
+               if ((krcp->bhead && !krwp->bhead_free) ||
+                               (krcp->head && !krwp->head_free)) {
+                       /* Channel 1. */
+                       if (!krwp->bhead_free) {
+                               krwp->bhead_free = krcp->bhead;
+                               krcp->bhead = NULL;
+                       }
+
+                       /* Channel 2. */
+                       if (!krwp->head_free) {
+                               krwp->head_free = krcp->head;
+                               krcp->head = NULL;
+                       }
+
+                       /*
+                        * One work is per one batch, so there are two "free channels",
+                        * "bhead_free" and "head_free" the batch can handle. It can be
+                        * that the work is in the pending state when two channels have
+                        * been detached following each other, one by one.
+                        */
+                       queue_rcu_work(system_wq, &krwp->rcu_work);
+                       queued = true;
+               }
+       }
+
+       return queued;
 }
 
 static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
                spin_unlock_irqrestore(&krcp->lock, flags);
 }
 
+static inline bool
+kfree_call_rcu_add_ptr_to_bulk(struct kfree_rcu_cpu *krcp,
+       struct rcu_head *head, rcu_callback_t func)
+{
+       struct kfree_rcu_bulk_data *bnode;
+
+       if (unlikely(!krcp->initialized))
+               return false;
+
+       lockdep_assert_held(&krcp->lock);
+
+       /* Check if a new block is required. */
+       if (!krcp->bhead ||
+                       krcp->bhead->nr_records == KFREE_BULK_MAX_ENTR) {
+               bnode = xchg(&krcp->bcached, NULL);
+               if (!bnode) {
+                       WARN_ON_ONCE(sizeof(struct kfree_rcu_bulk_data) > PAGE_SIZE);
+
+                       bnode = (struct kfree_rcu_bulk_data *)
+                               __get_free_page(GFP_NOWAIT | __GFP_NOWARN);
+               }
+
+               /* Switch to emergency path. */
+               if (unlikely(!bnode))
+                       return false;
+
+               /* Initialize the new block. */
+               bnode->nr_records = 0;
+               bnode->next = krcp->bhead;
+               bnode->head_free_debug = NULL;
+
+               /* Attach it to the head. */
+               krcp->bhead = bnode;
+       }
+
+#ifdef CONFIG_DEBUG_OBJECTS_RCU_HEAD
+       head->func = func;
+       head->next = krcp->bhead->head_free_debug;
+       krcp->bhead->head_free_debug = head;
+#endif
+
+       /* Finally insert. */
+       krcp->bhead->records[krcp->bhead->nr_records++] =
+               (void *) head - (unsigned long) func;
+
+       return true;
+}
+
 /*
- * Queue a request for lazy invocation of kfree() after a grace period.
+ * Queue a request for lazy invocation of kfree_bulk()/kfree() after a grace
+ * period. Please note there are two paths are maintained, one is the main one
+ * that uses kfree_bulk() interface and second one is emergency one, that is
+ * used only when the main path can not be maintained temporary, due to memory
+ * pressure.
  *
  * Each kfree_call_rcu() request is added to a batch. The batch will be drained
- * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch
- * will be kfree'd in workqueue context. This allows us to:
- *
- * 1.  Batch requests together to reduce the number of grace periods during
- *     heavy kfree_rcu() load.
- *
- * 2.  It makes it possible to use kfree_bulk() on a large number of
- *     kfree_rcu() requests thus reducing cache misses and the per-object
- *     overhead of kfree().
+ * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch will
+ * be free'd in workqueue context. This allows us to: batch requests together to
+ * reduce the number of grace periods during heavy kfree_rcu() load.
  */
 void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 {
                          __func__, head);
                goto unlock_return;
        }
-       head->func = func;
-       head->next = krcp->head;
-       krcp->head = head;
+
+       /*
+        * Under high memory pressure GFP_NOWAIT can fail,
+        * in that case the emergency path is maintained.
+        */
+       if (unlikely(!kfree_call_rcu_add_ptr_to_bulk(krcp, head, func))) {
+               head->func = func;
+               head->next = krcp->head;
+               krcp->head = head;
+       }
 
        // Set timer to drain after KFREE_DRAIN_JIFFIES.
        if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
                struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
 
                spin_lock_init(&krcp->lock);
-               for (i = 0; i < KFREE_N_BATCHES; i++)
+               for (i = 0; i < KFREE_N_BATCHES; i++) {
+                       INIT_RCU_WORK(&krcp->krw_arr[i].rcu_work, kfree_rcu_work);
                        krcp->krw_arr[i].krcp = krcp;
+               }
+
                INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
                krcp->initialized = true;
        }