* locking semantics of the file system using the protocol.  It should 
  * be somewhere else, I'm sure, but right now it isn't.
  *
+ * New in version 6:
+ *     - DLM lockres remote refcount fixes.
+ *
  * New in version 5:
  *     - Network timeout checking protocol
  *
  *     - full 64 bit i_size in the metadata lock lvbs
  *     - introduction of "rw" lock and pushing meta/data locking down
  */
-#define O2NET_PROTOCOL_VERSION 5ULL
+#define O2NET_PROTOCOL_VERSION 6ULL
 struct o2net_handshake {
        __be64  protocol_version;
        __be64  connector_id;
 
 #define DLM_LOCK_RES_DIRTY                0x00000008
 #define DLM_LOCK_RES_IN_PROGRESS          0x00000010
 #define DLM_LOCK_RES_MIGRATING            0x00000020
+#define DLM_LOCK_RES_DROPPING_REF         0x00000040
 
 /* max milliseconds to wait to sync up a network failure with a node death */
 #define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
        u8  owner;              //node which owns the lock resource, or unknown
        u16 state;
        char lvb[DLM_LVB_LEN];
+       unsigned int inflight_locks;
+       unsigned long refmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
 };
 
 struct dlm_migratable_lock
        DLM_CONVERT_LOCK_MSG,    /* 504 */
        DLM_PROXY_AST_MSG,       /* 505 */
        DLM_UNLOCK_LOCK_MSG,     /* 506 */
-       DLM_UNUSED_MSG2,         /* 507 */
+       DLM_DEREF_LOCKRES_MSG,   /* 507 */
        DLM_MIGRATE_REQUEST_MSG, /* 508 */
        DLM_MIG_LOCKRES_MSG,     /* 509 */
        DLM_QUERY_JOIN_MSG,      /* 510 */
        u8 name[O2NM_MAX_NAME_LEN];
 };
 
+#define DLM_ASSERT_RESPONSE_REASSERT       0x00000001
+#define DLM_ASSERT_RESPONSE_MASTERY_REF    0x00000002
+
 #define DLM_ASSERT_MASTER_MLE_CLEANUP      0x00000001
 #define DLM_ASSERT_MASTER_REQUERY          0x00000002
 #define DLM_ASSERT_MASTER_FINISH_MIGRATION 0x00000004
        u8 name[O2NM_MAX_NAME_LEN];
 };
 
+#define DLM_MIGRATE_RESPONSE_MASTERY_REF   0x00000001
+
 struct dlm_migrate_request
 {
        u8 master;
        __be32 pad2;
 };
 
+struct dlm_deref_lockres
+{
+       u32 pad1;
+       u16 pad2;
+       u8 node_idx;
+       u8 namelen;
+
+       u8 name[O2NM_MAX_NAME_LEN];
+};
+
 static inline enum dlm_status
 __dlm_lockres_state_to_status(struct dlm_lock_resource *res)
 {
                              struct dlm_lock_resource *res);
 void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
                            struct dlm_lock_resource *res);
-void dlm_purge_lockres(struct dlm_ctxt *dlm,
-                      struct dlm_lock_resource *lockres);
+int dlm_purge_lockres(struct dlm_ctxt *dlm,
+                     struct dlm_lock_resource *lockres);
 static inline void dlm_lockres_get(struct dlm_lock_resource *res)
 {
        /* This is called on every lookup, so it might be worth
 void __dlm_unhash_lockres(struct dlm_lock_resource *res);
 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
                          struct dlm_lock_resource *res);
+struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
+                                                    const char *name,
+                                                    unsigned int len,
+                                                    unsigned int hash);
 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
                                                const char *name,
                                                unsigned int len,
                                          const char *name,
                                          unsigned int namelen);
 
+#define dlm_lockres_set_refmap_bit(bit,res)  \
+       __dlm_lockres_set_refmap_bit(bit,res,__FILE__,__LINE__)
+#define dlm_lockres_clear_refmap_bit(bit,res)  \
+       __dlm_lockres_clear_refmap_bit(bit,res,__FILE__,__LINE__)
+
+static inline void __dlm_lockres_set_refmap_bit(int bit,
+                                               struct dlm_lock_resource *res,
+                                               const char *file,
+                                               int line)
+{
+       //printk("%s:%d:%.*s: setting bit %d\n", file, line,
+       //     res->lockname.len, res->lockname.name, bit);
+       set_bit(bit, res->refmap);
+}
+
+static inline void __dlm_lockres_clear_refmap_bit(int bit,
+                                                 struct dlm_lock_resource *res,
+                                                 const char *file,
+                                                 int line)
+{
+       //printk("%s:%d:%.*s: clearing bit %d\n", file, line,
+       //     res->lockname.len, res->lockname.name, bit);
+       clear_bit(bit, res->refmap);
+}
+
+void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+                                  struct dlm_lock_resource *res,
+                                  const char *file,
+                                  int line);
+void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+                                  struct dlm_lock_resource *res,
+                                  int new_lockres,
+                                  const char *file,
+                                  int line);
+#define dlm_lockres_drop_inflight_ref(d,r)  \
+       __dlm_lockres_drop_inflight_ref(d,r,__FILE__,__LINE__)
+#define dlm_lockres_grab_inflight_ref(d,r)  \
+       __dlm_lockres_grab_inflight_ref(d,r,0,__FILE__,__LINE__)
+#define dlm_lockres_grab_inflight_ref_new(d,r)  \
+       __dlm_lockres_grab_inflight_ref(d,r,1,__FILE__,__LINE__)
+
 void dlm_queue_ast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
 void dlm_queue_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock);
 void dlm_do_local_ast(struct dlm_ctxt *dlm,
 int dlm_migrate_lockres(struct dlm_ctxt *dlm,
                        struct dlm_lock_resource *res,
                        u8 target);
+int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res);
 int dlm_finish_migration(struct dlm_ctxt *dlm,
                         struct dlm_lock_resource *res,
                         u8 old_master);
 
 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data);
+int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data);
 int dlm_init_mle_cache(void);
 void dlm_destroy_mle_cache(void);
 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up);
+int dlm_drop_lockres_ref(struct dlm_ctxt *dlm,
+                        struct dlm_lock_resource *res);
 void dlm_clean_master_list(struct dlm_ctxt *dlm,
                           u8 dead_node);
 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock);
-
+int __dlm_lockres_has_locks(struct dlm_lock_resource *res);
 int __dlm_lockres_unused(struct dlm_lock_resource *res);
 
 static inline const char * dlm_lock_mode_name(int mode)
 
        spin_unlock(&res->spinlock);
 }
 
+static void dlm_print_lockres_refmap(struct dlm_lock_resource *res)
+{
+       int bit;
+       assert_spin_locked(&res->spinlock);
+
+       mlog(ML_NOTICE, "  refmap nodes: [ ");
+       bit = 0;
+       while (1) {
+               bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
+               if (bit >= O2NM_MAX_NODES)
+                       break;
+               printk("%u ", bit);
+               bit++;
+       }
+       printk("], inflight=%u\n", res->inflight_locks);
+}
+
 void __dlm_print_one_lock_resource(struct dlm_lock_resource *res)
 {
        struct list_head *iter2;
               res->owner, res->state);
        mlog(ML_NOTICE, "  last used: %lu, on purge list: %s\n",
             res->last_used, list_empty(&res->purge) ? "no" : "yes");
+       dlm_print_lockres_refmap(res);
        mlog(ML_NOTICE, "  granted queue: \n");
        list_for_each(iter2, &res->granted) {
                lock = list_entry(iter2, struct dlm_lock, list);
 
        hlist_add_head(&res->hash_node, bucket);
 }
 
-struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
-                                               const char *name,
-                                               unsigned int len,
-                                               unsigned int hash)
+struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
+                                                    const char *name,
+                                                    unsigned int len,
+                                                    unsigned int hash)
 {
        struct hlist_head *bucket;
        struct hlist_node *list;
        return NULL;
 }
 
+/* intended to be called by functions which do not care about lock
+ * resources which are being purged (most net _handler functions).
+ * this will return NULL for any lock resource which is found but
+ * currently in the process of dropping its mastery reference.
+ * use __dlm_lookup_lockres_full when you need the lock resource
+ * regardless (e.g. dlm_get_lock_resource) */
+struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
+                                               const char *name,
+                                               unsigned int len,
+                                               unsigned int hash)
+{
+       struct dlm_lock_resource *res = NULL;
+
+       mlog_entry("%.*s\n", len, name);
+
+       assert_spin_locked(&dlm->spinlock);
+
+       res = __dlm_lookup_lockres_full(dlm, name, len, hash);
+       if (res) {
+               spin_lock(&res->spinlock);
+               if (res->state & DLM_LOCK_RES_DROPPING_REF) {
+                       spin_unlock(&res->spinlock);
+                       dlm_lockres_put(res);
+                       return NULL;
+               }
+               spin_unlock(&res->spinlock);
+       }
+
+       return res;
+}
+
 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
                                    const char *name,
                                    unsigned int len)
        wake_up(&dlm_domain_events);
 }
 
