        Opt_read_only,
        Opt_read_write,
        Opt_lock_on_read,
+       Opt_exclusive,
        Opt_err
 };
 
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        {Opt_lock_on_read, "lock_on_read"},
+       {Opt_exclusive, "exclusive"},
        {Opt_err, NULL}
 };
 
        int     queue_depth;
        bool    read_only;
        bool    lock_on_read;
+       bool    exclusive;
 };
 
 #define RBD_QUEUE_DEPTH_DEFAULT        BLKDEV_MAX_RQ
 #define RBD_READ_ONLY_DEFAULT  false
 #define RBD_LOCK_ON_READ_DEFAULT false
+#define RBD_EXCLUSIVE_DEFAULT  false
 
 static int parse_rbd_opts_token(char *c, void *private)
 {
        case Opt_lock_on_read:
                rbd_opts->lock_on_read = true;
                break;
+       case Opt_exclusive:
+               rbd_opts->exclusive = true;
+               break;
        default:
                /* libceph prints "bad option" msg */
                return -EINVAL;
        ret = rbd_request_lock(rbd_dev);
        if (ret == -ETIMEDOUT) {
                goto again; /* treat this as a dead client */
+       } else if (ret == -EROFS) {
+               rbd_warn(rbd_dev, "peer will not release lock");
+               /*
+                * If this is rbd_add_acquire_lock(), we want to fail
+                * immediately -- reuse BLACKLISTED flag.  Otherwise we
+                * want to block.
+                */
+               if (!(rbd_dev->disk->flags & GENHD_FL_UP)) {
+                       set_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags);
+                       /* wake "rbd map --exclusive" process */
+                       wake_requests(rbd_dev, false);
+               }
        } else if (ret < 0) {
                rbd_warn(rbd_dev, "error requesting lock: %d", ret);
                mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
                result = 0;
 
                if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
-                       dout("%s rbd_dev %p queueing unlock_work\n", __func__,
-                            rbd_dev);
-                       queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
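+                       /*
+                        * A peer wants the lock.  Hand it over unless
+                        * the image was mapped with "-o exclusive".
+                        */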
+                       if (!rbd_dev->opts->exclusive) {
+                               dout("%s rbd_dev %p queueing unlock_work\n",
+                                    __func__, rbd_dev);
+                               queue_work(rbd_dev->task_wq,
+                                          &rbd_dev->unlock_work);
+                       } else {
+                               /* refuse to release the lock */
+                               result = -EROFS;
+                       }
                }
        }
 
        if (must_be_locked) {
                down_read(&rbd_dev->lock_rwsem);
                if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED &&
-                   !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags))
+                   !test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+                       if (rbd_dev->opts->exclusive) {
+                               rbd_warn(rbd_dev, "exclusive lock required");
+                               result = -EROFS;
+                               goto err_unlock;
+                       }
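+                       /* otherwise block until a peer releases the lock */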
                        rbd_wait_state_locked(rbd_dev);
+               }
                if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
                        result = -EBLACKLISTED;
                        goto err_unlock;
        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
        rbd_opts->queue_depth = RBD_QUEUE_DEPTH_DEFAULT;
        rbd_opts->lock_on_read = RBD_LOCK_ON_READ_DEFAULT;
+       rbd_opts->exclusive = RBD_EXCLUSIVE_DEFAULT;
 
        copts = ceph_parse_options(options, mon_addrs,
                                        mon_addrs + mon_addrs_size - 1,
        return ret;
 }
 
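+/*
+ * Drop the exclusive lock if this client currently owns it.  Called on
+ * unmap and on rbd_add() error paths.
+ */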
+static void rbd_dev_image_unlock(struct rbd_device *rbd_dev)
+{
+       down_write(&rbd_dev->lock_rwsem);
+       if (__rbd_is_lock_owner(rbd_dev))
+               rbd_unlock(rbd_dev);
+       up_write(&rbd_dev->lock_rwsem);
+}
+
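+/*
+ * Block until the exclusive lock is acquired.  Fails with -EINVAL if
+ * the image doesn't have the exclusive-lock feature and with -EROFS if
+ * we get blacklisted while waiting for the lock.
+ */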
+static int rbd_add_acquire_lock(struct rbd_device *rbd_dev)
+{
+       if (!(rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK)) {
+               rbd_warn(rbd_dev, "exclusive-lock feature is not enabled");
+               return -EINVAL;
+       }
+
+       /* FIXME: "rbd map --exclusive" should be interruptible */
+       down_read(&rbd_dev->lock_rwsem);
+       rbd_wait_state_locked(rbd_dev);
+       up_read(&rbd_dev->lock_rwsem);
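+       /* if the lock still isn't held here, we were blacklisted */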
+       if (test_bit(RBD_DEV_FLAG_BLACKLISTED, &rbd_dev->flags)) {
+               rbd_warn(rbd_dev, "failed to acquire exclusive lock");
+               return -EROFS;
+       }
+
+       return 0;
+}
+
 /*
  * An rbd format 2 image has a unique identifier, distinct from the
  * name given to it by the user.  Internally, that identifier is
        if (rc)
                goto err_out_image_probe;
 
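+       /*
+        * With "-o exclusive", take the lock up front and fail the map
+        * if it cannot be acquired.
+        */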
+       if (rbd_dev->opts->exclusive) {
+               rc = rbd_add_acquire_lock(rbd_dev);
+               if (rc)
+                       goto err_out_device_setup;
+       }
+
        /* Everything's ready.  Announce the disk to the world. */
 
        rc = device_add(&rbd_dev->dev);
        if (rc)
-               goto err_out_device_setup;
+               goto err_out_image_lock;
 
        add_disk(rbd_dev->disk);
        /* see rbd_init_disk() */
        module_put(THIS_MODULE);
        return rc;
 
+err_out_image_lock:
+       rbd_dev_image_unlock(rbd_dev);
 err_out_device_setup:
        rbd_dev_device_release(rbd_dev);
 err_out_image_probe:
        spin_unlock(&rbd_dev_list_lock);
        device_del(&rbd_dev->dev);
 
-       down_write(&rbd_dev->lock_rwsem);
-       if (__rbd_is_lock_owner(rbd_dev))
-               rbd_unlock(rbd_dev);
-       up_write(&rbd_dev->lock_rwsem);
-
+       rbd_dev_image_unlock(rbd_dev);
        rbd_dev_device_release(rbd_dev);
        rbd_dev_image_release(rbd_dev);
        rbd_dev_destroy(rbd_dev);