MODULE_PARM_DESC(aoe_maxout,
        "Only aoe_maxout outstanding packets for every MAC on eX.Y.");
 
-static wait_queue_head_t ktiowq;
-static struct ktstate kts;
+/* The number of online cpus during module initialization gives us a
+ * convenient heuristic cap on the parallelism used for ktio threads
+ * doing I/O completion.  It is not important that the cap equal the
+ * actual number of running CPUs at any given time, but because of CPU
+ * hotplug, we take care to use ncpus instead of using
+ * num_online_cpus() after module initialization.
+ */
+static int ncpus;
+
+/* mutex lock used for synchronization while thread spawning */
+static DEFINE_MUTEX(ktio_spawn_lock);
+
+static wait_queue_head_t *ktiowq;
+static struct ktstate *kts;
 
 /* io completion queue */
-static struct {
+struct iocq_ktio {
        struct list_head head;
        spinlock_t lock;
-} iocq;
+};
+static struct iocq_ktio *iocq;
 
 static struct page *empty_page;
 
  * Returns true iff responses needing processing remain.
  */
 static int
-ktio(void)
+ktio(int id)
 {
        struct frame *f;
        struct list_head *pos;
        int i;
+       int actual_id;
 
        for (i = 0; ; ++i) {
                if (i == MAXIOC)
                        return 1;
-               if (list_empty(&iocq.head))
+               if (list_empty(&iocq[id].head))
                        return 0;
-               pos = iocq.head.next;
+               pos = iocq[id].head.next;
                list_del(pos);
-               spin_unlock_irq(&iocq.lock);
                f = list_entry(pos, struct frame, head);
+               spin_unlock_irq(&iocq[id].lock);
                ktiocomplete(f);
-               spin_lock_irq(&iocq.lock);
+
+               /* Figure out if extra threads are required. */
+               actual_id = f->t->d->aoeminor % ncpus;
+
+               if (!kts[actual_id].active) {
+                       BUG_ON(id != 0);
+                       mutex_lock(&ktio_spawn_lock);
+                       if (!kts[actual_id].active
+                               && aoe_ktstart(&kts[actual_id]) == 0)
+                               kts[actual_id].active = 1;
+                       mutex_unlock(&ktio_spawn_lock);
+               }
+               spin_lock_irq(&iocq[id].lock);
        }
 }
 
        complete(&k->rendez);   /* tell spawner we're running */
        do {
                spin_lock_irq(k->lock);
-               more = k->fn();
+               more = k->fn(k->id);
                if (!more) {
                        add_wait_queue(k->waitq, &wait);
                        __set_current_state(TASK_INTERRUPTIBLE);
 static void
 ktcomplete(struct frame *f, struct sk_buff *skb)
 {
+       int id;
        ulong flags;
 
        f->r_skb = skb;
-       spin_lock_irqsave(&iocq.lock, flags);
-       list_add_tail(&f->head, &iocq.head);
-       spin_unlock_irqrestore(&iocq.lock, flags);
-       wake_up(&ktiowq);
+       id = f->t->d->aoeminor % ncpus;
+       spin_lock_irqsave(&iocq[id].lock, flags);
+       if (!kts[id].active) {
+               spin_unlock_irqrestore(&iocq[id].lock, flags);
+               /* The thread with id has not been spawned yet,
+                * so delegate the work to the main thread and
+                * try spawning a new thread.
+                */
+               id = 0;
+               spin_lock_irqsave(&iocq[id].lock, flags);
+       }
+       list_add_tail(&f->head, &iocq[id].head);
+       spin_unlock_irqrestore(&iocq[id].lock, flags);
+       wake_up(&ktiowq[id]);
 }
 
 struct sk_buff *
 
 void
 aoe_flush_iocq(void)
+{
+       int i;
+
+       for (i = 0; i < ncpus; i++) {
+               if (kts[i].active)
+                       aoe_flush_iocq_by_index(i);
+       }
+}
+
+void
+aoe_flush_iocq_by_index(int id)
 {
        struct frame *f;
        struct aoedev *d;
        struct sk_buff *skb;
        ulong flags;
 
-       spin_lock_irqsave(&iocq.lock, flags);
-       list_splice_init(&iocq.head, &flist);
-       spin_unlock_irqrestore(&iocq.lock, flags);
+       spin_lock_irqsave(&iocq[id].lock, flags);
+       list_splice_init(&iocq[id].head, &flist);
+       spin_unlock_irqrestore(&iocq[id].lock, flags);
        while (!list_empty(&flist)) {
                pos = flist.next;
                list_del(pos);
 aoecmd_init(void)
 {
        void *p;
+       int i;
+       int ret;
 
        /* get_zeroed_page returns page with ref count 1 */
        p = (void *) get_zeroed_page(GFP_KERNEL | __GFP_REPEAT);
                return -ENOMEM;
        empty_page = virt_to_page(p);
 
-       INIT_LIST_HEAD(&iocq.head);
-       spin_lock_init(&iocq.lock);
-       init_waitqueue_head(&ktiowq);
-       kts.name = "aoe_ktio";
-       kts.fn = ktio;
-       kts.waitq = &ktiowq;
-       kts.lock = &iocq.lock;
-       return aoe_ktstart(&kts);
+       ncpus = num_online_cpus();
+
+       iocq = kcalloc(ncpus, sizeof(struct iocq_ktio), GFP_KERNEL);
+       if (!iocq)
+               return -ENOMEM;
+
+       kts = kcalloc(ncpus, sizeof(struct ktstate), GFP_KERNEL);
+       if (!kts) {
+               ret = -ENOMEM;
+               goto kts_fail;
+       }
+
+       ktiowq = kcalloc(ncpus, sizeof(wait_queue_head_t), GFP_KERNEL);
+       if (!ktiowq) {
+               ret = -ENOMEM;
+               goto ktiowq_fail;
+       }
+
+       mutex_init(&ktio_spawn_lock);
+
+       for (i = 0; i < ncpus; i++) {
+               INIT_LIST_HEAD(&iocq[i].head);
+               spin_lock_init(&iocq[i].lock);
+               init_waitqueue_head(&ktiowq[i]);
+               snprintf(kts[i].name, sizeof(kts[i].name), "aoe_ktio%d", i);
+               kts[i].fn = ktio;
+               kts[i].waitq = &ktiowq[i];
+               kts[i].lock = &iocq[i].lock;
+               kts[i].id = i;
+               kts[i].active = 0;
+       }
+       kts[0].active = 1;
+       if (aoe_ktstart(&kts[0])) {
+               ret = -ENOMEM;
+               goto ktstart_fail;
+       }
+       return 0;
+
+ktstart_fail:
+       kfree(ktiowq);
+ktiowq_fail:
+       kfree(kts);
+kts_fail:
+       kfree(iocq);
+
+       return ret;
 }
 
 void
 aoecmd_exit(void)
 {
-       aoe_ktstop(&kts);
+       int i;
+
+       for (i = 0; i < ncpus; i++)
+               if (kts[i].active)
+                       aoe_ktstop(&kts[i]);
+
        aoe_flush_iocq();
 
+       /* Free up the iocq and thread speicific configuration
+       * allocated during startup.
+       */
+       kfree(iocq);
+       kfree(kts);
+       kfree(ktiowq);
+
        free_page((unsigned long) page_address(empty_page));
        empty_page = NULL;
 }