folio_batch_init(&rqstp->rq_fbatch);
 
-       init_llist_node(&rqstp->rq_idle);
        rqstp->rq_server = serv;
        rqstp->rq_pool = pool;
 
        struct llist_node *ln;
 
        rcu_read_lock();
-       spin_lock_bh(&pool->sp_lock);
-       ln = llist_del_first_init(&pool->sp_idle_threads);
-       spin_unlock_bh(&pool->sp_lock);
+       ln = READ_ONCE(pool->sp_idle_threads.first);
        if (ln) {
                rqstp = llist_entry(ln, struct svc_rqst, rq_idle);
-
                WRITE_ONCE(rqstp->rq_qtime, ktime_get());
-               wake_up_process(rqstp->rq_task);
+               if (!task_is_running(rqstp->rq_task)) {
+                       wake_up_process(rqstp->rq_task);
+                       trace_svc_wake_up(rqstp->rq_task->pid);
+                       percpu_counter_inc(&pool->sp_threads_woken);
+               }
                rcu_read_unlock();
-               percpu_counter_inc(&pool->sp_threads_woken);
-               trace_svc_wake_up(rqstp->rq_task->pid);
                return;
        }
        rcu_read_unlock();
 
        if (svc_thread_should_sleep(rqstp)) {
                set_current_state(TASK_IDLE | TASK_FREEZABLE);
                llist_add(&rqstp->rq_idle, &pool->sp_idle_threads);
+               if (likely(svc_thread_should_sleep(rqstp)))
+                       schedule();
 
-               if (unlikely(!svc_thread_should_sleep(rqstp)))
-                       /* Work just became available.  This thread cannot simply
-                        * choose not to sleep as it *must* wait until removed.
-                        * So wake the first waiter - whether it is this
-                        * thread or some other, it will get the work done.
+               while (!llist_del_first_this(&pool->sp_idle_threads,
+                                            &rqstp->rq_idle)) {
+                       /* Work just became available.  This thread can only
+                        * handle it after removing rqstp from the idle
+                        * list. If that attempt failed, some other thread
+                        * must have queued itself after finding no
+                        * work to do, so that thread has taken responsibly
+                        * for this new work.  This thread can safely sleep
+                        * until woken again.
                         */
-                       svc_pool_wake_idle_thread(pool);
-
-               /* Since a thread cannot remove itself from an llist,
-                * schedule until someone else removes @rqstp from
-                * the idle list.
-                */
-               while (!svc_thread_busy(rqstp)) {
                        schedule();
                        set_current_state(TASK_IDLE | TASK_FREEZABLE);
                }
        svc_xprt_release(rqstp);
 }
 
+static void svc_thread_wake_next(struct svc_rqst *rqstp)
+{
+       if (!svc_thread_should_sleep(rqstp))
+               /* More work pending after I dequeued some,
+                * wake another worker
+                */
+               svc_pool_wake_idle_thread(rqstp->rq_pool);
+}
+
 /**
  * svc_recv - Receive and process the next request on any transport
  * @rqstp: an idle RPC service thread
 
        clear_bit(SP_TASK_PENDING, &pool->sp_flags);
 
-       if (svc_thread_should_stop(rqstp))
+       if (svc_thread_should_stop(rqstp)) {
+               svc_thread_wake_next(rqstp);
                return;
+       }
 
        rqstp->rq_xprt = svc_xprt_dequeue(pool);
        if (rqstp->rq_xprt) {
                struct svc_xprt *xprt = rqstp->rq_xprt;
 
+               svc_thread_wake_next(rqstp);
                /* Normally we will wait up to 5 seconds for any required
                 * cache information to be provided.  When there are no
                 * idle threads, we reduce the wait time.
                if (req) {
                        list_del(&req->rq_bc_list);
                        spin_unlock_bh(&serv->sv_cb_lock);
+                       svc_thread_wake_next(rqstp);
 
                        svc_process_bc(req, rqstp);
                        return;