gfs2_glock_remove_from_lru(gl);
        spin_unlock(&gl->gl_lockref.lock);
        GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
-       GLOCK_BUG_ON(gl, mapping && mapping->nrpages);
+       GLOCK_BUG_ON(gl, mapping && mapping->nrpages && !gfs2_withdrawn(sdp));
        trace_gfs2_glock_put(gl);
        sdp->sd_lockstruct.ls_ops->lm_put_lock(gl);
 }
        unsigned int lck_flags = (unsigned int)(gh ? gh->gh_flags : 0);
        int ret;
 
-       if (target != LM_ST_UNLOCKED && glock_blocked_by_withdraw(gl))
+       if (target != LM_ST_UNLOCKED && glock_blocked_by_withdraw(gl) &&
+           gh && !(gh->gh_flags & LM_FLAG_NOEXP))
                return;
        lck_flags &= (LM_FLAG_TRY | LM_FLAG_TRY_1CB | LM_FLAG_NOEXP |
                      LM_FLAG_PRIORITY);
        struct gfs2_glock *gl = gh->gh_gl;
        int error = 0;
 
-       if (glock_blocked_by_withdraw(gl))
+       if (glock_blocked_by_withdraw(gl) && !(gh->gh_flags & LM_FLAG_NOEXP))
                return -EIO;
 
        if (test_bit(GLF_LRU, &gl->gl_flags))
 void gfs2_glock_dq(struct gfs2_holder *gh)
 {
        struct gfs2_glock *gl = gh->gh_gl;
+       struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
        unsigned delay = 0;
        int fast_path = 0;
 
        spin_lock(&gl->gl_lockref.lock);
+       /*
+        * If we're in the process of file system withdraw, we cannot just
+        * dequeue any glocks until our journal is recovered, lest we
+        * introduce file system corruption. We need two exceptions to this
+        * rule: We need to allow unlocking of nondisk glocks and the glock
+        * for our own journal that needs recovery.
+        */
+       if (test_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags) &&
+           glock_blocked_by_withdraw(gl) &&
+           gh->gh_gl != sdp->sd_jinode_gl) {
+               sdp->sd_glock_dqs_held++;
+               might_sleep();
+               wait_on_bit(&sdp->sd_flags, SDF_WITHDRAW_RECOVERY,
+                           TASK_UNINTERRUPTIBLE);
+       }
        if (gh->gh_flags & GL_NOCACHE)
                handle_callback(gl, LM_ST_UNLOCKED, 0, false);
 
 
 
 struct workqueue_struct *gfs2_freeze_wq;
 