-static void dlm_migrate_all_locks(struct dlm_ctxt *dlm)
+static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
 {
-       int i;
+       int i, num, n, ret = 0;
        struct dlm_lock_resource *res;
+       struct hlist_node *iter;
+       struct hlist_head *bucket;
+       int dropped;
 
        mlog(0, "Migrating locks from domain %s\n", dlm->name);
-restart:
+
+       num = 0;
        spin_lock(&dlm->spinlock);
        for (i = 0; i < DLM_HASH_BUCKETS; i++) {
-               while (!hlist_empty(dlm_lockres_hash(dlm, i))) {
-                       res = hlist_entry(dlm_lockres_hash(dlm, i)->first,
-                                         struct dlm_lock_resource, hash_node);
-                       /* need reference when manually grabbing lockres */
+redo_bucket:
+               n = 0;
+               bucket = dlm_lockres_hash(dlm, i);
+               iter = bucket->first;
+               while (iter) {
+                       n++;
+                       res = hlist_entry(iter, struct dlm_lock_resource,
+                                         hash_node);
                        dlm_lockres_get(res);
-                       /* this should unhash the lockres
-                        * and exit with dlm->spinlock */
-                       mlog(0, "purging res=%p\n", res);
-                       if (dlm_lockres_is_dirty(dlm, res)) {
-                               /* HACK!  this should absolutely go.
-                                * need to figure out why some empty
-                                * lockreses are still marked dirty */
-                               mlog(ML_ERROR, "lockres %.*s dirty!\n",
-                                    res->lockname.len, res->lockname.name);
-
-                               spin_unlock(&dlm->spinlock);
-                               dlm_kick_thread(dlm, res);
-                               wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
-                               dlm_lockres_put(res);
-                               goto restart;
-                       }
-                       dlm_purge_lockres(dlm, res);
+                       /* migrate, if necessary.  this will drop the dlm
+                        * spinlock and retake it if it does migration. */
+                       dropped = dlm_empty_lockres(dlm, res);
+
+                       spin_lock(&res->spinlock);
+                       __dlm_lockres_calc_usage(dlm, res);
+                       iter = res->hash_node.next;
+                       spin_unlock(&res->spinlock);
+
                        dlm_lockres_put(res);
+
+                       cond_resched_lock(&dlm->spinlock);
+
+                       if (dropped)
+                               goto redo_bucket;
                }
+               num += n;
+               mlog(0, "%s: touched %d lockreses in bucket %d "
+                    "(tot=%d)\n", dlm->name, n, i, num);
        }
        spin_unlock(&dlm->spinlock);
-
+       wake_up(&dlm->dlm_thread_wq);
+
+       /* let the dlm thread take care of purging, keep scanning until
+        * nothing remains in the hash */
+       if (num) {
+               mlog(0, "%s: %d lock resources in hash last pass\n",
+                    dlm->name, num);
+               ret = -EAGAIN;
+       }
        mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
+       return ret;
 }
 
 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
                /* We changed dlm state, notify the thread */
                dlm_kick_thread(dlm, NULL);
 
-               dlm_migrate_all_locks(dlm);
+               while (dlm_migrate_all_locks(dlm)) {
+                       mlog(0, "%s: more migration to do\n", dlm->name);
+               }
                dlm_mark_domain_leaving(dlm);
                dlm_leave_domain(dlm);
                dlm_complete_dlm_shutdown(dlm);
        if (status)
                goto bail;
 
+       status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
+                                       sizeof(struct dlm_deref_lockres),
+                                       dlm_deref_lockres_handler,
+                                       dlm, &dlm->dlm_domain_handlers);
+       if (status)
+               goto bail;
+
        status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
                                        sizeof(struct dlm_migrate_request),
                                        dlm_migrate_request_handler,
 
                        kick_thread = 1;
                }
        }
+       /* reduce the inflight count, this may result in the lockres
+        * being purged below during calc_usage */
+       if (lock->ml.node == dlm->node_num)
+               dlm_lockres_drop_inflight_ref(dlm, res);
 
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);
 
                            int idx);
 
 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
