static RPC_WAITQ(delay_queue, "delayq");
 
 /*
- * All RPC tasks are linked into this list
+ * All RPC clients are linked into this list
  */
-static LIST_HEAD(all_tasks);
+static LIST_HEAD(all_clients);
+static DECLARE_WAIT_QUEUE_HEAD(client_kill_wait);
 
 /*
  * rpciod-related stuff
        task->tk_pid = rpc_task_id++;
 #endif
        /* Add to global list of all tasks */
-       list_add_tail(&task->tk_task, &all_tasks);
+       if (task->tk_client)
+               list_add_tail(&task->tk_task, &task->tk_client->cl_tasks);
        spin_unlock(&rpc_sched_lock);
 }
 
        if (tk_ops->rpc_call_prepare != NULL)
                task->tk_action = rpc_prepare_task;
        task->tk_calldata = calldata;
+       INIT_LIST_HEAD(&task->tk_task);
 
        /* Initialize retry counters */
        task->tk_garb_retry = 2;
 #endif
        dprintk("RPC: %5u release task\n", task->tk_pid);
 
-       /* Remove from global task list */
-       spin_lock(&rpc_sched_lock);
-       list_del(&task->tk_task);
-       spin_unlock(&rpc_sched_lock);
-
+       if (!list_empty(&task->tk_task)) {
+               /* Remove from client task list */
+               spin_lock(&rpc_sched_lock);
+               list_del(&task->tk_task);
+               spin_unlock(&rpc_sched_lock);
+       }
        BUG_ON (RPC_IS_QUEUED(task));
 
        /* Synchronously delete any running timer */
  * Kill all tasks for the given client.
  * XXX: kill their descendants as well?
  */
-void rpc_killall_tasks(struct rpc_clnt *clnt)
+static void rpc_killall_tasks_locked(struct list_head *head)
 {
        struct rpc_task *rovr;
-       struct list_head *le;
 
-       dprintk("RPC:       killing all tasks for client %p\n", clnt);
 
-       /*
-        * Spin lock all_tasks to prevent changes...
-        */
-       spin_lock(&rpc_sched_lock);
-       alltask_for_each(rovr, le, &all_tasks) {
+       list_for_each_entry(rovr, head, tk_task) {
                if (! RPC_IS_ACTIVATED(rovr))
                        continue;
-               if (!clnt || rovr->tk_client == clnt) {
+               if (!(rovr->tk_flags & RPC_TASK_KILLED)) {
                        rovr->tk_flags |= RPC_TASK_KILLED;
                        rpc_exit(rovr, -EIO);
                        rpc_wake_up_task(rovr);
                }
        }
+}
+
+void rpc_killall_tasks(struct rpc_clnt *clnt)
+{
+       dprintk("RPC:       killing all tasks for client %p\n", clnt);
+       /*
+        * Spin lock all_tasks to prevent changes...
+        */
+       spin_lock(&rpc_sched_lock);
+       rpc_killall_tasks_locked(&clnt->cl_tasks);
        spin_unlock(&rpc_sched_lock);
 }
 
 static void rpciod_killall(void)
 {
+       struct rpc_clnt *clnt;
        unsigned long flags;
 
-       while (!list_empty(&all_tasks)) {
+       for(;;) {
                clear_thread_flag(TIF_SIGPENDING);
-               rpc_killall_tasks(NULL);
+
+               spin_lock(&rpc_sched_lock);
+               list_for_each_entry(clnt, &all_clients, cl_clients)
+                       rpc_killall_tasks_locked(&clnt->cl_tasks);
+               spin_unlock(&rpc_sched_lock);
                flush_workqueue(rpciod_workqueue);
-               if (!list_empty(&all_tasks)) {
-                       dprintk("RPC:       rpciod_killall: waiting for tasks "
+               if (!list_empty(&all_clients))
+                       break;
+               dprintk("RPC:       rpciod_killall: waiting for tasks "
                                        "to exit\n");
-                       yield();
-               }
+               wait_event_timeout(client_kill_wait,
+                               list_empty(&all_clients), 1*HZ);
        }
 
        spin_lock_irqsave(¤t->sighand->siglock, flags);
        spin_unlock_irqrestore(¤t->sighand->siglock, flags);
 }
 
+void rpc_register_client(struct rpc_clnt *clnt)
+{
+       spin_lock(&rpc_sched_lock);
+       list_add(&clnt->cl_clients, &all_clients);
+       spin_unlock(&rpc_sched_lock);
+}
+
+void rpc_unregister_client(struct rpc_clnt *clnt)
+{
+       spin_lock(&rpc_sched_lock);
+       list_del(&clnt->cl_clients);
+       if (list_empty(&all_clients))
+               wake_up(&client_kill_wait);
+       spin_unlock(&rpc_sched_lock);
+}
+
 /*
  * Start up the rpciod process if it's not already running.
  */
 #ifdef RPC_DEBUG
 void rpc_show_tasks(void)
 {
-       struct list_head *le;
+       struct rpc_clnt *clnt;
        struct rpc_task *t;
 
        spin_lock(&rpc_sched_lock);
-       if (list_empty(&all_tasks)) {
-               spin_unlock(&rpc_sched_lock);
-               return;
-       }
+       if (list_empty(&all_clients))
+               goto out;
        printk("-pid- proc flgs status -client- -prog- --rqstp- -timeout "
                "-rpcwait -action- ---ops--\n");
-       alltask_for_each(t, le, &all_tasks) {
-               const char *rpc_waitq = "none";
-
-               if (RPC_IS_QUEUED(t))
-                       rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
-
-               printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
-                       t->tk_pid,
-                       (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
-                       t->tk_flags, t->tk_status,
-                       t->tk_client,
-                       (t->tk_client ? t->tk_client->cl_prog : 0),
-                       t->tk_rqstp, t->tk_timeout,
-                       rpc_waitq,
-                       t->tk_action, t->tk_ops);
+       list_for_each_entry(clnt, &all_clients, cl_clients) {
+               list_for_each_entry(t, &clnt->cl_tasks, tk_task) {
+                       const char *rpc_waitq = "none";
+
+                       if (RPC_IS_QUEUED(t))
+                               rpc_waitq = rpc_qname(t->u.tk_wait.rpc_waitq);
+
+                       printk("%5u %04d %04x %6d %8p %6d %8p %8ld %8s %8p %8p\n",
+                               t->tk_pid,
+                               (t->tk_msg.rpc_proc ? t->tk_msg.rpc_proc->p_proc : -1),
+                               t->tk_flags, t->tk_status,
+                               t->tk_client,
+                               (t->tk_client ? t->tk_client->cl_prog : 0),
+                               t->tk_rqstp, t->tk_timeout,
+                               rpc_waitq,
+                               t->tk_action, t->tk_ops);
+               }
        }
+out:
        spin_unlock(&rpc_sched_lock);
 }
 #endif