BITMAP_IO,              /* suspend application io;
                                   once no more io in flight, start bitmap io */
        BITMAP_IO_QUEUED,       /* Started bitmap IO */
+       GO_DISKLESS,            /* Disk failed, local_cnt reached zero, we are going diskless */
        RESYNC_AFTER_NEG,       /* Resync after online grow after the attach&negotiate finished. */
        NET_CONGESTED,          /* The data socket is congested */
 
        unsigned int ko_count;
        struct drbd_work  resync_work,
                          unplug_work,
+                         go_diskless,
                          md_sync_work;
        struct timer_list resync_timer;
        struct timer_list md_sync_timer;
 extern int drbd_bmio_set_n_write(struct drbd_conf *mdev);
 extern int drbd_bmio_clear_n_write(struct drbd_conf *mdev);
 extern int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why);
+extern void drbd_go_diskless(struct drbd_conf *mdev);
 
 
 /* Meta data layout
        int i = atomic_dec_return(&mdev->local_cnt);
        __release(local);
        D_ASSERT(i >= 0);
-       if (i == 0)
+       if (i == 0) {
+               if (mdev->state.disk == D_FAILED)
+                       drbd_go_diskless(mdev);
                wake_up(&mdev->misc_wait);
+       }
 }
 
 #ifndef __CHECKER__
 
 static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
 static void md_sync_timer_fn(unsigned long data);
 static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
+static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
 
 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
              "Lars Ellenberg <lars@linbit.com>");
            os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
                drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
 
+       /* first half of local IO error */
        if (os.disk > D_FAILED && ns.disk == D_FAILED) {
-               enum drbd_io_error_p eh;
+               enum drbd_io_error_p eh = EP_PASS_ON;
+
+               if (drbd_send_state(mdev))
+                       dev_warn(DEV, "Notified peer that my disk is broken.\n");
+               else
+                       dev_err(DEV, "Sending state for drbd_io_error() failed\n");
+
+               drbd_rs_cancel_all(mdev);
 
-               eh = EP_PASS_ON;
                if (get_ldev_if_state(mdev, D_FAILED)) {
                        eh = mdev->ldev->dc.on_io_error;
                        put_ldev(mdev);
                }
+               if (eh == EP_CALL_HELPER)
+                       drbd_khelper(mdev, "local-io-error");
+       }
 
-               drbd_rs_cancel_all(mdev);
-               /* since get_ldev() only works as long as disk>=D_INCONSISTENT,
-                  and it is D_DISKLESS here, local_cnt can only go down, it can
-                  not increase... It will reach zero */
-               wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+
+       /* second half of local IO error handling,
+        * after local_cnt references have reached zero: */
+       if (os.disk == D_FAILED && ns.disk == D_DISKLESS) {
                mdev->rs_total = 0;
                mdev->rs_failed = 0;
                atomic_set(&mdev->rs_pending_cnt, 0);
-
-               spin_lock_irq(&mdev->req_lock);
-               _drbd_set_state(_NS(mdev, disk, D_DISKLESS), CS_HARD, NULL);
-               spin_unlock_irq(&mdev->req_lock);
-
-               if (eh == EP_CALL_HELPER)
-                       drbd_khelper(mdev, "local-io-error");
        }
 
        if (os.disk > D_DISKLESS && ns.disk == D_DISKLESS) {
+               int c = atomic_read(&mdev->local_cnt);
 
-               if (os.disk == D_FAILED) /* && ns.disk == D_DISKLESS*/ {
-                       if (drbd_send_state(mdev))
-                               dev_warn(DEV, "Notified peer that my disk is broken.\n");
-                       else
-                               dev_err(DEV, "Sending state in drbd_io_error() failed\n");
-               }
+               if (drbd_send_state(mdev))
+                       dev_warn(DEV, "Notified peer that I detached my disk.\n");
+               else
+                       dev_err(DEV, "Sending state for detach failed\n");
 
-               wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+               if (c != 0) {
+                       dev_err(DEV, "Logic bug, local_cnt=%d, but should be 0\n", c);
+                       wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
+               }
                lc_destroy(mdev->resync);
                mdev->resync = NULL;
                lc_destroy(mdev->act_log);
        INIT_LIST_HEAD(&mdev->meta.work.q);
        INIT_LIST_HEAD(&mdev->resync_work.list);
        INIT_LIST_HEAD(&mdev->unplug_work.list);
+       INIT_LIST_HEAD(&mdev->go_diskless.list);
        INIT_LIST_HEAD(&mdev->md_sync_work.list);
        INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
 
        mdev->resync_work.cb  = w_resync_inactive;
        mdev->unplug_work.cb  = w_send_write_hint;
+       mdev->go_diskless.cb  = w_go_diskless;
        mdev->md_sync_work.cb = w_md_sync;
        mdev->bm_io_work.w.cb = w_bitmap_io;
        init_timer(&mdev->resync_timer);
        D_ASSERT(list_empty(&mdev->meta.work.q));
        D_ASSERT(list_empty(&mdev->resync_work.list));
        D_ASSERT(list_empty(&mdev->unplug_work.list));
+       D_ASSERT(list_empty(&mdev->go_diskless.list));
 
 }
 
        return 1;
 }
 
+static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
+{
+       D_ASSERT(mdev->state.disk == D_FAILED);
+       D_ASSERT(atomic_read(&mdev->local_cnt) == 0);
+
+       drbd_force_state(mdev, NS(disk, D_DISKLESS));
+
+       clear_bit(GO_DISKLESS, &mdev->flags);
+       return 1;
+}
+
+void drbd_go_diskless(struct drbd_conf *mdev)
+{
+       D_ASSERT(mdev->state.disk == D_FAILED);
+       if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
+               drbd_queue_work_front(&mdev->data.work, &mdev->go_diskless);
+}
+
 /**
  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
  * @mdev:      DRBD device.