-static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
-                               unsigned int namelen, void *nodemap,
-                               u32 flags);
+static int dlm_do_assert_master(struct dlm_ctxt *dlm,
+                               struct dlm_lock_resource *res,
+                               void *nodemap, u32 flags);
 
 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
                                struct dlm_master_list_entry *mle,
                        struct dlm_master_list_entry **mle,
                        char *name, unsigned int namelen);
 
-static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
+static int dlm_do_master_request(struct dlm_lock_resource *res,
+                                struct dlm_master_list_entry *mle, int to);
 
 
 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
        INIT_LIST_HEAD(&res->purge);
        atomic_set(&res->asts_reserved, 0);
        res->migration_pending = 0;
+       res->inflight_locks = 0;
 
        kref_init(&res->refs);
 
        res->last_used = 0;
 
        memset(res->lvb, 0, DLM_LVB_LEN);
+       memset(res->refmap, 0, sizeof(res->refmap));
 }
 
 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
        return res;
 }
 
+void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
+                                  struct dlm_lock_resource *res,
+                                  int new_lockres,
+                                  const char *file,
+                                  int line)
+{
+       if (!new_lockres)
+               assert_spin_locked(&res->spinlock);
+
+       if (!test_bit(dlm->node_num, res->refmap)) {
+               BUG_ON(res->inflight_locks != 0);
+               dlm_lockres_set_refmap_bit(dlm->node_num, res);
+       }
+       res->inflight_locks++;
+       mlog(0, "%s:%.*s: inflight++: now %u\n",
+            dlm->name, res->lockname.len, res->lockname.name,
+            res->inflight_locks);
+}
+
+void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
+                                  struct dlm_lock_resource *res,
+                                  const char *file,
+                                  int line)
+{
+       assert_spin_locked(&res->spinlock);
+
+       BUG_ON(res->inflight_locks == 0);
+       res->inflight_locks--;
+       mlog(0, "%s:%.*s: inflight--: now %u\n",
+            dlm->name, res->lockname.len, res->lockname.name,
+            res->inflight_locks);
+       if (res->inflight_locks == 0)
+               dlm_lockres_clear_refmap_bit(dlm->node_num, res);
+       wake_up(&res->wq);
+}
+
 /*
  * lookup a lock resource by name.
  * may already exist in the hashtable.
        unsigned int hash;
        int tries = 0;
        int bit, wait_on_recovery = 0;
+       int drop_inflight_if_nonlocal = 0;
 
        BUG_ON(!lockid);
 
 
 lookup:
        spin_lock(&dlm->spinlock);
-       tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
+       tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
        if (tmpres) {
+               int dropping_ref = 0;
+
+               spin_lock(&tmpres->spinlock);
+               if (tmpres->owner == dlm->node_num) {
+                       BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
+                       dlm_lockres_grab_inflight_ref(dlm, tmpres);
+               } else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
+                       dropping_ref = 1;
+               spin_unlock(&tmpres->spinlock);
                spin_unlock(&dlm->spinlock);
+
+               /* wait until done messaging the master, drop our ref to allow
+                * the lockres to be purged, start over. */
+               if (dropping_ref) {
+                       spin_lock(&tmpres->spinlock);
+                       __dlm_wait_on_lockres_flags(tmpres, DLM_LOCK_RES_DROPPING_REF);
+                       spin_unlock(&tmpres->spinlock);
+                       dlm_lockres_put(tmpres);
+                       tmpres = NULL;
+                       goto lookup;
+               }
+
                mlog(0, "found in hash!\n");
                if (res)
                        dlm_lockres_put(res);
                spin_lock(&res->spinlock);
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
                __dlm_insert_lockres(dlm, res);
