dout("handle_caps from mds%d\n", session->s_mds);
 
+       if (!ceph_inc_mds_stopping_blocker(mdsc, session))
+               return;
+
        /* decode */
        end = msg->front.iov_base + msg->front.iov_len;
        if (msg->front.iov_len < sizeof(*h))
             vino.snap, inode);
 
        mutex_lock(&session->s_mutex);
-       inc_session_sequence(session);
        dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
             (unsigned)seq);
 
 done_unlocked:
        iput(inode);
 out:
+       ceph_dec_mds_stopping_blocker(mdsc);
+
        ceph_put_string(extra_info.pool_ns);
 
        /* Defer closing the sessions after s_mutex lock being released */
 
 
        dout("handle_lease from mds%d\n", mds);
 
+       if (!ceph_inc_mds_stopping_blocker(mdsc, session))
+               return;
+
        /* decode */
        if (msg->front.iov_len < sizeof(*h) + sizeof(u32))
                goto bad;
             dname.len, dname.name);
 
        mutex_lock(&session->s_mutex);
-       inc_session_sequence(session);
-
        if (!inode) {
                dout("handle_lease no inode %llx\n", vino.ino);
                goto release;
 out:
        mutex_unlock(&session->s_mutex);
        iput(inode);
+
+       ceph_dec_mds_stopping_blocker(mdsc);
        return;
 
 bad:
+       ceph_dec_mds_stopping_blocker(mdsc);
+
        pr_err("corrupt lease message\n");
        ceph_msg_dump(msg);
 }
        }
 
        init_completion(&mdsc->safe_umount_waiters);
+       spin_lock_init(&mdsc->stopping_lock);
+       atomic_set(&mdsc->stopping_blockers, 0);
+       init_completion(&mdsc->stopping_waiter);
        init_waitqueue_head(&mdsc->session_close_wq);
        INIT_LIST_HEAD(&mdsc->waiting_for_map);
        mdsc->quotarealms_inodes = RB_ROOT;
 
 };
 
 enum {
-       CEPH_MDSC_STOPPING_BEGIN = 1,
-       CEPH_MDSC_STOPPING_FLUSHED = 2,
+       CEPH_MDSC_STOPPING_BEGIN = 1,
+       CEPH_MDSC_STOPPING_FLUSHING = 2,
+       CEPH_MDSC_STOPPING_FLUSHED = 3,
 };
 
 /*
        struct ceph_mds_session **sessions;    /* NULL for mds if no session */
        atomic_t                num_sessions;
        int                     max_sessions;  /* len of sessions array */
-       int                     stopping;      /* true if shutting down */
+
+       spinlock_t              stopping_lock;  /* protect snap_empty */
+       int                     stopping;      /* the stage of shutting down */
+       atomic_t                stopping_blockers;
+       struct completion       stopping_waiter;
 
        atomic64_t              quotarealms_count; /* # realms with quota */
        /*
 
        struct inode *inode;
        struct ceph_inode_info *ci;
 
+       if (!ceph_inc_mds_stopping_blocker(mdsc, session))
+               return;
+
        if (msg->front.iov_len < sizeof(*h)) {
                pr_err("%s corrupt message mds%d len %d\n", __func__,
                       session->s_mds, (int)msg->front.iov_len);
                ceph_msg_dump(msg);
-               return;
+               goto out;
        }
 
-       /* increment msg sequence number */
-       mutex_lock(&session->s_mutex);
-       inc_session_sequence(session);
-       mutex_unlock(&session->s_mutex);
-
        /* lookup inode */
        vino.ino = le64_to_cpu(h->ino);
        vino.snap = CEPH_NOSNAP;
        inode = ceph_find_inode(sb, vino);
        if (!inode) {
                pr_warn("Failed to find inode %llu\n", vino.ino);
-               return;
+               goto out;
        }
        ci = ceph_inode(inode);
 
        spin_unlock(&ci->i_ceph_lock);
 
        iput(inode);
+out:
+       ceph_dec_mds_stopping_blocker(mdsc);
 }
 
 static struct ceph_quotarealm_inode *
 
        int locked_rwsem = 0;
        bool close_sessions = false;
 
+       if (!ceph_inc_mds_stopping_blocker(mdsc, session))
+               return;
+
        /* decode */
        if (msg->front.iov_len < sizeof(*h))
                goto bad;
        dout("%s from mds%d op %s split %llx tracelen %d\n", __func__,
             mds, ceph_snap_op_name(op), split, trace_len);
 