+extern struct workqueue_struct *gfs2_control_wq;
+
 static void gfs2_ail_error(struct gfs2_glock *gl, const struct buffer_head *bh)
 {
        fs_err(gl->gl_name.ln_sbd,
        int error = 0;
        struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
 
-       if (gl->gl_state == LM_ST_SHARED &&
+       if (gl->gl_state == LM_ST_SHARED && !gfs2_withdrawn(sdp) &&
            test_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags)) {
                atomic_set(&sdp->sd_freeze_state, SFS_STARTING_FREEZE);
                error = freeze_super(sdp->sd_vfs);
                if (error) {
                        fs_info(sdp, "GFS2: couldn't freeze filesystem: %d\n",
                                error);
+                       if (gfs2_withdrawn(sdp)) {
+                               atomic_set(&sdp->sd_freeze_state, SFS_UNFROZEN);
+                               return;
+                       }
                        gfs2_assert_withdraw(sdp, 0);
                }
                queue_work(gfs2_freeze_wq, &sdp->sd_freeze_work);
        }
 }
 
+/**
+ * inode_go_free - wake up anyone waiting for dlm's unlock ast to free it
+ * @gl: glock being freed
+ *
+ * For now, this is only used for the journal inode glock. In withdraw
+ * situations, we need to wait for the glock to be freed so that we know
+ * other nodes may proceed with recovery / journal replay.
+ */
+static void inode_go_free(struct gfs2_glock *gl)
+{
+       /* Note that we cannot reference gl_object because it's already set
+        * to NULL by this point in its lifecycle. */
+       if (!test_bit(GLF_FREEING, &gl->gl_flags))
+               return;
+       /* clear_bit_unlock has release semantics, so the waiter woken below
+        * sees all stores made before GLF_FREEING was cleared. */
+       clear_bit_unlock(GLF_FREEING, &gl->gl_flags);
+       wake_up_bit(&gl->gl_flags, GLF_FREEING);
+}
+
+/**
+ * nondisk_go_callback - used to signal when a node did a withdraw
+ * @gl: the nondisk glock
+ * @remote: true if this came from a different cluster node
+ *
+ * Cancels the demote request for the "live" nondisk glock (we keep it in
+ * SH mode) and, when another node asks us to unlock it — meaning it wants
+ * a journal recovered — queues the remote-withdraw handling to the
+ * lock_dlm control work queue.
+ */
+static void nondisk_go_callback(struct gfs2_glock *gl, bool remote)
+{
+       struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
+
+       /* Ignore the callback unless it's from another node, and it's the
+          live lock. */
+       if (!remote || gl->gl_name.ln_number != GFS2_LIVE_LOCK)
+               return;
+
+       /* First order of business is to cancel the demote request. We don't
+        * really want to demote a nondisk glock. At best it's just to inform
+        * us of another node's withdraw. We'll keep it in SH mode. */
+       clear_bit(GLF_DEMOTE, &gl->gl_flags);
+       clear_bit(GLF_PENDING_DEMOTE, &gl->gl_flags);
+
+       /* Ignore the unlock if we're withdrawn, unmounting, or in recovery. */
+       if (test_bit(SDF_NORECOVERY, &sdp->sd_flags) ||
+           test_bit(SDF_WITHDRAWN, &sdp->sd_flags) ||
+           test_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags))
+               return;
+
+       /* We only care when a node wants us to unlock, because that means
+        * they want a journal recovered. */
+       if (gl->gl_demote_state != LM_ST_UNLOCKED)
+               return;
+
+       /* Spectator mounts have no journal of their own and cannot replay
+        * another node's journal. */
+       if (sdp->sd_args.ar_spectator) {
+               fs_warn(sdp, "Spectator node cannot recover journals.\n");
+               return;
+       }
+
+       fs_warn(sdp, "Some node has withdrawn; checking for recovery.\n");
+       set_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
+       /*
+        * We can't call remote_withdraw directly here or gfs2_recover_journal
+        * because this is called from the glock unlock function and the
+        * remote_withdraw needs to enqueue and dequeue the same "live" glock
+        * we were called from. So we queue it to the control work queue in
+        * lock_dlm.
+        */
+       queue_delayed_work(gfs2_control_wq, &sdp->sd_control_work, 0);
+}
+
 const struct gfs2_glock_operations gfs2_meta_glops = {
        .go_type = LM_TYPE_META,
        .go_flags = GLOF_NONDISK,
        .go_dump = inode_go_dump,
        .go_type = LM_TYPE_INODE,
        .go_flags = GLOF_ASPACE | GLOF_LRU,
+       .go_free = inode_go_free,
 };
 
 const struct gfs2_glock_operations gfs2_rgrp_glops = {
 const struct gfs2_glock_operations gfs2_nondisk_glops = {
        .go_type = LM_TYPE_NONDISK,
        .go_flags = GLOF_NONDISK,
+       .go_callback = nondisk_go_callback,
 };
 
 const struct gfs2_glock_operations gfs2_quota_glops = {
 
        void (*go_dump)(struct seq_file *seq, struct gfs2_glock *gl,
                        const char *fs_id_buf);
        void (*go_callback)(struct gfs2_glock *gl, bool remote);
+       void (*go_free)(struct gfs2_glock *gl);
        const int go_type;
        const unsigned long go_flags;
 #define GLOF_ASPACE 1 /* address space attached */
        GLF_OBJECT                      = 14, /* Used only for tracing */
        GLF_BLOCKING                    = 15,
        GLF_INODE_CREATING              = 16, /* Inode creation occurring */
+       GLF_FREEING                     = 18, /* Wait for glock to be freed */
 };
 
 struct gfs2_glock {
        SDF_FORCE_AIL_FLUSH     = 9,
        SDF_FS_FROZEN           = 10,
        SDF_WITHDRAWING         = 11, /* Will withdraw eventually */
+       SDF_WITHDRAW_IN_PROG    = 12, /* Withdraw is in progress */
+       SDF_REMOTE_WITHDRAW     = 13, /* Performing remote recovery */
+       SDF_WITHDRAW_RECOVERY   = 14, /* Wait for journal recovery when we are
+                                        withdrawing */
 };
 
 enum gfs2_freeze_state {
        struct gfs2_jdesc *sd_jdesc;
        struct gfs2_holder sd_journal_gh;
        struct gfs2_holder sd_jinode_gh;
+       struct gfs2_glock *sd_jinode_gl;
 
        struct gfs2_holder sd_sc_gh;
        struct gfs2_holder sd_qc_gh;
        struct bio *sd_log_bio;
        wait_queue_head_t sd_log_flush_wait;
        int sd_log_error; /* First log error */
+       wait_queue_head_t sd_withdraw_wait;
 
        atomic_t sd_reserving_log;
        wait_queue_head_t sd_reserving_log_wait;
 
        unsigned long sd_last_warning;
        struct dentry *debugfs_dir;    /* debugfs directory */
+       unsigned long sd_glock_dqs_held;
 };
 
 static inline void gfs2_glstats_inc(struct gfs2_glock *gl, int which)
 
 
 #include "incore.h"
 #include "glock.h"
+#include "glops.h"
+#include "recovery.h"
 #include "util.h"
 #include "sys.h"
 #include "trace_gfs2.h"
 
        switch (gl->gl_lksb.sb_status) {
        case -DLM_EUNLOCK: /* Unlocked, so glock can be freed */
+               if (gl->gl_ops->go_free)
+                       gl->gl_ops->go_free(gl);
                gfs2_glock_free(gl);
                return;
        case -DLM_ECANCEL: /* Cancel while getting lock */
 /*
  * dlm/gfs2 recovery coordination using dlm_recover callbacks
  *
+ *  0. gfs2 checks for another cluster node withdraw, needing journal replay
  *  1. dlm_controld sees lockspace members change
  *  2. dlm_controld blocks dlm-kernel locking activity
  *  3. dlm_controld within dlm-kernel notifies gfs2 (recover_prep)
                         &ls->ls_control_lksb, "control_lock");
 }
 
+/**
+ * remote_withdraw - react to a node withdrawing from the file system
+ * @sdp: The superblock
+ *
+ * Attempts recovery of every journal except our own, stopping at the
+ * first failure. Called from the control work queue (gfs2_control_func)
+ * when SDF_REMOTE_WITHDRAW is set.
+ */
+static void remote_withdraw(struct gfs2_sbd *sdp)
+{
+       struct gfs2_jdesc *jd;
+       int ret = 0, count = 0;
+
+       list_for_each_entry(jd, &sdp->sd_jindex_list, jd_list) {
+               /* Skip our own journal. */
+               if (jd->jd_jid == sdp->sd_lockstruct.ls_jid)
+                       continue;
+               ret = gfs2_recover_journal(jd, true);
+               if (ret)
+                       break;
+               count++;
+       }
+
+       /* Report how many journals were checked and the final status. */
+       fs_err(sdp, "Journals checked: %d, ret = %d.\n", count, ret);
+}
+
 static void gfs2_control_func(struct work_struct *work)
 {
        struct gfs2_sbd *sdp = container_of(work, struct gfs2_sbd, sd_control_work.work);
        int recover_size;
        int i, error;
 
+       /* First check for other nodes that may have done a withdraw. */
+       if (test_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags)) {
+               remote_withdraw(sdp);
+               clear_bit(SDF_REMOTE_WITHDRAW, &sdp->sd_flags);
+               return;
+       }
+
        spin_lock(&ls->ls_recover_spin);
        /*
         * No MOUNT_DONE means we're still mounting; control_mount()
 
        struct buffer_head *bh, *bhs[2];
        int num = 0;
 
-       if (unlikely(gfs2_withdrawn(sdp))) {
+       if (unlikely(gfs2_withdrawn(sdp)) &&
+           (!sdp->sd_jdesc || (blkno != sdp->sd_jdesc->jd_no_addr))) {
                *bhp = NULL;
                return -EIO;
        }
 
 
                error = gfs2_glock_nq_num(sdp, sdp->sd_lockstruct.ls_jid,
                                          &gfs2_journal_glops,
-                                         LM_ST_EXCLUSIVE, LM_FLAG_NOEXP,
+                                         LM_ST_EXCLUSIVE,
+                                         LM_FLAG_NOEXP | GL_NOCACHE,
                                          &sdp->sd_journal_gh);
                if (error) {
                        fs_err(sdp, "can't acquire journal glock: %d\n", error);
                }
 
                ip = GFS2_I(sdp->sd_jdesc->jd_inode);
+               sdp->sd_jinode_gl = ip->i_gl;
                error = gfs2_glock_nq_init(ip->i_gl, LM_ST_SHARED,
                                           LM_FLAG_NOEXP | GL_EXACT | GL_NOCACHE,
                                           &sdp->sd_jinode_gh);
        return 0;
 
 fail_jinode_gh:
-       if (!sdp->sd_args.ar_spectator)
+       /* A withdraw may have done dq/uninit so now we need to check it */
+       if (!sdp->sd_args.ar_spectator &&
+           gfs2_holder_initialized(&sdp->sd_jinode_gh))
                gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
 fail_journal_gh:
-       if (!sdp->sd_args.ar_spectator)
+       if (!sdp->sd_args.ar_spectator &&
+           gfs2_holder_initialized(&sdp->sd_journal_gh))
                gfs2_glock_dq_uninit(&sdp->sd_journal_gh);
 fail_jindex:
        gfs2_jindex_free(sdp);
 
 
        while (!kthread_should_stop()) {
 
+               if (gfs2_withdrawn(sdp))
+                       goto bypass;
                /* Update the master statfs file */
                if (sdp->sd_statfs_force_sync) {
                        int error = gfs2_statfs_sync(sdp->sd_vfs, 0);
 
                try_to_freeze();
 
+bypass:
                t = min(quotad_timeo, statfs_timeo);
 
                prepare_to_wait(&sdp->sd_quota_wait, &wait, TASK_INTERRUPTIBLE);
 
        sdp->sd_journals = 0;
        spin_unlock(&sdp->sd_jindex_spin);
 
+       sdp->sd_jdesc = NULL;
        while (!list_empty(&list)) {
                jd = list_entry(list.next, struct gfs2_jdesc, jd_list);
                gfs2_free_journal_extents(jd);
                list_del(&jd->jd_list);
                iput(jd->jd_inode);
+               jd->jd_inode = NULL;
                kfree(jd);
        }
 }
                goto fail_threads;
 
        j_gl->gl_ops->go_inval(j_gl, DIO_METADATA);
+       if (gfs2_withdrawn(sdp)) {
+               error = -EIO;
+               goto fail;
+       }
 
        error = gfs2_find_jhead(sdp->sd_jdesc, &head, false);
-       if (error)
+       if (error || gfs2_withdrawn(sdp))
                goto fail;
 
        if (!(head.lh_flags & GFS2_LOG_HEAD_UNMOUNT)) {
        gfs2_log_pointers_init(sdp, head.lh_blkno);
 
        error = gfs2_quota_init(sdp);
-       if (error)
+       if (error || gfs2_withdrawn(sdp))
                goto fail;
 
        set_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags);
 int gfs2_make_fs_ro(struct gfs2_sbd *sdp)
 {
        struct gfs2_holder freeze_gh;
-       int error;
-
-       error = gfs2_glock_nq_init(sdp->sd_freeze_gl, LM_ST_SHARED, GL_NOCACHE,
-                                  &freeze_gh);
-       if (error && !gfs2_withdrawn(sdp))
-               return error;
+       int error = 0;
+       int log_write_allowed = test_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags);
+
+       gfs2_holder_mark_uninitialized(&freeze_gh);
+       if (sdp->sd_freeze_gl &&
+           !gfs2_glock_is_locked_by_me(sdp->sd_freeze_gl)) {
+               if (!log_write_allowed) {
+                       error = gfs2_glock_nq_init(sdp->sd_freeze_gl,
+                                                  LM_ST_SHARED, GL_NOCACHE |
+                                                  LM_FLAG_TRY, &freeze_gh);
+                       if (error == GLR_TRYFAILED)
+                               error = 0;
+               } else {
+                       error = gfs2_glock_nq_init(sdp->sd_freeze_gl,
+                                                  LM_ST_SHARED, GL_NOCACHE,
+                                                  &freeze_gh);
+                       if (error && !gfs2_withdrawn(sdp))
+                               return error;
+               }
+       }
 
        flush_workqueue(gfs2_delete_workqueue);
-       if (sdp->sd_quotad_process)
+       if (!log_write_allowed && current == sdp->sd_quotad_process)
+               fs_warn(sdp, "The quotad daemon is withdrawing.\n");
+       else if (sdp->sd_quotad_process)
                kthread_stop(sdp->sd_quotad_process);
        sdp->sd_quotad_process = NULL;
-       if (sdp->sd_logd_process)
+
+       if (!log_write_allowed && current == sdp->sd_logd_process)
+               fs_warn(sdp, "The logd daemon is withdrawing.\n");
+       else if (sdp->sd_logd_process)
                kthread_stop(sdp->sd_logd_process);
        sdp->sd_logd_process = NULL;
 
-       gfs2_quota_sync(sdp->sd_vfs, 0);
-       gfs2_statfs_sync(sdp->sd_vfs, 0);
-
-       gfs2_log_flush(sdp, NULL, GFS2_LOG_HEAD_FLUSH_SHUTDOWN |
-                      GFS2_LFC_MAKE_FS_RO);
-       wait_event(sdp->sd_reserving_log_wait, atomic_read(&sdp->sd_reserving_log) == 0);
-       gfs2_assert_warn(sdp, atomic_read(&sdp->sd_log_blks_free) == sdp->sd_jdesc->jd_blocks);
+       if (log_write_allowed) {
+               gfs2_quota_sync(sdp->sd_vfs, 0);
+               gfs2_statfs_sync(sdp->sd_vfs, 0);
 
+               gfs2_log_flush(sdp, NULL, GFS2_LOG_HEAD_FLUSH_SHUTDOWN |
+                              GFS2_LFC_MAKE_FS_RO);
+               wait_event(sdp->sd_reserving_log_wait,
+                          atomic_read(&sdp->sd_reserving_log) == 0);
+               gfs2_assert_warn(sdp, atomic_read(&sdp->sd_log_blks_free) ==
+                                sdp->sd_jdesc->jd_blocks);
+       } else {
+               wait_event_timeout(sdp->sd_reserving_log_wait,
+                                  atomic_read(&sdp->sd_reserving_log) == 0,
+                                  HZ * 5);
+       }
        if (gfs2_holder_initialized(&freeze_gh))
                gfs2_glock_dq_uninit(&freeze_gh);
 
        gfs2_quota_cleanup(sdp);
 
+       if (!log_write_allowed)
+               sdp->sd_vfs->s_flags |= SB_RDONLY;
+
        return error;
 }
 
        gfs2_glock_put(sdp->sd_freeze_gl);
 
        if (!sdp->sd_args.ar_spectator) {
-               gfs2_glock_dq_uninit(&sdp->sd_journal_gh);
-               gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
+               if (gfs2_holder_initialized(&sdp->sd_journal_gh))
+                       gfs2_glock_dq_uninit(&sdp->sd_journal_gh);
+               if (gfs2_holder_initialized(&sdp->sd_jinode_gh))
+                       gfs2_glock_dq_uninit(&sdp->sd_jinode_gh);
                gfs2_glock_dq_uninit(&sdp->sd_sc_gh);
                gfs2_glock_dq_uninit(&sdp->sd_qc_gh);
                iput(sdp->sd_sc_inode);
 
 
 extern struct gfs2_jdesc *gfs2_jdesc_find(struct gfs2_sbd *sdp, unsigned int jid);
 extern int gfs2_jdesc_check(struct gfs2_jdesc *jd);
-
 extern int gfs2_lookup_in_master_dir(struct gfs2_sbd *sdp, char *filename,
                                     struct gfs2_inode **ipp);
 
 
         * never clear the DFL_BLOCK_LOCKS flag, so all our locks would
         * permanently stop working.
         */
+       if (!sdp->sd_jdesc)
+               goto out;
        if (sdp->sd_jdesc->jd_jid == jid && !sdp->sd_args.ar_spectator)
                goto out;
        rv = -ENOENT;
 
 #include <linux/buffer_head.h>
 #include <linux/crc32.h>
 #include <linux/gfs2_ondisk.h>
+#include <linux/delay.h>
 #include <linux/uaccess.h>
 
 #include "gfs2.h"
 #include "incore.h"
 #include "glock.h"
+#include "glops.h"
+#include "log.h"
 #include "lops.h"
 #include "recovery.h"
 #include "rgrp.h"
        return error;
 }
 
+/**
+ * signal_our_withdraw - tell other cluster nodes we have withdrawn
+ * @sdp: The superblock
+ *
+ * Flushes and releases our journal, asks the other nodes (via a try lock
+ * on the "live" glock) to recover it, then waits a bounded time for that
+ * recovery to complete. Returns immediately if SDF_NORECOVERY is set.
+ *
+ * Note the two distinct glocks in play: @live_gl is the "live" nondisk
+ * glock used to signal other nodes; @i_gl is our journal inode's glock,
+ * which is the one we must wait on for demote and for dlm to free it
+ * (only the inode glops implement go_free; the nondisk glops do not).
+ */
+static void signal_our_withdraw(struct gfs2_sbd *sdp)
+{
+       struct gfs2_glock *live_gl = sdp->sd_live_gh.gh_gl;
+       struct inode *inode = sdp->sd_jdesc->jd_inode;
+       struct gfs2_inode *ip = GFS2_I(inode);
+       struct gfs2_glock *i_gl = ip->i_gl;
+       u64 no_formal_ino = ip->i_no_formal_ino;
+       int ret = 0;
+       int tries;
+
+       if (test_bit(SDF_NORECOVERY, &sdp->sd_flags))
+               return;
+
+       /* Prevent any glock dq until withdraw recovery is complete */
+       set_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags);
+       /*
+        * Don't tell dlm we're bailing until we have no more buffers in the
+        * wind. If journal had an IO error, the log code should just purge
+        * the outstanding buffers rather than submitting new IO. Making the
+        * file system read-only will flush the journal, etc.
+        *
+        * During a normal unmount, gfs2_make_fs_ro calls gfs2_log_shutdown
+        * which clears SDF_JOURNAL_LIVE. In a withdraw, we must not write
+        * any UNMOUNT log header, so we can't call gfs2_log_shutdown, and
+        * therefore we need to clear SDF_JOURNAL_LIVE manually.
+        */
+       clear_bit(SDF_JOURNAL_LIVE, &sdp->sd_flags);
+       if (!sb_rdonly(sdp->sd_vfs))
+               ret = gfs2_make_fs_ro(sdp);
+
+       /*
+        * Drop the glock for our journal so another node can recover it.
+        */
+       if (gfs2_holder_initialized(&sdp->sd_journal_gh)) {
+               gfs2_glock_dq_wait(&sdp->sd_journal_gh);
+               gfs2_holder_uninit(&sdp->sd_journal_gh);
+       }
+       sdp->sd_jinode_gh.gh_flags |= GL_NOCACHE;
+       gfs2_glock_dq(&sdp->sd_jinode_gh);
+       if (test_bit(SDF_FS_FROZEN, &sdp->sd_flags)) {
+               /* Make sure gfs2_unfreeze works if partially-frozen */
+               flush_workqueue(gfs2_freeze_wq);
+               atomic_set(&sdp->sd_freeze_state, SFS_FROZEN);
+               thaw_super(sdp->sd_vfs);
+       } else {
+               /* Wait on the journal inode glock's demote, not the "live"
+                * glock's: it's the jinode dq above that must complete. */
+               wait_on_bit(&i_gl->gl_flags, GLF_DEMOTE,
+                           TASK_UNINTERRUPTIBLE);
+       }
+
+       /*
+        * holder_uninit to force glock_put, to force dlm to let go
+        */
+       gfs2_holder_uninit(&sdp->sd_jinode_gh);
+
+       /*
+        * Note: We need to be careful here:
+        * Our iput of jd_inode will evict it. The evict will dequeue its
+        * glock, but the glock dq will wait for the withdraw unless we have
+        * exception code in glock_dq.
+        */
+       iput(inode);
+       /*
+        * Wait until the journal inode's glock is freed. This allows try locks
+        * on other nodes to be successful, otherwise we remain the owner of
+        * the glock as far as dlm is concerned. Only the inode glops have a
+        * go_free hook, so this must check i_gl, never the nondisk live_gl.
+        */
+       if (i_gl->gl_ops->go_free) {
+               set_bit(GLF_FREEING, &i_gl->gl_flags);
+               wait_on_bit(&i_gl->gl_flags, GLF_FREEING,
+                           TASK_UNINTERRUPTIBLE);
+       }
+
+       if (sdp->sd_lockstruct.ls_ops->lm_lock == NULL) { /* lock_nolock */
+               clear_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags);
+               goto skip_recovery;
+       }
+       /*
+        * Dequeue the "live" glock, but keep a reference so it's never freed.
+        */
+       gfs2_glock_hold(live_gl);
+       gfs2_glock_dq_wait(&sdp->sd_live_gh);
+       /*
+        * We enqueue the "live" glock in EX so that all other nodes
+        * get a demote request and act on it. We don't really want the
+        * lock in EX, so we send a "try" lock with 1CB to produce a callback.
+        */
+       fs_warn(sdp, "Requesting recovery of jid %d.\n",
+               sdp->sd_lockstruct.ls_jid);
+       gfs2_holder_reinit(LM_ST_EXCLUSIVE, LM_FLAG_TRY_1CB | LM_FLAG_NOEXP,
+                          &sdp->sd_live_gh);
+       msleep(GL_GLOCK_MAX_HOLD);
+       /*
+        * This will likely fail in a cluster, but succeed standalone:
+        */
+       ret = gfs2_glock_nq(&sdp->sd_live_gh);
+
+       /*
+        * If we actually got the "live" lock in EX mode, there are no other
+        * nodes available to replay our journal. So we try to replay it
+        * ourselves. We hold the "live" glock to prevent other mounters
+        * during recovery, then just dequeue it and reacquire it in our
+        * normal SH mode. Just in case the problem that caused us to
+        * withdraw prevents us from recovering our journal (e.g. io errors
+        * and such) we still check if the journal is clean before proceeding
+        * but we may wait forever until another mounter does the recovery.
+        */
+       if (ret == 0) {
+               fs_warn(sdp, "No other mounters found. Trying to recover our "
+                       "own journal jid %d.\n", sdp->sd_lockstruct.ls_jid);
+               if (gfs2_recover_journal(sdp->sd_jdesc, 1))
+                       fs_warn(sdp, "Unable to recover our journal jid %d.\n",
+                               sdp->sd_lockstruct.ls_jid);
+               gfs2_glock_dq_wait(&sdp->sd_live_gh);
+               gfs2_holder_reinit(LM_ST_SHARED, LM_FLAG_NOEXP | GL_EXACT,
+                                  &sdp->sd_live_gh);
+               gfs2_glock_nq(&sdp->sd_live_gh);
+       }
+
+       /* Drop the extra "live" glock reference we acquired above. */
+       gfs2_glock_queue_put(live_gl);
+       clear_bit(SDF_WITHDRAW_RECOVERY, &sdp->sd_flags);
+
+       /*
+        * At this point our journal is evicted, so we need to get a new inode
+        * for it. Once done, we need to call gfs2_find_jhead which
+        * calls gfs2_map_journal_extents to map it for us again.
+        *
+        * Note that we don't really want it to look up a FREE block. The
+        * GFS2_BLKST_FREE simply overrides a block check in gfs2_inode_lookup
+        * which would otherwise fail because it requires grabbing an rgrp
+        * glock, which would fail with -EIO because we're withdrawing.
+        */
+       inode = gfs2_inode_lookup(sdp->sd_vfs, DT_UNKNOWN,
+                                 sdp->sd_jdesc->jd_no_addr, no_formal_ino,
+                                 GFS2_BLKST_FREE);
+       if (IS_ERR(inode)) {
+               fs_warn(sdp, "Reprocessing of jid %d failed with %ld.\n",
+                       sdp->sd_lockstruct.ls_jid, PTR_ERR(inode));
+               goto skip_recovery;
+       }
+       sdp->sd_jdesc->jd_inode = inode;
+
+       /*
+        * Now wait until recovery is complete.
+        */
+       for (tries = 0; tries < 10; tries++) {
+               ret = check_journal_clean(sdp, sdp->sd_jdesc);
+               if (!ret)
+                       break;
+               msleep(HZ);
+               fs_warn(sdp, "Waiting for journal recovery jid %d.\n",
+                       sdp->sd_lockstruct.ls_jid);
+       }
+skip_recovery:
+       if (!ret)
+               fs_warn(sdp, "Journal recovery complete for jid %d.\n",
+                       sdp->sd_lockstruct.ls_jid);
+       else
+               fs_warn(sdp, "Journal recovery skipped for %d until next "
+                       "mount.\n", sdp->sd_lockstruct.ls_jid);
+       fs_warn(sdp, "Glock dequeues delayed: %lu\n", sdp->sd_glock_dqs_held);
+       sdp->sd_glock_dqs_held = 0;
+       wake_up_bit(&sdp->sd_flags, SDF_WITHDRAW_RECOVERY);
+}
+
 void gfs2_lm(struct gfs2_sbd *sdp, const char *fmt, ...)
 {
        struct va_format vaf;
        const struct lm_lockops *lm = ls->ls_ops;
 
        if (sdp->sd_args.ar_errors == GFS2_ERRORS_WITHDRAW &&
-           test_and_set_bit(SDF_WITHDRAWN, &sdp->sd_flags))
-               return 0;
+           test_and_set_bit(SDF_WITHDRAWN, &sdp->sd_flags)) {
+               if (!test_bit(SDF_WITHDRAW_IN_PROG, &sdp->sd_flags))
+                       return -1;
+
+               wait_on_bit(&sdp->sd_flags, SDF_WITHDRAW_IN_PROG,
+                           TASK_UNINTERRUPTIBLE);
+               return -1;
+       }
+
+       set_bit(SDF_WITHDRAW_IN_PROG, &sdp->sd_flags);
 
        if (sdp->sd_args.ar_errors == GFS2_ERRORS_WITHDRAW) {
                fs_err(sdp, "about to withdraw this file system\n");
                BUG_ON(sdp->sd_args.ar_debug);
 
+               signal_our_withdraw(sdp);
+
                kobject_uevent(&sdp->sd_kobj, KOBJ_OFFLINE);
 
                if (!strcmp(sdp->sd_lockstruct.ls_ops->lm_proto_name, "lock_dlm"))
                        lm->lm_unmount(sdp);
                }
                set_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags);
-               fs_err(sdp, "withdrawn\n");
+               fs_err(sdp, "File system withdrawn\n");
                dump_stack();
+               clear_bit(SDF_WITHDRAW_IN_PROG, &sdp->sd_flags);
+               smp_mb__after_atomic();
+               wake_up_bit(&sdp->sd_flags, SDF_WITHDRAW_IN_PROG);
        }
 
        if (sdp->sd_args.ar_errors == GFS2_ERRORS_PANIC)