+               dlm_lockres_grab_inflight_ref(dlm, res);
                spin_unlock(&res->spinlock);
                spin_unlock(&dlm->spinlock);
                /* lockres still marked IN_PROGRESS */
        /* if we found a block, wait for lock to be mastered by another node */
        blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
        if (blocked) {
+               int mig;
                if (mle->type == DLM_MLE_MASTER) {
                        mlog(ML_ERROR, "master entry for nonexistent lock!\n");
                        BUG();
-               } else if (mle->type == DLM_MLE_MIGRATION) {
-                       /* migration is in progress! */
-                       /* the good news is that we now know the
-                        * "current" master (mle->master). */
-
+               }
+               mig = (mle->type == DLM_MLE_MIGRATION);
+               /* if there is a migration in progress, let the migration
+                * finish before continuing.  we can wait for the absence
+                * of the MIGRATION mle: either the migrate finished or
+                * one of the nodes died and the mle was cleaned up.
+                * if there is a BLOCK here, but it already has a master
+                * set, we are too late.  the master does not have a ref
+                * for us in the refmap.  detach the mle and drop it.
+                * either way, go back to the top and start over. */
+               if (mig || mle->master != O2NM_MAX_NODES) {
+                       BUG_ON(mig && mle->master == dlm->node_num);
+                       /* we arrived too late.  the master does not
+                        * have a ref for us. retry. */
+                       mlog(0, "%s:%.*s: late on %s\n",
+                            dlm->name, namelen, lockid,
+                            mig ?  "MIGRATION" : "BLOCK");
                        spin_unlock(&dlm->master_lock);
-                       assert_spin_locked(&dlm->spinlock);
-
-                       /* set the lockres owner and hash it */
-                       spin_lock(&res->spinlock);
-                       dlm_set_lockres_owner(dlm, res, mle->master);
-                       __dlm_insert_lockres(dlm, res);
-                       spin_unlock(&res->spinlock);
                        spin_unlock(&dlm->spinlock);
 
                        /* master is known, detach */
-                       dlm_mle_detach_hb_events(dlm, mle);
+                       if (!mig)
+                               dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
                        mle = NULL;
-                       goto wake_waiters;
+                       /* this is lame, but we cant wait on either
+                        * the mle or lockres waitqueue here */
+                       if (mig)
+                               msleep(100);
+                       goto lookup;
                }
        } else {
                /* go ahead and try to master lock on this node */
 
        /* finally add the lockres to its hash bucket */
        __dlm_insert_lockres(dlm, res);
+       /* since this lockres is new it doesnt not require the spinlock */
+       dlm_lockres_grab_inflight_ref_new(dlm, res);
+
+       /* if this node does not become the master make sure to drop
+        * this inflight reference below */
+       drop_inflight_if_nonlocal = 1;
+
        /* get an extra ref on the mle in case this is a BLOCK
         * if so, the creator of the BLOCK may try to put the last
         * ref at this time in the assert master handler, so we
        ret = -EINVAL;
        dlm_node_iter_init(mle->vote_map, &iter);
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
-               ret = dlm_do_master_request(mle, nodenum);
+               ret = dlm_do_master_request(res, mle, nodenum);
                if (ret < 0)
                        mlog_errno(ret);
                if (mle->master != O2NM_MAX_NODES) {
 
 wake_waiters:
        spin_lock(&res->spinlock);
+       if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
+               dlm_lockres_drop_inflight_ref(dlm, res);
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);
                /* this will cause the master to re-assert across
                 * the whole cluster, freeing up mles */
                if (res->owner != dlm->node_num) {
-                       ret = dlm_do_master_request(mle, res->owner);
+                       ret = dlm_do_master_request(res, mle, res->owner);
                        if (ret < 0) {
                                /* give recovery a chance to run */
                                mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
                                 * now tell other nodes that I am
                                 * mastering this. */
                                mle->master = dlm->node_num;
+                               /* ref was grabbed in get_lock_resource
+                                * will be dropped in dlmlock_master */
                                assert = 1;
                                sleep = 0;
                        }
                                         (atomic_read(&mle->woken) == 1),
                                         timeo);
                if (res->owner == O2NM_MAX_NODES) {
-                       mlog(0, "waiting again\n");
+                       mlog(0, "%s:%.*s: waiting again\n", dlm->name,
+                            res->lockname.len, res->lockname.name);
                        goto recheck;
                }
                mlog(0, "done waiting, master is %u\n", res->owner);
                m = dlm->node_num;
                mlog(0, "about to master %.*s here, this=%u\n",
                     res->lockname.len, res->lockname.name, m);
-               ret = dlm_do_assert_master(dlm, res->lockname.name,
-                                          res->lockname.len, mle->vote_map, 0);
+               ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
                if (ret) {
                        /* This is a failure in the network path,
                         * not in the response to the assert_master
 
        /* set the lockres owner */
        spin_lock(&res->spinlock);
+       /* mastery reference obtained either during
+        * assert_master_handler or in get_lock_resource */
        dlm_change_lockres_owner(dlm, res, m);
        spin_unlock(&res->spinlock);
 
  *
  */
 
-static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
+static int dlm_do_master_request(struct dlm_lock_resource *res,
+                                struct dlm_master_list_entry *mle, int to)
 {
        struct dlm_ctxt *dlm = mle->dlm;
        struct dlm_master_request request;
                case DLM_MASTER_RESP_YES:
                        set_bit(to, mle->response_map);
                        mlog(0, "node %u is the master, response=YES\n", to);
+                       mlog(0, "%s:%.*s: master node %u now knows I have a "
+                            "reference\n", dlm->name, res->lockname.len,
+                            res->lockname.name, to);
                        mle->master = to;
                        break;
                case DLM_MASTER_RESP_NO:
                }
 
                if (res->owner == dlm->node_num) {
+                       mlog(0, "%s:%.*s: setting bit %u in refmap\n",
+                            dlm->name, namelen, name, request->node_idx);
+                       dlm_lockres_set_refmap_bit(request->node_idx, res);
                        spin_unlock(&res->spinlock);
-                       // mlog(0, "this node is the master\n");
                        response = DLM_MASTER_RESP_YES;
                        if (mle)
                                kmem_cache_free(dlm_mle_cache, mle);
                        mlog(0, "node %u is master, but trying to migrate to "
                             "node %u.\n", tmpmle->master, tmpmle->new_master);
                        if (tmpmle->master == dlm->node_num) {
-                               response = DLM_MASTER_RESP_YES;
                                mlog(ML_ERROR, "no owner on lockres, but this "
                                     "node is trying to migrate it to %u?!\n",
                                     tmpmle->new_master);
                                 * go back and clean the mles on any
                                 * other nodes */
                                dispatch_assert = 1;
+                               dlm_lockres_set_refmap_bit(request->node_idx, res);
+                               mlog(0, "%s:%.*s: setting bit %u in refmap\n",
+                                    dlm->name, namelen, name,
+                                    request->node_idx);
                        } else
                                response = DLM_MASTER_RESP_NO;
                } else {
  * can periodically run all locks owned by this node
  * and re-assert across the cluster...
  */
-static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
-                               unsigned int namelen, void *nodemap,
-                               u32 flags)
+int dlm_do_assert_master(struct dlm_ctxt *dlm,
+                        struct dlm_lock_resource *res,
+                        void *nodemap, u32 flags)
 {
        struct dlm_assert_master assert;
        int to, tmpret;
        struct dlm_node_iter iter;
        int ret = 0;
        int reassert;
+       const char *lockname = res->lockname.name;
+       unsigned int namelen = res->lockname.len;
 
        BUG_ON(namelen > O2NM_MAX_NAME_LEN);
 again:
                        mlog(0, "link to %d went down!\n", to);
                        /* any nonzero status return will do */
                        ret = tmpret;
+                       r = 0;
                } else if (r < 0) {
                        /* ok, something horribly messed.  kill thyself. */
                        mlog(ML_ERROR,"during assert master of %.*s to %u, "
                        spin_unlock(&dlm->master_lock);
                        spin_unlock(&dlm->spinlock);
                        BUG();
-               } else if (r == EAGAIN) {
+               }
+
+               if (r & DLM_ASSERT_RESPONSE_REASSERT &&
+                   !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {
+                               mlog(ML_ERROR, "%.*s: very strange, "
+                                    "master MLE but no lockres on %u\n",
+                                    namelen, lockname, to);
+               }
+
+               if (r & DLM_ASSERT_RESPONSE_REASSERT) {
                        mlog(0, "%.*s: node %u create mles on other "
                             "nodes and requests a re-assert\n", 
                             namelen, lockname, to);
                        reassert = 1;
                }
+               if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {
+                       mlog(0, "%.*s: node %u has a reference to this "
+                            "lockres, set the bit in the refmap\n",
+                            namelen, lockname, to);
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_set_refmap_bit(to, res);
+                       spin_unlock(&res->spinlock);
+               }
        }
 
        if (reassert)
        char *name;
        unsigned int namelen, hash;
        u32 flags;
-       int master_request = 0;
+       int master_request = 0, have_lockres_ref = 0;
        int ret = 0;
 
        if (!dlm_grab(dlm))
                                dlm_change_lockres_owner(dlm, res, mle->master);
                        }
                        spin_unlock(&res->spinlock);
+                       have_lockres_ref = 1;
                }
 
                /* master is known, detach if not already detached.
        dlm_put(dlm);
        if (master_request) {
                mlog(0, "need to tell master to reassert\n");
-               ret = EAGAIN;  // positive. negative would shoot down the node.
+               /* positive. negative would shoot down the node. */
+               ret |= DLM_ASSERT_RESPONSE_REASSERT;
+               if (!have_lockres_ref) {
+                       mlog(ML_ERROR, "strange, got assert from %u, MASTER "
+                            "mle present here for %s:%.*s, but no lockres!\n",
+                            assert->node_idx, dlm->name, namelen, name);
+               }
+       }
+       if (have_lockres_ref) {
+               /* let the master know we have a reference to the lockres */
+               ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
+               mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
+                    dlm->name, namelen, name, assert->node_idx);
        }
        return ret;
 
         * even if one or more nodes die */
        mlog(0, "worker about to master %.*s here, this=%u\n",
                     res->lockname.len, res->lockname.name, dlm->node_num);