-       mutex_lock(&session->s_mutex);
-       inc_session_sequence(session);
-       mutex_unlock(&session->s_mutex);
-
        down_write(&mdsc->snap_rwsem);
        locked_rwsem = 1;
 
        up_write(&mdsc->snap_rwsem);
 
        flush_snaps(mdsc);
+       ceph_dec_mds_stopping_blocker(mdsc);
        return;
 
 bad:
        if (locked_rwsem)
                up_write(&mdsc->snap_rwsem);
 
+       ceph_dec_mds_stopping_blocker(mdsc);
+
        if (close_sessions)
                ceph_mdsc_close_sessions(mdsc);
        return;
 
        return -ENOMEM;
 }
 
+/*
+ * Return true if it successfully increases the blocker counter,
+ * or false if the mdsc is in stopping and flushed state.
+ */
+static bool __inc_stopping_blocker(struct ceph_mds_client *mdsc)
+{
+       spin_lock(&mdsc->stopping_lock);
+       if (mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHING) {
+               spin_unlock(&mdsc->stopping_lock);
+               return false;
+       }
+       atomic_inc(&mdsc->stopping_blockers);
+       spin_unlock(&mdsc->stopping_lock);
+       return true;
+}
+
+static void __dec_stopping_blocker(struct ceph_mds_client *mdsc)
+{
+       spin_lock(&mdsc->stopping_lock);
+       if (!atomic_dec_return(&mdsc->stopping_blockers) &&
+           mdsc->stopping >= CEPH_MDSC_STOPPING_FLUSHING)
+               complete_all(&mdsc->stopping_waiter);
+       spin_unlock(&mdsc->stopping_lock);
+}
+
+/* For metadata IO requests */
+bool ceph_inc_mds_stopping_blocker(struct ceph_mds_client *mdsc,
+                                  struct ceph_mds_session *session)
+{
+       mutex_lock(&session->s_mutex);
+       inc_session_sequence(session);
+       mutex_unlock(&session->s_mutex);
+
+       return __inc_stopping_blocker(mdsc);
+}
+
+void ceph_dec_mds_stopping_blocker(struct ceph_mds_client *mdsc)
+{
+       __dec_stopping_blocker(mdsc);
+}
+
 static void ceph_kill_sb(struct super_block *s)
 {
        struct ceph_fs_client *fsc = ceph_sb_to_client(s);
+       struct ceph_mds_client *mdsc = fsc->mdsc;
+       bool wait;
 
        dout("kill_sb %p\n", s);
 
-       ceph_mdsc_pre_umount(fsc->mdsc);
+       ceph_mdsc_pre_umount(mdsc);
        flush_fs_workqueues(fsc);
 
        /*
         * Though the kill_anon_super() will finally trigger the
-        * sync_filesystem() anyway, we still need to do it here
-        * and then bump the stage of shutdown to stop the work
-        * queue as earlier as possible.
+        * sync_filesystem() anyway, we still need to do it here and
+        * then bump the stage of shutdown. This will allow us to
+        * drop any further message, which will increase the inodes'
+        * i_count reference counters but makes no sense any more,
+        * from MDSs.
+        *
+        * Without this when evicting the inodes it may fail in the
+        * kill_anon_super(), which will trigger a warning when
+        * destroying the fscrypt keyring and then possibly trigger
+        * a further crash in ceph module when the iput() tries to
+        * evict the inodes later.
         */
        sync_filesystem(s);
 
-       fsc->mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHED;
+       spin_lock(&mdsc->stopping_lock);
+       mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHING;
+       wait = !!atomic_read(&mdsc->stopping_blockers);
+       spin_unlock(&mdsc->stopping_lock);
+
+       if (wait && atomic_read(&mdsc->stopping_blockers)) {
+               long timeleft = wait_for_completion_killable_timeout(
+                                       &mdsc->stopping_waiter,
+                                       fsc->client->options->mount_timeout);
+               if (!timeleft) /* timed out */
+                       pr_warn("umount timed out, %ld\n", timeleft);
+               else if (timeleft < 0) /* killed */
+                       pr_warn("umount was killed, %ld\n", timeleft);
+       }
 
+       mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHED;
        kill_anon_super(s);
 
        fsc->client->extra_mon_dispatch = NULL;
 
                                     struct kstatfs *buf);
 extern void ceph_cleanup_quotarealms_inodes(struct ceph_mds_client *mdsc);
 
+bool ceph_inc_mds_stopping_blocker(struct ceph_mds_client *mdsc,
+                              struct ceph_mds_session *session);
+void ceph_dec_mds_stopping_blocker(struct ceph_mds_client *mdsc);
 #endif /* _FS_CEPH_SUPER_H */