-       ret = dlm_do_assert_master(dlm, res->lockname.name,
-                                  res->lockname.len,
-                                  nodemap, flags);
+       ret = dlm_do_assert_master(dlm, res, nodemap, flags);
        if (ret < 0) {
                /* no need to restart, we are done */
                if (!dlm_is_host_down(ret))
        return ret;
 }
 
+/*
+ * DLM_DEREF_LOCKRES_MSG
+ */
+
+int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+{
+       struct dlm_deref_lockres deref;
+       int ret = 0, r;
+       const char *lockname;
+       unsigned int namelen;
+
+       lockname = res->lockname.name;
+       namelen = res->lockname.len;
+       BUG_ON(namelen > O2NM_MAX_NAME_LEN);
+
+       mlog(0, "%s:%.*s: sending deref to %d\n",
+            dlm->name, namelen, lockname, res->owner);
+       memset(&deref, 0, sizeof(deref));
+       deref.node_idx = dlm->node_num;
+       deref.namelen = namelen;
+       memcpy(deref.name, lockname, namelen);
+
+       ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
+                                &deref, sizeof(deref), res->owner, &r);
+       if (ret < 0)
+               mlog_errno(ret);
+       else if (r < 0) {
+               /* BAD.  other node says I did not have a ref. */
+               mlog(ML_ERROR,"while dropping ref on %s:%.*s "
+                   "(master=%u) got %d.\n", dlm->name, namelen,
+                   lockname, res->owner, r);
+               dlm_print_one_lock_resource(res);
+               BUG();
+       }
+       return ret;
+}
+
+int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
+{
+       struct dlm_ctxt *dlm = data;
+       struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
+       struct dlm_lock_resource *res = NULL;
+       char *name;
+       unsigned int namelen;
+       int ret = -EINVAL;
+       u8 node;
+       unsigned int hash;
+
+       if (!dlm_grab(dlm))
+               return 0;
+
+       name = deref->name;
+       namelen = deref->namelen;
+       node = deref->node_idx;
+
+       if (namelen > DLM_LOCKID_NAME_MAX) {
+               mlog(ML_ERROR, "Invalid name length!");
+               goto done;
+       }
+       if (deref->node_idx >= O2NM_MAX_NODES) {
+               mlog(ML_ERROR, "Invalid node number: %u\n", node);
+               goto done;
+       }
+
+       hash = dlm_lockid_hash(name, namelen);
+
+       spin_lock(&dlm->spinlock);
+       res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
+       if (!res) {
+               spin_unlock(&dlm->spinlock);
+               mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
+                    dlm->name, namelen, name);
+               goto done;
+       }
+       spin_unlock(&dlm->spinlock);
+
+       spin_lock(&res->spinlock);
+       BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
+       if (test_bit(node, res->refmap)) {
+               ret = 0;
+               dlm_lockres_clear_refmap_bit(node, res);
+       } else {
+               mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
+                    "but it is already dropped!\n", dlm->name, namelen,
+                    name, node);
+               __dlm_print_one_lock_resource(res);
+       }
+       spin_unlock(&res->spinlock);
+
+       if (!ret)
+               dlm_lockres_calc_usage(dlm, res);
+done:
+       if (res)
+               dlm_lockres_put(res);
+       dlm_put(dlm);
+       return ret;
+}
+
 
 /*
  * DLM_MIGRATE_LOCKRES
        return ret;
 }
 
+#define DLM_MIGRATION_RETRY_MS  100
+
+/* Should be called only after beginning the domain leave process.
+ * There should not be any remaining locks on nonlocal lock resources,
+ * and there should be no local locks left on locally mastered resources.
+ *
+ * Called with the dlm spinlock held, may drop it to do migration, but
+ * will re-acquire before exit.
+ *
+ * Returns: 1 if dlm->spinlock was dropped/retaken, 0 if never dropped */
+int dlm_empty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
+{
+       int ret;
+       int lock_dropped = 0;
+
+       if (res->owner != dlm->node_num) {
+               if (!__dlm_lockres_unused(res)) {
+                       mlog(ML_ERROR, "%s:%.*s: this node is not master, "
+                            "trying to free this but locks remain\n",
+                            dlm->name, res->lockname.len, res->lockname.name);
+               }
+               goto leave;
+       }
+
+       /* Wheee! Migrate lockres here! Will sleep so drop spinlock. */
+       spin_unlock(&dlm->spinlock);
+       lock_dropped = 1;
+       while (1) {
+               ret = dlm_migrate_lockres(dlm, res, O2NM_MAX_NODES);
+               if (ret >= 0)
+                       break;
+               if (ret == -ENOTEMPTY) {
+                       mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
+                               res->lockname.len, res->lockname.name);
+                       BUG();
+               }
+
+               mlog(0, "lockres %.*s: migrate failed, "
+                    "retrying\n", res->lockname.len,
+                    res->lockname.name);
+               msleep(DLM_MIGRATION_RETRY_MS);
+       }
+       spin_lock(&dlm->spinlock);
+leave:
+       return lock_dropped;
+}
+
 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
 {
        int ret;
 {
        struct list_head *iter, *iter2;
        struct list_head *queue = &res->granted;
-       int i;
+       int i, bit;
        struct dlm_lock *lock;
 
        assert_spin_locked(&res->spinlock);
                                BUG_ON(!list_empty(&lock->bast_list));
                                BUG_ON(lock->ast_pending);
                                BUG_ON(lock->bast_pending);
+                               dlm_lockres_clear_refmap_bit(lock->ml.node, res);
                                list_del_init(&lock->list);
                                dlm_lock_put(lock);
                        }
                }
                queue++;
        }
+       bit = 0;
+       while (1) {
+               bit = find_next_bit(res->refmap, O2NM_MAX_NODES, bit);
+               if (bit >= O2NM_MAX_NODES)
+                       break;
+               /* do not clear the local node reference, if there is a
+                * process holding this, let it drop the ref itself */
+               if (bit != dlm->node_num) {
+                       mlog(0, "%s:%.*s: node %u had a ref to this "
+                            "migrating lockres, clearing\n", dlm->name,
+                            res->lockname.len, res->lockname.name, bit);
+                       dlm_lockres_clear_refmap_bit(bit, res);
+               }
+               bit++;
+       }
 }
 
 /* for now this is not too intelligent.  we will
                        mlog(0, "migrate request (node %u) returned %d!\n",
                             nodenum, status);
                        ret = status;
+               } else if (status == DLM_MIGRATE_RESPONSE_MASTERY_REF) {
+                       /* during the migration request we short-circuited
+                        * the mastery of the lockres.  make sure we have
+                        * a mastery ref for nodenum */
+                       mlog(0, "%s:%.*s: need ref for node %u\n",
+                            dlm->name, res->lockname.len, res->lockname.name,
+                            nodenum);
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_set_refmap_bit(nodenum, res);
+                       spin_unlock(&res->spinlock);
                }
        }
 
                        /* remove it from the list so that only one
                         * mle will be found */
                        list_del_init(&tmp->list);
-                       __dlm_mle_detach_hb_events(dlm, mle);
+                       /* this was obviously WRONG.  mle is uninited here.  should be tmp. */
+                       __dlm_mle_detach_hb_events(dlm, tmp);
+                       ret = DLM_MIGRATE_RESPONSE_MASTERY_REF;
+                       mlog(0, "%s:%.*s: master=%u, newmaster=%u, "
+                           "telling master to get ref for cleared out mle "
+                           "during migration\n", dlm->name, namelen, name,
+                           master, new_master);
                }
                spin_unlock(&tmp->spinlock);
        }
        /* now add a migration mle to the tail of the list */
        dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
        mle->new_master = new_master;
+       /* the new master will be sending an assert master for this.
+        * at that point we will get the refmap reference */
        mle->master = master;
        /* do this for consistency with other mle types */
        set_bit(new_master, mle->maybe_map);
        clear_bit(dlm->node_num, iter.node_map);
        spin_unlock(&dlm->spinlock);
 
+       /* ownership of the lockres is changing.  account for the
+        * mastery reference here since old_master will briefly have
+        * a reference after the migration completes */
+       spin_lock(&res->spinlock);
+       dlm_lockres_set_refmap_bit(old_master, res);
+       spin_unlock(&res->spinlock);
+
        mlog(0, "now time to do a migrate request to other nodes\n");
        ret = dlm_do_migrate_request(dlm, res, old_master,
                                     dlm->node_num, &iter);
             res->lockname.len, res->lockname.name);
        /* this call now finishes out the nodemap
         * even if one or more nodes die */
-       ret = dlm_do_assert_master(dlm, res->lockname.name,
-                                  res->lockname.len, iter.node_map,
+       ret = dlm_do_assert_master(dlm, res, iter.node_map,
                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
        if (ret < 0) {
                /* no longer need to retry.  all living nodes contacted. */
        set_bit(old_master, iter.node_map);
        mlog(0, "doing assert master of %.*s back to %u\n",
             res->lockname.len, res->lockname.name, old_master);
-       ret = dlm_do_assert_master(dlm, res->lockname.name,
-                                  res->lockname.len, iter.node_map,
+       ret = dlm_do_assert_master(dlm, res, iter.node_map,
                                   DLM_ASSERT_MASTER_FINISH_MIGRATION);
        if (ret < 0) {
                mlog(0, "assert master to original master failed "
 
        if (total_locks == mres_total_locks)
                mres->flags |= DLM_MRES_ALL_DONE;
 
+       mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
+            dlm->name, res->lockname.len, res->lockname.name,
+            orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
+            send_to);
+
        /* send it */
        ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
                                 sz, send_to, &status);
        return 0;
 }
 
+static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
+                              struct dlm_migratable_lockres *mres)
+{
+       struct dlm_lock dummy;
+       memset(&dummy, 0, sizeof(dummy));
+       dummy.ml.cookie = 0;
+       dummy.ml.type = LKM_IVMODE;
+       dummy.ml.convert_type = LKM_IVMODE;
+       dummy.ml.highest_blocked = LKM_IVMODE;
+       dummy.lksb = NULL;
+       dummy.ml.node = dlm->node_num;
+       dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
+}
+
+static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
+                                   struct dlm_migratable_lock *ml,
+                                   u8 *nodenum)
+{
+       if (unlikely(ml->cookie == 0 &&
+           ml->type == LKM_IVMODE &&
+           ml->convert_type == LKM_IVMODE &&
+           ml->highest_blocked == LKM_IVMODE &&
+           ml->list == DLM_BLOCKED_LIST)) {
+               *nodenum = ml->node;
+               return 1;
+       }
+       return 0;
+}
 
 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                         struct dlm_migratable_lockres *mres,
                                goto error;
                }
        }
+       if (total_locks == 0) {
+               /* send a dummy lock to indicate a mastery reference only */
+               mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
+                    dlm->name, res->lockname.len, res->lockname.name,
+                    send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
+                    "migration");
+               dlm_add_dummy_lock(dlm, mres);
+       }
        /* flush any remaining locks */
        ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
        if (ret < 0)
                /* add an extra ref for just-allocated lockres 
                 * otherwise the lockres will be purged immediately */
                dlm_lockres_get(res);
-
        }
 
        /* at this point we have allocated everything we need,
         * and we have a hashed lockres with an extra ref and
         * the proper res->state flags. */
        ret = 0;
+       spin_lock(&res->spinlock);
+       /* drop this either when master requery finds a different master
+        * or when a lock is added by the recovery worker */
+       dlm_lockres_grab_inflight_ref(dlm, res);
        if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
                /* migration cannot have an unknown master */
                BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
                          "unknown owner.. will need to requery: "
                          "%.*s\n", mres->lockname_len, mres->lockname);
        } else {
-               spin_lock(&res->spinlock);
+               /* take a reference now to pin the lockres, drop it
+                * when locks are added in the worker */
                dlm_change_lockres_owner(dlm, res, dlm->node_num);
-               spin_unlock(&res->spinlock);
        }
+       spin_unlock(&res->spinlock);
 
        /* queue up work for dlm_mig_lockres_worker */
        dlm_grab(dlm);  /* get an extra ref for the work item */
                                   "this node will take it.\n",
                                   res->lockname.len, res->lockname.name);
                } else {
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_drop_inflight_ref(dlm, res);
+                       spin_unlock(&res->spinlock);
                        mlog(0, "master needs to respond to sender "
                                  "that node %u still owns %.*s\n",
                                  real_master, res->lockname.len,
        int i, bad;
        struct list_head *iter;
        struct dlm_lock *lock = NULL;
+       u8 from = O2NM_MAX_NODES;
+       unsigned int added = 0;
 
        mlog(0, "running %d locks for this lockres\n", mres->num_locks);
        for (i=0; i<mres->num_locks; i++) {
                ml = &(mres->ml[i]);
+
+               if (dlm_is_dummy_lock(dlm, ml, &from)) {
+                       /* placeholder, just need to set the refmap bit */
+                       BUG_ON(mres->num_locks != 1);
+                       mlog(0, "%s:%.*s: dummy lock for %u\n",
+                            dlm->name, mres->lockname_len, mres->lockname,
+                            from);
+                       spin_lock(&res->spinlock);
+                       dlm_lockres_set_refmap_bit(from, res);
+                       spin_unlock(&res->spinlock);
+                       added++;
+                       break;
+               }
                BUG_ON(ml->highest_blocked != LKM_IVMODE);
                newlock = NULL;
                lksb = NULL;
                        /* do not alter lock refcount.  switching lists. */
                        list_move_tail(&lock->list, queue);
                        spin_unlock(&res->spinlock);
+                       added++;
 
                        mlog(0, "just reordered a local lock!\n");
                        continue;
                if (!bad) {
                        dlm_lock_get(newlock);
                        list_add_tail(&newlock->list, queue);
+                       mlog(0, "%s:%.*s: added lock for node %u, "
+                            "setting refmap bit\n", dlm->name,
+                            res->lockname.len, res->lockname.name, ml->node);
+                       dlm_lockres_set_refmap_bit(ml->node, res);
+                       added++;
                }
                spin_unlock(&res->spinlock);
        }
        mlog(0, "done running all the locks\n");
 
 leave:
+       /* balance the ref taken when the work was queued */
+       if (added > 0) {
+               spin_lock(&res->spinlock);
+               dlm_lockres_drop_inflight_ref(dlm, res);
+               spin_unlock(&res->spinlock);
+       }
+
        if (ret < 0) {
                mlog_errno(ret);
                if (newlock)
                if (res->owner == dead_node) {
                        list_del_init(&res->recovering);
                        spin_lock(&res->spinlock);
+                       /* new_master has our reference from
+                        * the lock state sent during recovery */
                        dlm_change_lockres_owner(dlm, res, new_master);
                        res->state &= ~DLM_LOCK_RES_RECOVERING;
-                       if (!__dlm_lockres_unused(res))
+                       if (__dlm_lockres_has_locks(res))
                                __dlm_dirty_lockres(dlm, res);
                        spin_unlock(&res->spinlock);
                        wake_up(&res->wq);
                                        dlm_lockres_put(res);
                                }
                                spin_lock(&res->spinlock);
+                               /* new_master has our reference from
+                                * the lock state sent during recovery */
                                dlm_change_lockres_owner(dlm, res, new_master);
                                res->state &= ~DLM_LOCK_RES_RECOVERING;
-                               if (!__dlm_lockres_unused(res))
+                               if (__dlm_lockres_has_locks(res))
                                        __dlm_dirty_lockres(dlm, res);
                                spin_unlock(&res->spinlock);
                                wake_up(&res->wq);
 {
        struct list_head *iter, *tmpiter;
        struct dlm_lock *lock;
+       unsigned int freed = 0;
 
        /* this node is the lockres master:
         * 1) remove any stale locks for the dead node
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
        list_for_each_safe(iter, tmpiter, &res->converting) {
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
        list_for_each_safe(iter, tmpiter, &res->blocked) {
                if (lock->ml.node == dead_node) {
                        list_del_init(&lock->list);
                        dlm_lock_put(lock);
+                       freed++;
                }
        }
 
+       if (freed) {
+               mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
+                    "dropping ref from lockres\n", dlm->name,
+                    res->lockname.len, res->lockname.name, freed, dead_node);
+               BUG_ON(!test_bit(dead_node, res->refmap));
+               dlm_lockres_clear_refmap_bit(dead_node, res);
+       } else if (test_bit(dead_node, res->refmap)) {
+               mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+                    "no locks and had not purged before dying\n", dlm->name,
+                    res->lockname.len, res->lockname.name, dead_node);
+               dlm_lockres_clear_refmap_bit(dead_node, res);
+       }
+
        /* do not kick thread yet */
        __dlm_dirty_lockres(dlm, res);
 }
                        spin_lock(&res->spinlock);
                        /* zero the lvb if necessary */
                        dlm_revalidate_lvb(dlm, res, dead_node);
-                       if (res->owner == dead_node)
+                       if (res->owner == dead_node) {
+                               if (res->state & DLM_LOCK_RES_DROPPING_REF)
+                                       mlog(0, "%s:%.*s: owned by "
+                                            "dead node %u, this node was "
+                                            "dropping its ref when it died. "
+                                            "continue, dropping the flag.\n",
+                                            dlm->name, res->lockname.len,
+                                            res->lockname.name, dead_node);
+
+                               /* the wake_up for this will happen when the
+                                * RECOVERING flag is dropped later */
+                               res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+
                                dlm_move_lockres_to_recovery_list(dlm, res);
-                       else if (res->owner == dlm->node_num) {
+                       } else if (res->owner == dlm->node_num) {
                                dlm_free_dead_locks(dlm, res, dead_node);
                                __dlm_lockres_calc_usage(dlm, res);
                        }
 
 #include "cluster/masklog.h"
 
 static int dlm_thread(void *data);
-static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
-                                 struct dlm_lock_resource *lockres);
-
 static void dlm_flush_asts(struct dlm_ctxt *dlm);
 
 #define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)
        current->state = TASK_RUNNING;
 }
 
-
-int __dlm_lockres_unused(struct dlm_lock_resource *res)
+int __dlm_lockres_has_locks(struct dlm_lock_resource *res)
 {
        if (list_empty(&res->granted) &&
            list_empty(&res->converting) &&
-           list_empty(&res->blocked) &&
-           list_empty(&res->dirty))
-               return 1;
+           list_empty(&res->blocked))
+               return 0;
+       return 1;
+}
+
+/* "unused": the lockres has no locks, is not on the dirty list,
+ * has no inflight locks (in the gap between mastery and acquiring
+ * the first lock), and has no bits in its refmap.
+ * truly ready to be freed. */
+int __dlm_lockres_unused(struct dlm_lock_resource *res)
+{
+       if (!__dlm_lockres_has_locks(res) &&
+           list_empty(&res->dirty)) {
+               /* try not to scan the bitmap unless the first two
+                * conditions are already true */
+               int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
+               if (bit >= O2NM_MAX_NODES) {
+                       /* since the bit for dlm->node_num is not
+                        * set, inflight_locks better be zero */
+                       BUG_ON(res->inflight_locks != 0);
+                       return 1;
+               }
+       }
        return 0;
 }
 
        assert_spin_locked(&res->spinlock);
 
        if (__dlm_lockres_unused(res)){
-               /* For now, just keep any resource we master */
-               if (res->owner == dlm->node_num)
-               {
-                       if (!list_empty(&res->purge)) {
-                               mlog(0, "we master %s:%.*s, but it is on "
-                                    "the purge list.  Removing\n",
-                                    dlm->name, res->lockname.len,
-                                    res->lockname.name);
-                               list_del_init(&res->purge);
-                               dlm->purge_count--;
-                       }
-                       return;
-               }
-
                if (list_empty(&res->purge)) {
-                       mlog(0, "putting lockres %.*s from purge list\n",
-                            res->lockname.len, res->lockname.name);
+                       mlog(0, "putting lockres %.*s:%p onto purge list\n",
+                            res->lockname.len, res->lockname.name, res);
 
                        res->last_used = jiffies;
+                       dlm_lockres_get(res);
                        list_add_tail(&res->purge, &dlm->purge_list);
                        dlm->purge_count++;
-
-                       /* if this node is not the owner, there is
-                        * no way to keep track of who the owner could be.
-                        * unhash it to avoid serious problems. */
-                       if (res->owner != dlm->node_num) {
-                               mlog(0, "%s:%.*s: doing immediate "
-                                    "purge of lockres owned by %u\n",
-                                    dlm->name, res->lockname.len,
-                                    res->lockname.name, res->owner);
-
-                               dlm_purge_lockres_now(dlm, res);
-                       }
                }
        } else if (!list_empty(&res->purge)) {
-               mlog(0, "removing lockres %.*s from purge list, "
-                    "owner=%u\n", res->lockname.len, res->lockname.name,
-                    res->owner);
+               mlog(0, "removing lockres %.*s:%p from purge list, owner=%u\n",
+                    res->lockname.len, res->lockname.name, res, res->owner);
 
                list_del_init(&res->purge);
+               dlm_lockres_put(res);
                dlm->purge_count--;
        }
 }
        spin_unlock(&dlm->spinlock);
 }
 
-/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
- * to do migration, but will re-acquire before exit. */
-void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
+int dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
 {
        int master;
-       int ret;
-
-       spin_lock(&lockres->spinlock);
-       master = lockres->owner == dlm->node_num;
-       spin_unlock(&lockres->spinlock);
-
-       mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
-            lockres->lockname.name, master);
+       int ret = 0;
 
-       /* Non master is the easy case -- no migration required, just
-        * quit. */
+       spin_lock(&res->spinlock);
+       if (!__dlm_lockres_unused(res)) {
+               spin_unlock(&res->spinlock);
+               mlog(0, "%s:%.*s: tried to purge but not unused\n",
+                    dlm->name, res->lockname.len, res->lockname.name);
+               return -ENOTEMPTY;
+       }
+       master = (res->owner == dlm->node_num);
        if (!master)
-               goto finish;
-
-       /* Wheee! Migrate lockres here! */
-       spin_unlock(&dlm->spinlock);
-again:
+               res->state |= DLM_LOCK_RES_DROPPING_REF;
+       spin_unlock(&res->spinlock);
 
-       ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
-       if (ret == -ENOTEMPTY) {
-               mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
-                    lockres->lockname.len, lockres->lockname.name);
+       mlog(0, "purging lockres %.*s, master = %d\n", res->lockname.len,
+            res->lockname.name, master);
 
-               BUG();
-       } else if (ret < 0) {
-               mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
-                    lockres->lockname.len, lockres->lockname.name);
-               msleep(100);
-               goto again;
+       if (!master) {
+               /* drop spinlock to do messaging, retake below */
+               spin_unlock(&dlm->spinlock);
+               /* clear our bit from the master's refmap, ignore errors */
+               ret = dlm_drop_lockres_ref(dlm, res);
+               if (ret < 0) {
+                       mlog_errno(ret);
+                       if (!dlm_is_host_down(ret))
+                               BUG();
+               }
+               mlog(0, "%s:%.*s: dlm_deref_lockres returned %d\n",
+                    dlm->name, res->lockname.len, res->lockname.name, ret);
+               spin_lock(&dlm->spinlock);
        }
 
-       spin_lock(&dlm->spinlock);
-
-finish:
-       if (!list_empty(&lockres->purge)) {
-               list_del_init(&lockres->purge);
+       if (!list_empty(&res->purge)) {
+               mlog(0, "removing lockres %.*s:%p from purgelist, "
+                    "master = %d\n", res->lockname.len, res->lockname.name,
+                    res, master);
+               list_del_init(&res->purge);
+               dlm_lockres_put(res);
                dlm->purge_count--;
        }
-       __dlm_unhash_lockres(lockres);
-}
-
-/* make an unused lockres go away immediately.
- * as soon as the dlm spinlock is dropped, this lockres
- * will not be found. kfree still happens on last put. */
-static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
-                                 struct dlm_lock_resource *lockres)
-{
-       assert_spin_locked(&dlm->spinlock);
-       assert_spin_locked(&lockres->spinlock);
-
-       BUG_ON(!__dlm_lockres_unused(lockres));
+       __dlm_unhash_lockres(res);
 
-       if (!list_empty(&lockres->purge)) {
-               list_del_init(&lockres->purge);
-               dlm->purge_count--;
+       /* lockres is not in the hash now.  drop the flag and wake up
+        * any processes waiting in dlm_get_lock_resource. */
+       if (!master) {
+               spin_lock(&res->spinlock);
+               res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+               spin_unlock(&res->spinlock);
+               wake_up(&res->wq);
        }
-       __dlm_unhash_lockres(lockres);
+       return 0;
 }
 
 static void dlm_run_purge_list(struct dlm_ctxt *dlm,
                        break;
                }
 
+               mlog(0, "removing lockres %.*s:%p from purgelist\n",
+                    lockres->lockname.len, lockres->lockname.name, lockres);
                list_del_init(&lockres->purge);
+               dlm_lockres_put(lockres);
                dlm->purge_count--;
 
                /* This may drop and reacquire the dlm spinlock if it
                 * has to do migration. */
                mlog(0, "calling dlm_purge_lockres!\n");
-               dlm_purge_lockres(dlm, lockres);
+               if (dlm_purge_lockres(dlm, lockres))
+                       BUG();
                mlog(0, "DONE calling dlm_purge_lockres!\n");
 
                /* Avoid adding any scheduling latencies */