A general purpose distributed lock manager for kernel or userspace
        applications.
 
-config DLM_DEVICE
-       tristate "DLM device for userspace access"
-       depends on DLM
-       help
-       This module creates a misc device through which the dlm lockspace
-       and locking functions become available to userspace applications
-       (usually through the libdlm library).
-
 config DLM_DEBUG
        bool "DLM debugging"
        depends on DLM
 
 obj-$(CONFIG_DLM) +=           dlm.o
-obj-$(CONFIG_DLM_DEVICE) +=    dlm_device.o
-
 dlm-y :=                       ast.o \
                                config.o \
                                dir.o \
                                recover.o \
                                recoverd.o \
                                requestqueue.o \
+                               user.o \
                                util.o
 dlm-$(CONFIG_DLM_DEBUG) +=     debug_fs.o
 
-dlm_device-y :=                        device.o
 
 
 #include "dlm_internal.h"
 #include "lock.h"
-#include "ast.h"
+#include "user.h"
 
 #define WAKE_ASTS  0
 
 
 void dlm_add_ast(struct dlm_lkb *lkb, int type)
 {
+       if (lkb->lkb_flags & DLM_IFL_USER) {
+               dlm_user_add_ast(lkb, type);
+               return;
+       }
+
        spin_lock(&ast_queue_lock);
        if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
                kref_get(&lkb->lkb_ref);
 
+++ /dev/null
-/******************************************************************************
-*******************************************************************************
-**
-**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
-**
-**  This copyrighted material is made available to anyone wishing to use,
-**  modify, copy, or redistribute it subject to the terms and conditions
-**  of the GNU General Public License v.2.
-**
-*******************************************************************************
-******************************************************************************/
-
-/*
- * device.c
- *
- * This is the userland interface to the DLM.
- *
- * The locking is done via a misc char device (find the
- * registered minor number in /proc/misc).
- *
- * User code should not use this interface directly but
- * call the library routines in libdlm.a instead.
- *
- */
-
-#include <linux/miscdevice.h>
-#include <linux/init.h>
-#include <linux/wait.h>
-#include <linux/module.h>
-#include <linux/file.h>
-#include <linux/fs.h>
-#include <linux/poll.h>
-#include <linux/signal.h>
-#include <linux/spinlock.h>
-#include <linux/idr.h>
-
-#include <linux/dlm.h>
-#include <linux/dlm_device.h>
-
-#include "lvb_table.h"
-
-static struct file_operations _dlm_fops;
-static const char *name_prefix="dlm";
-static struct list_head user_ls_list;
-static struct mutex user_ls_lock;
-
-/* Flags in li_flags */
-#define LI_FLAG_COMPLETE   1
-#define LI_FLAG_FIRSTLOCK  2
-#define LI_FLAG_PERSISTENT 3
-#define LI_FLAG_ONLIST     4
-
-/* flags in ls_flags*/
-#define LS_FLAG_DELETED   1
-#define LS_FLAG_AUTOFREE  2
-
-/* flags in ls_flags*/
-#define FI_FLAG_OPEN      1
-#define FI_FLAG_COMPAT    2
-
-#define LOCKINFO_MAGIC 0x53595324
-
-struct lock_info {
-       uint32_t li_magic;
-       uint8_t li_cmd;
-       int8_t  li_grmode;
-       int8_t  li_rqmode;
-       struct dlm_lksb li_lksb;
-       wait_queue_head_t li_waitq;
-       unsigned long li_flags;
-       void __user *li_castparam;
-       void __user *li_castaddr;
-       void __user *li_bastparam;
-       void __user *li_bastaddr;
-       void __user *li_pend_bastparam;
-       void __user *li_pend_bastaddr;
-       struct list_head li_ownerqueue;
-       struct file_info *li_file;
-       struct dlm_lksb __user *li_user_lksb;
-       struct completion li_firstcomp;
-};
-
-/* A queued AST no less */
-struct ast_info {
-       struct dlm_lock_result result;
-       struct list_head list;
-       uint32_t lvb_updated;
-       uint32_t progress;      /* How much has been read */
-};
-
-/* One of these per userland lockspace */
-struct user_ls {
-       void    *ls_lockspace;
-       atomic_t ls_refcnt;
-       long     ls_flags;
-
-       /* Lock infos are stored in here indexed by lock ID */
-       struct idr lockinfo_idr;
-       rwlock_t lockinfo_lock;
-
-       /* Passed into misc_register() */
-       struct miscdevice ls_miscinfo;
-       struct list_head  ls_list;
-};
-
-/* misc_device info for the control device */
-static struct miscdevice ctl_device;
-
-/*
- * Stuff we hang off the file struct.
- * The first two are to cope with unlocking all the
- * locks help by a process when it dies.
- */
-struct file_info {
-       struct list_head    fi_li_list;  /* List of active lock_infos */
-       spinlock_t          fi_li_lock;
-       struct list_head    fi_ast_list; /* Queue of ASTs to be delivered */
-       spinlock_t          fi_ast_lock;
-       wait_queue_head_t   fi_wait;
-       struct user_ls     *fi_ls;
-       atomic_t            fi_refcnt;   /* Number of users */
-       unsigned long       fi_flags;
-};
-
-#ifdef CONFIG_COMPAT
-
-struct dlm_lock_params32 {
-       __u8 mode;
-       __u8 namelen;
-       __u16 flags;
-       __u32 lkid;
-       __u32 parent;
-
-       __u32 castparam;
-       __u32 castaddr;
-       __u32 bastparam;
-       __u32 bastaddr;
-       __u32 lksb;
-
-       char lvb[DLM_USER_LVB_LEN];
-       char name[0];
-};
-
-struct dlm_write_request32 {
-       __u32 version[3];
-       __u8 cmd;
-       __u8 is64bit;
-       __u8 unused[2];
-
-       union  {
-               struct dlm_lock_params32 lock;
-               struct dlm_lspace_params lspace;
-       } i;
-};
-
-struct dlm_lksb32 {
-       __u32    sb_status;
-       __u32    sb_lkid;
-       __u8     sb_flags;
-       __u32    sb_lvbptr;
-};
-
-struct dlm_lock_result32 {
-       __u32 length;
-       __u32 user_astaddr;
-       __u32 user_astparam;
-       __u32 user_lksb;
-       struct dlm_lksb32 lksb;
-       __u8 bast_mode;
-       __u8 unused[3];
-       /* Offsets may be zero if no data is present */
-       __u32 lvb_offset;
-};
-
-
-static void compat_input(struct dlm_write_request *kparams, struct dlm_write_request32 *k32params)
-{
-
-       kparams->version[0] = k32params->version[0];
-       kparams->version[1] = k32params->version[1];
-       kparams->version[2] = k32params->version[2];
-
-       kparams->cmd = k32params->cmd;
-       kparams->is64bit = k32params->is64bit;
-       if (kparams->cmd == DLM_USER_CREATE_LOCKSPACE ||
-           kparams->cmd == DLM_USER_REMOVE_LOCKSPACE) {
-
-               kparams->i.lspace.flags = k32params->i.lspace.flags;
-               kparams->i.lspace.minor = k32params->i.lspace.minor;
-               strcpy(kparams->i.lspace.name, k32params->i.lspace.name);
-       }
-       else {
-               kparams->i.lock.mode = k32params->i.lock.mode;
-               kparams->i.lock.namelen = k32params->i.lock.namelen;
-               kparams->i.lock.flags = k32params->i.lock.flags;
-               kparams->i.lock.lkid = k32params->i.lock.lkid;
-               kparams->i.lock.parent = k32params->i.lock.parent;
-               kparams->i.lock.castparam = (void *)(long)k32params->i.lock.castparam;
-               kparams->i.lock.castaddr = (void *)(long)k32params->i.lock.castaddr;
-               kparams->i.lock.bastparam = (void *)(long)k32params->i.lock.bastparam;
-               kparams->i.lock.bastaddr = (void *)(long)k32params->i.lock.bastaddr;
-               kparams->i.lock.lksb = (void *)(long)k32params->i.lock.lksb;
-               memcpy(kparams->i.lock.lvb, k32params->i.lock.lvb, DLM_USER_LVB_LEN);
-               memcpy(kparams->i.lock.name, k32params->i.lock.name, kparams->i.lock.namelen);
-       }
-}
-
-void compat_output(struct dlm_lock_result *res, struct dlm_lock_result32 *res32)
-{
-       res32->length = res->length - (sizeof(struct dlm_lock_result) - sizeof(struct dlm_lock_result32));
-       res32->user_astaddr = (__u32)(long)res->user_astaddr;
-       res32->user_astparam = (__u32)(long)res->user_astparam;
-       res32->user_lksb = (__u32)(long)res->user_lksb;
-       res32->bast_mode = res->bast_mode;
-
-       res32->lvb_offset = res->lvb_offset;
-       res32->length = res->length;
-
-       res32->lksb.sb_status = res->lksb.sb_status;
-       res32->lksb.sb_flags = res->lksb.sb_flags;
-       res32->lksb.sb_lkid = res->lksb.sb_lkid;
-       res32->lksb.sb_lvbptr = (__u32)(long)res->lksb.sb_lvbptr;
-}
-#endif
-
-
-/* get and put ops for file_info.
-   Actually I don't really like "get" and "put", but everyone
-   else seems to use them and I can't think of anything
-   nicer at the moment */
-static void get_file_info(struct file_info *f)
-{
-       atomic_inc(&f->fi_refcnt);
-}
-
-static void put_file_info(struct file_info *f)
-{
-       if (atomic_dec_and_test(&f->fi_refcnt))
-               kfree(f);
-}
-
-static void release_lockinfo(struct user_ls *ls, struct lock_info *li)
-{
-       put_file_info(li->li_file);
-
-       write_lock(&ls->lockinfo_lock);
-       idr_remove(&ls->lockinfo_idr, li->li_lksb.sb_lkid);
-       write_unlock(&ls->lockinfo_lock);
-
-       if (li->li_lksb.sb_lvbptr)
-               kfree(li->li_lksb.sb_lvbptr);
-       kfree(li);
-
-       module_put(THIS_MODULE);
-}
-
-static struct lock_info *get_lockinfo(struct user_ls *ls, uint32_t lockid)
-{
-       struct lock_info *li;
-
-       read_lock(&ls->lockinfo_lock);
-       li = idr_find(&ls->lockinfo_idr, lockid);
-       read_unlock(&ls->lockinfo_lock);
-
-       return li;
-}
-
-static int add_lockinfo(struct user_ls *ls, struct lock_info *li)
-{
-       int n;
-       int r;
-       int ret = -EINVAL;
-
-       write_lock(&ls->lockinfo_lock);
-
-       if (idr_find(&ls->lockinfo_idr, li->li_lksb.sb_lkid))
-               goto out_up;
-
-       ret = -ENOMEM;
-       r = idr_pre_get(&ls->lockinfo_idr, GFP_KERNEL);
-       if (!r)
-               goto out_up;
-
-       r = idr_get_new_above(&ls->lockinfo_idr, li, li->li_lksb.sb_lkid, &n);
-       if (r)
-               goto out_up;
-
-       if (n != li->li_lksb.sb_lkid) {
-               idr_remove(&ls->lockinfo_idr, n);
-               goto out_up;
-       }
-
-       ret = 0;
-
- out_up:
-       write_unlock(&ls->lockinfo_lock);
-
-       return ret;
-}
-
-
-static struct user_ls *__find_lockspace(int minor)
-{
-       struct user_ls *lsinfo;
-
-       list_for_each_entry(lsinfo, &user_ls_list, ls_list) {
-               if (lsinfo->ls_miscinfo.minor == minor)
-                       return lsinfo;
-       }
-       return NULL;
-}
-
-/* Find a lockspace struct given the device minor number */
-static struct user_ls *find_lockspace(int minor)
-{
-       struct user_ls *lsinfo;
-
-       mutex_lock(&user_ls_lock);
-       lsinfo = __find_lockspace(minor);
-       mutex_unlock(&user_ls_lock);
-
-       return lsinfo;
-}
-
-static void add_lockspace_to_list(struct user_ls *lsinfo)
-{
-       mutex_lock(&user_ls_lock);
-       list_add(&lsinfo->ls_list, &user_ls_list);
-       mutex_unlock(&user_ls_lock);
-}
-
-/* Register a lockspace with the DLM and create a misc
-   device for userland to access it */
-static int register_lockspace(char *name, struct user_ls **ls, int flags)
-{
-       struct user_ls *newls;
-       int status;
-       int namelen;
-
-       namelen = strlen(name)+strlen(name_prefix)+2;
-
-       newls = kzalloc(sizeof(struct user_ls), GFP_KERNEL);
-       if (!newls)
-               return -ENOMEM;
-
-       newls->ls_miscinfo.name = kzalloc(namelen, GFP_KERNEL);
-       if (!newls->ls_miscinfo.name) {
-               kfree(newls);
-               return -ENOMEM;
-       }
-
-       status = dlm_new_lockspace(name, strlen(name), &newls->ls_lockspace, 0,
-                                  DLM_USER_LVB_LEN);
-       if (status != 0) {
-               kfree(newls->ls_miscinfo.name);
-               kfree(newls);
-               return status;
-       }
-
-       idr_init(&newls->lockinfo_idr);
-       rwlock_init(&newls->lockinfo_lock);
-
-       snprintf((char*)newls->ls_miscinfo.name, namelen, "%s_%s",
-                name_prefix, name);
-
-       newls->ls_miscinfo.fops = &_dlm_fops;
-       newls->ls_miscinfo.minor = MISC_DYNAMIC_MINOR;
-
-       status = misc_register(&newls->ls_miscinfo);
-       if (status) {
-               printk(KERN_ERR "dlm: misc register failed for %s\n", name);
-               dlm_release_lockspace(newls->ls_lockspace, 0);
-               kfree(newls->ls_miscinfo.name);
-               kfree(newls);
-               return status;
-       }
-
-       if (flags & DLM_USER_LSFLG_AUTOFREE)
-               set_bit(LS_FLAG_AUTOFREE, &newls->ls_flags);
-
-       add_lockspace_to_list(newls);
-       *ls = newls;
-       return 0;
-}
-
-/* Called with the user_ls_lock mutex held */
-static int unregister_lockspace(struct user_ls *lsinfo, int force)
-{
-       int status;
-
-       status = dlm_release_lockspace(lsinfo->ls_lockspace, force);
-       if (status)
-               return status;
-
-       status = misc_deregister(&lsinfo->ls_miscinfo);
-       if (status)
-               return status;
-
-       list_del(&lsinfo->ls_list);
-       set_bit(LS_FLAG_DELETED, &lsinfo->ls_flags);
-       lsinfo->ls_lockspace = NULL;
-       if (atomic_read(&lsinfo->ls_refcnt) == 0) {
-               kfree(lsinfo->ls_miscinfo.name);
-               kfree(lsinfo);
-       }
-
-       return 0;
-}
-
-/* Add it to userland's AST queue */
-static void add_to_astqueue(struct lock_info *li, void *astaddr, void *astparam,
-                           int lvb_updated)
-{
-       struct ast_info *ast = kzalloc(sizeof(struct ast_info), GFP_KERNEL);
-       if (!ast)
-               return;
-
-       ast->result.user_astparam = astparam;
-       ast->result.user_astaddr  = astaddr;
-       ast->result.user_lksb     = li->li_user_lksb;
-       memcpy(&ast->result.lksb, &li->li_lksb, sizeof(struct dlm_lksb));
-       ast->lvb_updated = lvb_updated;
-
-       spin_lock(&li->li_file->fi_ast_lock);
-       list_add_tail(&ast->list, &li->li_file->fi_ast_list);
-       spin_unlock(&li->li_file->fi_ast_lock);
-       wake_up_interruptible(&li->li_file->fi_wait);
-}
-
-static void bast_routine(void *param, int mode)
-{
-       struct lock_info *li = param;
-
-       if (li && li->li_bastaddr)
-               add_to_astqueue(li, li->li_bastaddr, li->li_bastparam, 0);
-}
-
-/*
- * This is the kernel's AST routine.
- * All lock, unlock & query operations complete here.
- * The only syncronous ops are those done during device close.
- */
-static void ast_routine(void *param)
-{
-       struct lock_info *li = param;
-
-       /* Param may be NULL if a persistent lock is unlocked by someone else */
-       if (!li)
-               return;
-
-       /* If this is a succesful conversion then activate the blocking ast
-        * args from the conversion request */
-       if (!test_bit(LI_FLAG_FIRSTLOCK, &li->li_flags) &&
-           li->li_lksb.sb_status == 0) {
-
-               li->li_bastparam = li->li_pend_bastparam;
-               li->li_bastaddr = li->li_pend_bastaddr;
-               li->li_pend_bastaddr = NULL;
-       }
-
-       /* If it's an async request then post data to the user's AST queue. */
-       if (li->li_castaddr) {
-               int lvb_updated = 0;
-
-               /* See if the lvb has been updated */
-               if (dlm_lvb_operations[li->li_grmode+1][li->li_rqmode+1] == 1)
-                       lvb_updated = 1;
-
-               if (li->li_lksb.sb_status == 0)
-                       li->li_grmode = li->li_rqmode;
-
-               /* Only queue AST if the device is still open */
-               if (test_bit(FI_FLAG_OPEN, &li->li_file->fi_flags))
-                       add_to_astqueue(li, li->li_castaddr, li->li_castparam,
-                                       lvb_updated);
-
-               /* If it's a new lock operation that failed, then
-                * remove it from the owner queue and free the
-                * lock_info.
-                */
-               if (test_and_clear_bit(LI_FLAG_FIRSTLOCK, &li->li_flags) &&
-                   li->li_lksb.sb_status != 0) {
-
-                       /* Wait till dlm_lock() has finished */
-                       wait_for_completion(&li->li_firstcomp);
-
-                       spin_lock(&li->li_file->fi_li_lock);
-                       list_del(&li->li_ownerqueue);
-                       clear_bit(LI_FLAG_ONLIST, &li->li_flags);
-                       spin_unlock(&li->li_file->fi_li_lock);
-                       release_lockinfo(li->li_file->fi_ls, li);
-                       return;
-               }
-               /* Free unlocks & queries */
-               if (li->li_lksb.sb_status == -DLM_EUNLOCK ||
-                   li->li_cmd == DLM_USER_QUERY) {
-                       release_lockinfo(li->li_file->fi_ls, li);
-               }
-       } else {
-               /* Synchronous request, just wake up the caller */
-               set_bit(LI_FLAG_COMPLETE, &li->li_flags);
-               wake_up_interruptible(&li->li_waitq);
-       }
-}
-
-/*
- * Wait for the lock op to complete and return the status.
- */
-static int wait_for_ast(struct lock_info *li)
-{
-       /* Wait for the AST routine to complete */
-       set_task_state(current, TASK_INTERRUPTIBLE);
-       while (!test_bit(LI_FLAG_COMPLETE, &li->li_flags))
-               schedule();
-
-       set_task_state(current, TASK_RUNNING);
-
-       return li->li_lksb.sb_status;
-}
-
-
-/* Open on control device */
-static int dlm_ctl_open(struct inode *inode, struct file *file)
-{
-       file->private_data = NULL;
-       return 0;
-}
-
-/* Close on control device */
-static int dlm_ctl_close(struct inode *inode, struct file *file)
-{
-       return 0;
-}
-
-/* Open on lockspace device */
-static int dlm_open(struct inode *inode, struct file *file)
-{
-       struct file_info *f;
-       struct user_ls *lsinfo;
-
-       lsinfo = find_lockspace(iminor(inode));
-       if (!lsinfo)
-               return -ENOENT;
-
-       f = kzalloc(sizeof(struct file_info), GFP_KERNEL);
-       if (!f)
-               return -ENOMEM;
-
-       atomic_inc(&lsinfo->ls_refcnt);
-       INIT_LIST_HEAD(&f->fi_li_list);
-       INIT_LIST_HEAD(&f->fi_ast_list);
-       spin_lock_init(&f->fi_li_lock);
-       spin_lock_init(&f->fi_ast_lock);
-       init_waitqueue_head(&f->fi_wait);
-       f->fi_ls = lsinfo;
-       f->fi_flags = 0;
-       get_file_info(f);
-       set_bit(FI_FLAG_OPEN, &f->fi_flags);
-
-       file->private_data = f;
-
-       return 0;
-}
-
-/* Check the user's version matches ours */
-static int check_version(struct dlm_write_request *req)
-{
-       if (req->version[0] != DLM_DEVICE_VERSION_MAJOR ||
-           (req->version[0] == DLM_DEVICE_VERSION_MAJOR &&
-            req->version[1] > DLM_DEVICE_VERSION_MINOR)) {
-
-               printk(KERN_DEBUG "dlm: process %s (%d) version mismatch "
-                      "user (%d.%d.%d) kernel (%d.%d.%d)\n",
-                      current->comm,
-                      current->pid,
-                      req->version[0],
-                      req->version[1],
-                      req->version[2],
-                      DLM_DEVICE_VERSION_MAJOR,
-                      DLM_DEVICE_VERSION_MINOR,
-                      DLM_DEVICE_VERSION_PATCH);
-               return -EINVAL;
-       }
-       return 0;
-}
-
-/* Close on lockspace device */
-static int dlm_close(struct inode *inode, struct file *file)
-{
-       struct file_info *f = file->private_data;
-       struct lock_info li;
-       struct lock_info *old_li, *safe;
-       sigset_t tmpsig;
-       sigset_t allsigs;
-       struct user_ls *lsinfo;
-       DECLARE_WAITQUEUE(wq, current);
-
-       lsinfo = find_lockspace(iminor(inode));
-       if (!lsinfo)
-               return -ENOENT;
-
-       /* Mark this closed so that ASTs will not be delivered any more */
-       clear_bit(FI_FLAG_OPEN, &f->fi_flags);
-
-       /* Block signals while we are doing this */
-       sigfillset(&allsigs);
-       sigprocmask(SIG_BLOCK, &allsigs, &tmpsig);
-
-       /* We use our own lock_info struct here, so that any
-        * outstanding "real" ASTs will be delivered with the
-        * corresponding "real" params, thus freeing the lock_info
-        * that belongs the lock. This catches the corner case where
-        * a lock is BUSY when we try to unlock it here
-        */
-       memset(&li, 0, sizeof(li));
-       clear_bit(LI_FLAG_COMPLETE, &li.li_flags);
-       init_waitqueue_head(&li.li_waitq);
-       add_wait_queue(&li.li_waitq, &wq);
-
-       /*
-        * Free any outstanding locks, they are on the
-        * list in LIFO order so there should be no problems
-        * about unlocking parents before children.
-        */
-       list_for_each_entry_safe(old_li, safe, &f->fi_li_list, li_ownerqueue) {
-               int status;
-               int flags = 0;
-
-               /* Don't unlock persistent locks, just mark them orphaned */
-               if (test_bit(LI_FLAG_PERSISTENT, &old_li->li_flags)) {
-                       list_del(&old_li->li_ownerqueue);
-
-                       /* Update master copy */
-                       /* TODO: Check locking core updates the local and
-                          remote ORPHAN flags */
-                       li.li_lksb.sb_lkid = old_li->li_lksb.sb_lkid;
-                       status = dlm_lock(f->fi_ls->ls_lockspace,
-                                         old_li->li_grmode, &li.li_lksb,
-                                         DLM_LKF_CONVERT|DLM_LKF_ORPHAN,
-                                         NULL, 0, 0, ast_routine, NULL, NULL);
-                       if (status != 0)
-                               printk("dlm: Error orphaning lock %x: %d\n",
-                                      old_li->li_lksb.sb_lkid, status);
-
-                       /* But tidy our references in it */
-                       release_lockinfo(old_li->li_file->fi_ls, old_li);
-                       continue;
-               }
-
-               clear_bit(LI_FLAG_COMPLETE, &li.li_flags);
-
-               flags = DLM_LKF_FORCEUNLOCK;
-               if (old_li->li_grmode >= DLM_LOCK_PW)
-                       flags |= DLM_LKF_IVVALBLK;
-
-               status = dlm_unlock(f->fi_ls->ls_lockspace,
-                                   old_li->li_lksb.sb_lkid, flags,
-                                   &li.li_lksb, &li);
-
-               /* Must wait for it to complete as the next lock could be its
-                * parent */
-               if (status == 0)
-                       wait_for_ast(&li);
-
-               /* Unlock suceeded, free the lock_info struct. */
-               if (status == 0)
-                       release_lockinfo(old_li->li_file->fi_ls, old_li);
-       }
-
-       remove_wait_queue(&li.li_waitq, &wq);
-
-       /*
-        * If this is the last reference to the lockspace
-        * then free the struct. If it's an AUTOFREE lockspace
-        * then free the whole thing.
-        */
-       mutex_lock(&user_ls_lock);
-       if (atomic_dec_and_test(&lsinfo->ls_refcnt)) {
-
-               if (lsinfo->ls_lockspace) {
-                       if (test_bit(LS_FLAG_AUTOFREE, &lsinfo->ls_flags)) {
-                               unregister_lockspace(lsinfo, 1);
-                       }
-               } else {
-                       kfree(lsinfo->ls_miscinfo.name);
-                       kfree(lsinfo);
-               }
-       }
-       mutex_unlock(&user_ls_lock);
-       put_file_info(f);
-
-       /* Restore signals */
-       sigprocmask(SIG_SETMASK, &tmpsig, NULL);
-       recalc_sigpending();
-
-       return 0;
-}
-
-static int do_user_create_lockspace(struct file_info *fi, uint8_t cmd,
-                                   struct dlm_lspace_params *kparams)
-{
-       int status;
-       struct user_ls *lsinfo;
-
-       if (!capable(CAP_SYS_ADMIN))
-               return -EPERM;
-
-       status = register_lockspace(kparams->name, &lsinfo, kparams->flags);
-
-       /* If it succeeded then return the minor number */
-       if (status == 0)
-               status = lsinfo->ls_miscinfo.minor;
-
-       return status;
-}
-
-static int do_user_remove_lockspace(struct file_info *fi, uint8_t cmd,
-                                   struct dlm_lspace_params *kparams)
-{
-       int status;
-       int force = 1;
-       struct user_ls *lsinfo;
-
-       if (!capable(CAP_SYS_ADMIN))
-               return -EPERM;
-
-       mutex_lock(&user_ls_lock);
-       lsinfo = __find_lockspace(kparams->minor);
-       if (!lsinfo) {
-               mutex_unlock(&user_ls_lock);
-               return -EINVAL;
-       }
-
-       if (kparams->flags & DLM_USER_LSFLG_FORCEFREE)
-               force = 3;
-
-       status = unregister_lockspace(lsinfo, force);
-       mutex_unlock(&user_ls_lock);
-
-       return status;
-}
-
-/* Read call, might block if no ASTs are waiting.
- * It will only ever return one message at a time, regardless
- * of how many are pending.
- */
-static ssize_t dlm_read(struct file *file, char __user *buffer, size_t count,
-                       loff_t *ppos)
-{
-       struct file_info *fi = file->private_data;
-       struct ast_info *ast;
-       void *data;
-       int data_size;
-       int struct_size;
-       int offset;
-       DECLARE_WAITQUEUE(wait, current);
-#ifdef CONFIG_COMPAT
-       struct dlm_lock_result32 result32;
-
-       if (count < sizeof(struct dlm_lock_result32))
-#else
-       if (count < sizeof(struct dlm_lock_result))
-#endif
-               return -EINVAL;
-
-       spin_lock(&fi->fi_ast_lock);
-       if (list_empty(&fi->fi_ast_list)) {
-
-               /* No waiting ASTs.
-                * Return EOF if the lockspace been deleted.
-                */
-               if (test_bit(LS_FLAG_DELETED, &fi->fi_ls->ls_flags))
-                       return 0;
-
-               if (file->f_flags & O_NONBLOCK) {
-                       spin_unlock(&fi->fi_ast_lock);
-                       return -EAGAIN;
-               }
-
-               add_wait_queue(&fi->fi_wait, &wait);
-
-       repeat:
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (list_empty(&fi->fi_ast_list) &&
-                   !signal_pending(current)) {
-
-                       spin_unlock(&fi->fi_ast_lock);
-                       schedule();
-                       spin_lock(&fi->fi_ast_lock);
-                       goto repeat;
-               }
-
-               current->state = TASK_RUNNING;
-               remove_wait_queue(&fi->fi_wait, &wait);
-
-               if (signal_pending(current)) {
-                       spin_unlock(&fi->fi_ast_lock);
-                       return -ERESTARTSYS;
-               }
-       }
-
-       ast = list_entry(fi->fi_ast_list.next, struct ast_info, list);
-       list_del(&ast->list);
-       spin_unlock(&fi->fi_ast_lock);
-
-       /* Work out the size of the returned data */
-#ifdef CONFIG_COMPAT
-       if (test_bit(FI_FLAG_COMPAT, &fi->fi_flags)) {
-               data_size = struct_size = sizeof(struct dlm_lock_result32);
-               data = &result32;
-       }
-       else
-#endif
-       {
-               data_size = struct_size = sizeof(struct dlm_lock_result);
-               data = &ast->result;
-       }
-       if (ast->lvb_updated && ast->result.lksb.sb_lvbptr)
-               data_size += DLM_USER_LVB_LEN;
-
-       offset = struct_size;
-
-       /* Room for the extended data ? */
-       if (count >= data_size) {
-
-               if (ast->lvb_updated && ast->result.lksb.sb_lvbptr) {
-                       if (copy_to_user(buffer+offset,
-                                        ast->result.lksb.sb_lvbptr,
-                                        DLM_USER_LVB_LEN))
-                               return -EFAULT;
-                       ast->result.lvb_offset = offset;
-                       offset += DLM_USER_LVB_LEN;
-               }
-       }
-
-       ast->result.length = data_size;
-
-#ifdef CONFIG_COMPAT
-       compat_output(&ast->result, &result32);
-#endif
-
-       /* Copy the header now it has all the offsets in it */
-       if (copy_to_user(buffer, data, struct_size))
-               offset = -EFAULT;
-
-       /* If we only returned a header and there's more to come then put it
-          back on the list */
-       if (count < data_size) {
-               spin_lock(&fi->fi_ast_lock);
-               list_add(&ast->list, &fi->fi_ast_list);
-               spin_unlock(&fi->fi_ast_lock);
-       } else
-               kfree(ast);
-       return offset;
-}
-
-static unsigned int dlm_poll(struct file *file, poll_table *wait)
-{
-       struct file_info *fi = file->private_data;
-
-       poll_wait(file, &fi->fi_wait, wait);
-
-       spin_lock(&fi->fi_ast_lock);
-       if (!list_empty(&fi->fi_ast_list)) {
-               spin_unlock(&fi->fi_ast_lock);
-               return POLLIN | POLLRDNORM;
-       }
-
-       spin_unlock(&fi->fi_ast_lock);
-       return 0;
-}
-
-static struct lock_info *allocate_lockinfo(struct file_info *fi, uint8_t cmd,
-                                          struct dlm_lock_params *kparams)
-{
-       struct lock_info *li;
-
-       if (!try_module_get(THIS_MODULE))
-               return NULL;
-
-       li = kzalloc(sizeof(struct lock_info), GFP_KERNEL);
-       if (li) {
-               li->li_magic     = LOCKINFO_MAGIC;
-               li->li_file      = fi;
-               li->li_cmd       = cmd;
-               li->li_flags     = 0;
-               li->li_grmode    = -1;
-               li->li_rqmode    = -1;
-               li->li_pend_bastparam = NULL;
-               li->li_pend_bastaddr  = NULL;
-               li->li_castaddr   = NULL;
-               li->li_castparam  = NULL;
-               li->li_lksb.sb_lvbptr = NULL;
-               li->li_bastaddr  = kparams->bastaddr;
-               li->li_bastparam = kparams->bastparam;
-
-               get_file_info(fi);
-       }
-       return li;
-}
-
-static int do_user_lock(struct file_info *fi, uint8_t cmd,
-                       struct dlm_lock_params *kparams)
-{
-       struct lock_info *li;
-       int status;
-
-       /*
-        * Validate things that we need to have correct.
-        */
-       if (!kparams->castaddr)
-               return -EINVAL;
-
-       if (!kparams->lksb)
-               return -EINVAL;
-
-       /* Persistent child locks are not available yet */
-       if ((kparams->flags & DLM_LKF_PERSISTENT) && kparams->parent)
-               return -EINVAL;
-
-        /* For conversions, there should already be a lockinfo struct,
-          unless we are adopting an orphaned persistent lock */
-       if (kparams->flags & DLM_LKF_CONVERT) {
-
-               li = get_lockinfo(fi->fi_ls, kparams->lkid);
-
-               /* If this is a persistent lock we will have to create a
-                  lockinfo again */
-               if (!li && (kparams->flags & DLM_LKF_PERSISTENT)) {
-                       li = allocate_lockinfo(fi, cmd, kparams);
-                       if (!li)
-                               return -ENOMEM;
-
-                       li->li_lksb.sb_lkid = kparams->lkid;
-                       li->li_castaddr  = kparams->castaddr;
-                       li->li_castparam = kparams->castparam;
-
-                       /* OK, this isn't exactly a FIRSTLOCK but it is the
-                          first time we've used this lockinfo, and if things
-                          fail we want rid of it */
-                       init_completion(&li->li_firstcomp);
-                       set_bit(LI_FLAG_FIRSTLOCK, &li->li_flags);
-                       add_lockinfo(fi->fi_ls, li);
-
-                       /* TODO: do a query to get the current state ?? */
-               }
-               if (!li)
-                       return -EINVAL;
-
-               if (li->li_magic != LOCKINFO_MAGIC)
-                       return -EINVAL;
-
-               /* For conversions don't overwrite the current blocking AST
-                  info so that:
-                  a) if a blocking AST fires before the conversion is queued
-                     it runs the current handler
-                  b) if the conversion is cancelled, the original blocking AST
-                     declaration is active
-                  The pend_ info is made active when the conversion
-                  completes.
-               */
-               li->li_pend_bastaddr  = kparams->bastaddr;
-               li->li_pend_bastparam = kparams->bastparam;
-       } else {
-               li = allocate_lockinfo(fi, cmd, kparams);
-               if (!li)
-                       return -ENOMEM;
-
-               /* Allow us to complete our work before
-                  the AST routine runs. In fact we only need (and use) this
-                  when the initial lock fails */
-               init_completion(&li->li_firstcomp);
-               set_bit(LI_FLAG_FIRSTLOCK, &li->li_flags);
-       }
-
-       li->li_user_lksb = kparams->lksb;
-       li->li_castaddr  = kparams->castaddr;
-       li->li_castparam = kparams->castparam;
-       li->li_lksb.sb_lkid = kparams->lkid;
-       li->li_rqmode    = kparams->mode;
-       if (kparams->flags & DLM_LKF_PERSISTENT)
-               set_bit(LI_FLAG_PERSISTENT, &li->li_flags);
-
-       /* Copy in the value block */
-       if (kparams->flags & DLM_LKF_VALBLK) {
-               if (!li->li_lksb.sb_lvbptr) {
-                       li->li_lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN,
-                                                       GFP_KERNEL);
-                       if (!li->li_lksb.sb_lvbptr) {
-                               status = -ENOMEM;
-                               goto out_err;
-                       }
-               }
-
-               memcpy(li->li_lksb.sb_lvbptr, kparams->lvb, DLM_USER_LVB_LEN);
-       }
-
-       /* Lock it ... */
-       status = dlm_lock(fi->fi_ls->ls_lockspace,
-                         kparams->mode, &li->li_lksb,
-                         kparams->flags,
-                         kparams->name, kparams->namelen,
-                         kparams->parent,
-                         ast_routine,
-                         li,
-                         (li->li_pend_bastaddr || li->li_bastaddr) ?
-                          bast_routine : NULL);
-       if (status)
-               goto out_err;
-
-       /* If it succeeded (this far) with a new lock then keep track of
-          it on the file's lockinfo list */
-       if (!status && test_bit(LI_FLAG_FIRSTLOCK, &li->li_flags)) {
-
-               spin_lock(&fi->fi_li_lock);
-               list_add(&li->li_ownerqueue, &fi->fi_li_list);
-               set_bit(LI_FLAG_ONLIST, &li->li_flags);
-               spin_unlock(&fi->fi_li_lock);
-               if (add_lockinfo(fi->fi_ls, li))
-                       printk(KERN_WARNING "Add lockinfo failed\n");
-
-               complete(&li->li_firstcomp);
-       }
-
-       /* Return the lockid as the user needs it /now/ */
-       return li->li_lksb.sb_lkid;
-
- out_err:
-       if (test_bit(LI_FLAG_FIRSTLOCK, &li->li_flags))
-               release_lockinfo(fi->fi_ls, li);
-       return status;
-
-}
-
-static int do_user_unlock(struct file_info *fi, uint8_t cmd,
-                         struct dlm_lock_params *kparams)
-{
-       struct lock_info *li;
-       int status;
-       int convert_cancel = 0;
-
-       li = get_lockinfo(fi->fi_ls, kparams->lkid);
-       if (!li) {
-               li = allocate_lockinfo(fi, cmd, kparams);
-               if (!li)
-                       return -ENOMEM;
-               spin_lock(&fi->fi_li_lock);
-               list_add(&li->li_ownerqueue, &fi->fi_li_list);
-               set_bit(LI_FLAG_ONLIST, &li->li_flags);
-               spin_unlock(&fi->fi_li_lock);
-       }
-
-       if (li->li_magic != LOCKINFO_MAGIC)
-               return -EINVAL;
-
-       li->li_user_lksb = kparams->lksb;
-       li->li_castparam = kparams->castparam;
-       li->li_cmd       = cmd;
-
-       /* Cancelling a conversion doesn't remove the lock...*/
-       if (kparams->flags & DLM_LKF_CANCEL && li->li_grmode != -1)
-               convert_cancel = 1;
-
-       /* Wait until dlm_lock() has completed */
-       if (!test_bit(LI_FLAG_ONLIST, &li->li_flags)) {
-               wait_for_completion(&li->li_firstcomp);
-       }
-
-       /* dlm_unlock() passes a 0 for castaddr which means don't overwrite
-          the existing li_castaddr as that's the completion routine for
-          unlocks. dlm_unlock_wait() specifies a new AST routine to be
-          executed when the unlock completes. */
-       if (kparams->castaddr)
-               li->li_castaddr = kparams->castaddr;
-
-       /* Use existing lksb & astparams */
-       status = dlm_unlock(fi->fi_ls->ls_lockspace,
-                            kparams->lkid,
-                            kparams->flags, &li->li_lksb, li);
-
-       if (!status && !convert_cancel) {
-               spin_lock(&fi->fi_li_lock);
-               list_del(&li->li_ownerqueue);
-               clear_bit(LI_FLAG_ONLIST, &li->li_flags);
-               spin_unlock(&fi->fi_li_lock);
-       }
-
-       return status;
-}
-
-/* Write call, submit a locking request */
-static ssize_t dlm_write(struct file *file, const char __user *buffer,
-                        size_t count, loff_t *ppos)
-{
-       struct file_info *fi = file->private_data;
-       struct dlm_write_request *kparams;
-       sigset_t tmpsig;
-       sigset_t allsigs;
-       int status;
-
-#ifdef CONFIG_COMPAT
-       if (count < sizeof(struct dlm_write_request32))
-#else
-       if (count < sizeof(struct dlm_write_request))
-#endif
-               return -EINVAL;
-
-       if (count > sizeof(struct dlm_write_request) + DLM_RESNAME_MAXLEN)
-               return -EINVAL;
-
-       /* Has the lockspace been deleted */
-       if (fi && test_bit(LS_FLAG_DELETED, &fi->fi_ls->ls_flags))
-               return -ENOENT;
-
-       kparams = kmalloc(count, GFP_KERNEL);
-       if (!kparams)
-               return -ENOMEM;
-
-       status = -EFAULT;
-       /* Get the command info */
-       if (copy_from_user(kparams, buffer, count))
-               goto out_free;
-
-       status = -EBADE;
-       if (check_version(kparams))
-               goto out_free;
-
-#ifdef CONFIG_COMPAT
-       if (!kparams->is64bit) {
-               struct dlm_write_request32 *k32params = (struct dlm_write_request32 *)kparams;
-               kparams = kmalloc(count + (sizeof(struct dlm_write_request) - sizeof(struct dlm_write_request32)), GFP_KERNEL);
-               if (!kparams)
-                       return -ENOMEM;
-
-               if (fi)
-                       set_bit(FI_FLAG_COMPAT, &fi->fi_flags);
-               compat_input(kparams, k32params);
-               kfree(k32params);
-       }
-#endif
-
-       /* Block signals while we are doing this */
-       sigfillset(&allsigs);
-       sigprocmask(SIG_BLOCK, &allsigs, &tmpsig);
-
-       status = -EINVAL;
-       switch (kparams->cmd)
-       {
-       case DLM_USER_LOCK:
-               if (!fi) goto out_sig;
-               status = do_user_lock(fi, kparams->cmd, &kparams->i.lock);
-               break;
-
-       case DLM_USER_UNLOCK:
-               if (!fi) goto out_sig;
-               status = do_user_unlock(fi, kparams->cmd, &kparams->i.lock);
-               break;
-
-       case DLM_USER_CREATE_LOCKSPACE:
-               if (fi) goto out_sig;
-               status = do_user_create_lockspace(fi, kparams->cmd,
-                                                 &kparams->i.lspace);
-               break;
-
-       case DLM_USER_REMOVE_LOCKSPACE:
-               if (fi) goto out_sig;
-               status = do_user_remove_lockspace(fi, kparams->cmd,
-                                                 &kparams->i.lspace);
-               break;
-       default:
-               printk("Unknown command passed to DLM device : %d\n",
-                       kparams->cmd);
-               break;
-       }
-
- out_sig:
-       /* Restore signals */
-       sigprocmask(SIG_SETMASK, &tmpsig, NULL);
-       recalc_sigpending();
-
- out_free:
-       kfree(kparams);
-       if (status == 0)
-               return count;
-       else
-               return status;
-}
-
-static struct file_operations _dlm_fops = {
-      .open    = dlm_open,
-      .release = dlm_close,
-      .read    = dlm_read,
-      .write   = dlm_write,
-      .poll    = dlm_poll,
-      .owner   = THIS_MODULE,
-};
-
-static struct file_operations _dlm_ctl_fops = {
-      .open    = dlm_ctl_open,
-      .release = dlm_ctl_close,
-      .write   = dlm_write,
-      .owner   = THIS_MODULE,
-};
-
-/*
- * Create control device
- */
-static int __init dlm_device_init(void)
-{
-       int r;
-
-       INIT_LIST_HEAD(&user_ls_list);
-       mutex_init(&user_ls_lock);
-
-       ctl_device.name = "dlm-control";
-       ctl_device.fops = &_dlm_ctl_fops;
-       ctl_device.minor = MISC_DYNAMIC_MINOR;
-
-       r = misc_register(&ctl_device);
-       if (r) {
-               printk(KERN_ERR "dlm: misc_register failed for control dev\n");
-               return r;
-       }
-
-       return 0;
-}
-
-static void __exit dlm_device_exit(void)
-{
-       misc_deregister(&ctl_device);
-}
-
-MODULE_DESCRIPTION("Distributed Lock Manager device interface");
-MODULE_AUTHOR("Red Hat, Inc.");
-MODULE_LICENSE("GPL");
-
-module_init(dlm_device_init);
-module_exit(dlm_device_exit);
 
 #include <linux/kref.h>
 #include <linux/kernel.h>
 #include <linux/jhash.h>
+#include <linux/miscdevice.h>
 #include <linux/mutex.h>
 #include <asm/semaphore.h>
 #include <asm/uaccess.h>
 #define log_error(ls, fmt, args...) \
        printk(KERN_ERR "dlm: %s: " fmt "\n", (ls)->ls_name , ##args)
 
+#define DLM_LOG_DEBUG
 #ifdef DLM_LOG_DEBUG
 #define log_debug(ls, fmt, args...) log_error(ls, fmt, ##args)
 #else
 
 #define DLM_IFL_MSTCPY         0x00010000
 #define DLM_IFL_RESEND         0x00020000
+#define DLM_IFL_DEAD           0x00040000
+#define DLM_IFL_USER           0x00000001
+#define DLM_IFL_ORPHAN         0x00000002
 
 struct dlm_lkb {
        struct dlm_rsb          *lkb_resource;  /* the rsb */
        struct list_head        lkb_rsb_lookup; /* waiting for rsb lookup */
        struct list_head        lkb_wait_reply; /* waiting for remote reply */
        struct list_head        lkb_astqueue;   /* need ast to be sent */
+       struct list_head        lkb_ownqueue;   /* list of locks for a process */
 
        char                    *lkb_lvbptr;
        struct dlm_lksb         *lkb_lksb;      /* caller's status block */
 
 struct dlm_ls {
        struct list_head        ls_list;        /* list of lockspaces */
+       dlm_lockspace_t         *ls_local_handle;
        uint32_t                ls_global_id;   /* global unique lockspace ID */
        uint32_t                ls_exflags;
        int                     ls_lvblen;
        wait_queue_head_t       ls_uevent_wait; /* user part of join/leave */
        int                     ls_uevent_result;
 
+       struct miscdevice       ls_device;
+
        /* recovery related */
 
        struct timer_list       ls_timer;
        spinlock_t              ls_recover_list_lock;
        int                     ls_recover_list_count;
        wait_queue_head_t       ls_wait_general;
+       struct mutex            ls_clear_proc_locks;
 
        struct list_head        ls_root_list;   /* root resources */
        struct rw_semaphore     ls_root_sem;    /* protect root_list */
 #define LSFL_RCOM_READY                3
 #define LSFL_UEVENT_WAIT       4
 
+/* much of this is just saving user space pointers associated with the
+   lock that we pass back to the user lib with an ast */
+
+struct dlm_user_args {
+       struct dlm_user_proc    *proc; /* each process that opens the lockspace
+                                         device has private data
+                                         (dlm_user_proc) on the struct file,
+                                         the process's locks point back to it*/
+       struct dlm_lksb         lksb;
+       int                     old_mode;
+       int                     update_user_lvb;
+       struct dlm_lksb __user  *user_lksb;
+       void __user             *castparam;
+       void __user             *castaddr;
+       void __user             *bastparam;
+       void __user             *bastaddr;
+};
+
+#define DLM_PROC_FLAGS_CLOSING 1
+#define DLM_PROC_FLAGS_COMPAT  2
+
+/* locks list is kept so we can remove all a process's locks when it
+   exits (or orphan those that are persistent) */
+
+struct dlm_user_proc {
+       dlm_lockspace_t         *lockspace;
+       unsigned long           flags; /* DLM_PROC_FLAGS */
+       struct list_head        asts;
+       spinlock_t              asts_spin;
+       struct list_head        locks;
+       spinlock_t              locks_spin;
+       wait_queue_head_t       wait;
+};
+
 static inline int dlm_locking_stopped(struct dlm_ls *ls)
 {
        return !test_bit(LSFL_RUNNING, &ls->ls_flags);
 
                                    R: do_xxxx()
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
-
+#include <linux/types.h>
 #include "dlm_internal.h"
+#include <linux/dlm_device.h>
 #include "memory.h"
 #include "lowcomms.h"
 #include "requestqueue.h"
 #include "rcom.h"
 #include "recover.h"
 #include "lvb_table.h"
+#include "user.h"
 #include "config.h"
 
 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
                                    struct dlm_message *ms);
 static int receive_extralen(struct dlm_message *ms);
 
+#define FAKE_USER_AST (void*)0xff00ff00
+
 /*
  * Lock compatibilty matrix - thanks Steve
  * UN = Unlocked state. Not really a state, used as a flag
         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
 };
 
-static void dlm_print_lkb(struct dlm_lkb *lkb)
+void dlm_print_lkb(struct dlm_lkb *lkb)
 {
        printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
               "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
                if (len == r->res_length && !memcmp(name, r->res_name, len))
                        goto found;
        }
-       return -ENOENT;
+       return -EBADR;
 
  found:
        if (r->res_nodeid && (flags & R_MASTER))
        if (!error)
                goto out;
 
-       if (error == -ENOENT && !(flags & R_CREATE))
+       if (error == -EBADR && !(flags & R_CREATE))
                goto out;
 
        /* the rsb was found but wasn't a master copy */
        if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
                return;
 
-       b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
+       b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
        if (b == 1) {
                int len = receive_extralen(ms);
                memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
        lkb->lkb_rqmode = DLM_LOCK_IV;
 
        switch (lkb->lkb_status) {
+       case DLM_LKSTS_GRANTED:
+               break;
        case DLM_LKSTS_CONVERT:
                move_lkb(r, lkb, DLM_LKSTS_GRANTED);
                break;
        return -DLM_EUNLOCK;
 }
 
+/* FIXME: if revert_lock() finds that the lkb is granted, we should
+   skip the queue_cast(ECANCEL).  It indicates that the request/convert
+   completed (and queued a normal ast) just before the cancel; we don't
+   want to clobber the sb_result for the normal ast with ECANCEL. */
+   
 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
 {
        revert_lock(r, lkb);
                confirm_master(r, error);
                break;
 
-       case -ENOENT:
+       case -EBADR:
        case -ENOTBLK:
                /* find_rsb failed to find rsb or rsb wasn't master */
                r->res_nodeid = -1;
        return 0;
 }
 
+int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
+                    int mode, uint32_t flags, void *name, unsigned int namelen,
+                    uint32_t parent_lkid)
+{
+       struct dlm_lkb *lkb;
+       struct dlm_args args;
+       int error;
+
+       lock_recovery(ls);
+
+       error = create_lkb(ls, &lkb);
+       if (error) {
+               kfree(ua);
+               goto out;
+       }
+
+       if (flags & DLM_LKF_VALBLK) {
+               ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+               if (!ua->lksb.sb_lvbptr) {
+                       kfree(ua);
+                       __put_lkb(ls, lkb);
+                       error = -ENOMEM;
+                       goto out;
+               }
+       }
+
+       /* After ua is attached to lkb it will be freed by free_lkb().
+          When DLM_IFL_USER is set, the dlm knows that this is a userspace
+          lock and that lkb_astparam is the dlm_user_args structure. */
+
+       error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
+                             FAKE_USER_AST, ua, FAKE_USER_AST, &args);
+       lkb->lkb_flags |= DLM_IFL_USER;
+       ua->old_mode = DLM_LOCK_IV;
+
+       if (error) {
+               __put_lkb(ls, lkb);
+               goto out;
+       }
+
+       error = request_lock(ls, lkb, name, namelen, &args);
+
+       switch (error) {
+       case 0:
+               break;
+       case -EINPROGRESS:
+               error = 0;
+               break;
+       case -EAGAIN:
+               error = 0;
+               /* fall through */
+       default:
+               __put_lkb(ls, lkb);
+               goto out;
+       }
+
+       /* add this new lkb to the per-process list of locks */
+       spin_lock(&ua->proc->locks_spin);
+       kref_get(&lkb->lkb_ref);
+       list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
+       spin_unlock(&ua->proc->locks_spin);
+ out:
+       unlock_recovery(ls);
+       return error;
+}
+
+int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+                    int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+       struct dlm_lkb *lkb;
+       struct dlm_args args;
+       struct dlm_user_args *ua;
+       int error;
+
+       lock_recovery(ls);
+
+       error = find_lkb(ls, lkid, &lkb);
+       if (error)
+               goto out;
+
+       /* user can change the params on its lock when it converts it, or
+          add an lvb that didn't exist before */
+
+       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+       if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
+               ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+               if (!ua->lksb.sb_lvbptr) {
+                       error = -ENOMEM;
+                       goto out_put;
+               }
+       }
+       if (lvb_in && ua->lksb.sb_lvbptr)
+               memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+
+       ua->castparam = ua_tmp->castparam;
+       ua->castaddr = ua_tmp->castaddr;
+       ua->bastparam = ua_tmp->bastparam;
+       ua->bastaddr = ua_tmp->bastaddr;
+       ua->old_mode = lkb->lkb_grmode;
+
+       error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
+                             FAKE_USER_AST, &args);
+       if (error)
+               goto out_put;
+
+       error = convert_lock(ls, lkb, &args);
+
+       if (error == -EINPROGRESS || error == -EAGAIN)
+               error = 0;
+ out_put:
+       dlm_put_lkb(lkb);
+ out:
+       unlock_recovery(ls);
+       kfree(ua_tmp);
+       return error;
+}
+
+int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+                   uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+       struct dlm_lkb *lkb;
+       struct dlm_args args;
+       struct dlm_user_args *ua;
+       int error;
+
+       lock_recovery(ls);
+
+       error = find_lkb(ls, lkid, &lkb);
+       if (error)
+               goto out;
+
+       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+       if (lvb_in && ua->lksb.sb_lvbptr)
+               memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+       ua->castparam = ua_tmp->castparam;
+
+       error = set_unlock_args(flags, ua, &args);
+       if (error)
+               goto out_put;
+
+       error = unlock_lock(ls, lkb, &args);
+
+       if (error == -DLM_EUNLOCK)
+               error = 0;
+       if (error)
+               goto out_put;
+
+       spin_lock(&ua->proc->locks_spin);
+       list_del(&lkb->lkb_ownqueue);
+       spin_unlock(&ua->proc->locks_spin);
+
+       /* this removes the reference for the proc->locks list added by
+          dlm_user_request */
+       unhold_lkb(lkb);
+ out_put:
+       dlm_put_lkb(lkb);
+ out:
+       unlock_recovery(ls);
+       return error;
+}
+
+int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+                   uint32_t flags, uint32_t lkid)
+{
+       struct dlm_lkb *lkb;
+       struct dlm_args args;
+       struct dlm_user_args *ua;
+       int error;
+
+       lock_recovery(ls);
+
+       error = find_lkb(ls, lkid, &lkb);
+       if (error)
+               goto out;
+
+       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       ua->castparam = ua_tmp->castparam;
+
+       error = set_unlock_args(flags, ua, &args);
+       if (error)
+               goto out_put;
+
+       error = cancel_lock(ls, lkb, &args);
+
+       if (error == -DLM_ECANCEL)
+               error = 0;
+       if (error)
+               goto out_put;
+
+       /* this lkb was removed from the WAITING queue */
+       if (lkb->lkb_grmode == DLM_LOCK_IV) {
+               spin_lock(&ua->proc->locks_spin);
+               list_del(&lkb->lkb_ownqueue);
+               spin_unlock(&ua->proc->locks_spin);
+               unhold_lkb(lkb);
+       }
+ out_put:
+       dlm_put_lkb(lkb);
+ out:
+       unlock_recovery(ls);
+       return error;
+}
+
+static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+       struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+       if (ua->lksb.sb_lvbptr)
+               kfree(ua->lksb.sb_lvbptr);
+       kfree(ua);
+       lkb->lkb_astparam = (long)NULL;
+
+       /* TODO: propogate to master if needed */
+       return 0;
+}
+
+/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
+   Regardless of what rsb queue the lock is on, it's removed and freed. */
+
+static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+       struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       struct dlm_args args;
+       int error;
+
+       /* FIXME: we need to handle the case where the lkb is in limbo
+          while the rsb is being looked up, currently we assert in
+          _unlock_lock/is_remote because rsb nodeid is -1. */
+
+       set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
+
+       error = unlock_lock(ls, lkb, &args);
+       if (error == -DLM_EUNLOCK)
+               error = 0;
+       return error;
+}
+
+/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
+   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
+   which we clear here. */
+
+/* proc CLOSING flag is set so no more device_reads should look at proc->asts
+   list, and no more device_writes should add lkb's to proc->locks list; so we
+   shouldn't need to take asts_spin or locks_spin here.  this assumes that
+   device reads/writes/closes are serialized -- FIXME: we may need to serialize
+   them ourself. */
+
+void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
+{
+       struct dlm_lkb *lkb, *safe;
+
+       lock_recovery(ls);
+       mutex_lock(&ls->ls_clear_proc_locks);
+
+       list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
+               if (lkb->lkb_ast_type) {
+                       list_del(&lkb->lkb_astqueue);
+                       unhold_lkb(lkb);
+               }
+
+               list_del(&lkb->lkb_ownqueue);
+
+               if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
+                       lkb->lkb_flags |= DLM_IFL_ORPHAN;
+                       orphan_proc_lock(ls, lkb);
+               } else {
+                       lkb->lkb_flags |= DLM_IFL_DEAD;
+                       unlock_proc_lock(ls, lkb);
+               }
+
+               /* this removes the reference for the proc->locks list
+                  added by dlm_user_request, it may result in the lkb
+                  being freed */
+
+               dlm_put_lkb(lkb);
+       }
+       mutex_unlock(&ls->ls_clear_proc_locks);
+       unlock_recovery(ls);
+}
 
 #define __LOCK_DOT_H__
 
 void dlm_print_rsb(struct dlm_rsb *r);
+void dlm_print_lkb(struct dlm_lkb *lkb);
 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery);
 int dlm_modes_compat(int mode1, int mode2);
 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
 
+int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua, int mode,
+       uint32_t flags, void *name, unsigned int namelen, uint32_t parent_lkid);
+int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+       int mode, uint32_t flags, uint32_t lkid, char *lvb_in);
+int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+       uint32_t flags, uint32_t lkid, char *lvb_in);
+int dlm_user_cancel(struct dlm_ls *ls,  struct dlm_user_args *ua_tmp,
+       uint32_t flags, uint32_t lkid);
+void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc);
+
 static inline int is_master(struct dlm_rsb *r)
 {
        return !r->res_nodeid;
 
        return ls;
 }
 
-struct dlm_ls *dlm_find_lockspace_local(void *id)
+struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace)
 {
-       struct dlm_ls *ls = id;
+       struct dlm_ls *ls;
 
        spin_lock(&lslist_lock);
-       ls->ls_count++;
+       list_for_each_entry(ls, &lslist, ls_list) {
+               if (ls->ls_local_handle == lockspace) {
+                       ls->ls_count++;
+                       goto out;
+               }
+       }
+       ls = NULL;
+ out:
+       spin_unlock(&lslist_lock);
+       return ls;
+}
+
+struct dlm_ls *dlm_find_lockspace_device(int minor)
+{
+       struct dlm_ls *ls;
+
+       spin_lock(&lslist_lock);
+       list_for_each_entry(ls, &lslist, ls_list) {
+               if (ls->ls_device.minor == minor) {
+                       ls->ls_count++;
+                       goto out;
+               }
+       }
+       ls = NULL;
+ out:
        spin_unlock(&lslist_lock);
        return ls;
 }
        init_rwsem(&ls->ls_in_recovery);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
+       mutex_init(&ls->ls_clear_proc_locks);
 
        ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
+       ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);
 
 void dlm_lockspace_exit(void);
 struct dlm_ls *dlm_find_lockspace_global(uint32_t id);
 struct dlm_ls *dlm_find_lockspace_local(void *id);
+struct dlm_ls *dlm_find_lockspace_device(int minor);
 void dlm_put_lockspace(struct dlm_ls *ls);
 
 #endif                         /* __LOCKSPACE_DOT_H__ */
 
 #include "dlm_internal.h"
 #include "lockspace.h"
 #include "lock.h"
+#include "user.h"
 #include "memory.h"
 #include "lowcomms.h"
 #include "config.h"
        if (error)
                goto out_debug;
 
+       error = dlm_user_init();
+       if (error)
+               goto out_lowcomms;
+
        printk("DLM (built %s %s) installed\n", __DATE__, __TIME__);
 
        return 0;
 
+ out_lowcomms:
+       dlm_lowcomms_exit();
  out_debug:
        dlm_unregister_debugfs();
  out_config:
 
 static void __exit exit_dlm(void)
 {
+       dlm_user_exit();
        dlm_lowcomms_exit();
        dlm_config_exit();
        dlm_memory_exit();
 
 
 void free_lkb(struct dlm_lkb *lkb)
 {
+       if (lkb->lkb_flags & DLM_IFL_USER) {
+               struct dlm_user_args *ua;
+               ua = (struct dlm_user_args *)lkb->lkb_astparam;
+               if (ua) {
+                       if (ua->lksb.sb_lvbptr)
+                               kfree(ua->lksb.sb_lvbptr);
+                       kfree(ua);
+               }
+       }
        kmem_cache_free(lkb_cache, lkb);
 }
 
 
--- /dev/null
+/*
+ * Copyright (C) 2006 Red Hat, Inc.  All rights reserved.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU General Public License v.2.
+ */
+
+#include <linux/miscdevice.h>
+#include <linux/init.h>
+#include <linux/wait.h>
+#include <linux/module.h>
+#include <linux/file.h>
+#include <linux/fs.h>
+#include <linux/poll.h>
+#include <linux/signal.h>
+#include <linux/spinlock.h>
+#include <linux/dlm.h>
+#include <linux/dlm_device.h>
+
+#include "dlm_internal.h"
+#include "lockspace.h"
+#include "lock.h"
+#include "lvb_table.h"
+
+static const char *name_prefix="dlm";
+static struct miscdevice ctl_device;
+static struct file_operations device_fops;
+
+#ifdef CONFIG_COMPAT
+
+struct dlm_lock_params32 {
+       __u8 mode;
+       __u8 namelen;
+       __u16 flags;
+       __u32 lkid;
+       __u32 parent;
+
+       __u32 castparam;
+       __u32 castaddr;
+       __u32 bastparam;
+       __u32 bastaddr;
+       __u32 lksb;
+
+       char lvb[DLM_USER_LVB_LEN];
+       char name[0];
+};
+
+struct dlm_write_request32 {
+       __u32 version[3];
+       __u8 cmd;
+       __u8 is64bit;
+       __u8 unused[2];
+
+       union  {
+               struct dlm_lock_params32 lock;
+               struct dlm_lspace_params lspace;
+       } i;
+};
+
+struct dlm_lksb32 {
+       __u32 sb_status;
+       __u32 sb_lkid;
+       __u8 sb_flags;
+       __u32 sb_lvbptr;
+};
+
+struct dlm_lock_result32 {
+       __u32 length;
+       __u32 user_astaddr;
+       __u32 user_astparam;
+       __u32 user_lksb;
+       struct dlm_lksb32 lksb;
+       __u8 bast_mode;
+       __u8 unused[3];
+       /* Offsets may be zero if no data is present */
+       __u32 lvb_offset;
+};
+
+static void compat_input(struct dlm_write_request *kb,
+                        struct dlm_write_request32 *kb32)
+{
+       kb->version[0] = kb32->version[0];
+       kb->version[1] = kb32->version[1];
+       kb->version[2] = kb32->version[2];
+
+       kb->cmd = kb32->cmd;
+       kb->is64bit = kb32->is64bit;
+       if (kb->cmd == DLM_USER_CREATE_LOCKSPACE ||
+           kb->cmd == DLM_USER_REMOVE_LOCKSPACE) {
+               kb->i.lspace.flags = kb32->i.lspace.flags;
+               kb->i.lspace.minor = kb32->i.lspace.minor;
+               strcpy(kb->i.lspace.name, kb32->i.lspace.name);
+       } else {
+               kb->i.lock.mode = kb32->i.lock.mode;
+               kb->i.lock.namelen = kb32->i.lock.namelen;
+               kb->i.lock.flags = kb32->i.lock.flags;
+               kb->i.lock.lkid = kb32->i.lock.lkid;
+               kb->i.lock.parent = kb32->i.lock.parent;
+               kb->i.lock.castparam = (void *)(long)kb32->i.lock.castparam;
+               kb->i.lock.castaddr = (void *)(long)kb32->i.lock.castaddr;
+               kb->i.lock.bastparam = (void *)(long)kb32->i.lock.bastparam;
+               kb->i.lock.bastaddr = (void *)(long)kb32->i.lock.bastaddr;
+               kb->i.lock.lksb = (void *)(long)kb32->i.lock.lksb;
+               memcpy(kb->i.lock.lvb, kb32->i.lock.lvb, DLM_USER_LVB_LEN);
+               memcpy(kb->i.lock.name, kb32->i.lock.name, kb->i.lock.namelen);
+       }
+}
+
+static void compat_output(struct dlm_lock_result *res,
+                         struct dlm_lock_result32 *res32)
+{
+       res32->length = res->length - (sizeof(struct dlm_lock_result) -
+                                      sizeof(struct dlm_lock_result32));
+       res32->user_astaddr = (__u32)(long)res->user_astaddr;
+       res32->user_astparam = (__u32)(long)res->user_astparam;
+       res32->user_lksb = (__u32)(long)res->user_lksb;
+       res32->bast_mode = res->bast_mode;
+
+       res32->lvb_offset = res->lvb_offset;
+       res32->length = res->length;
+
+       res32->lksb.sb_status = res->lksb.sb_status;
+       res32->lksb.sb_flags = res->lksb.sb_flags;
+       res32->lksb.sb_lkid = res->lksb.sb_lkid;
+       res32->lksb.sb_lvbptr = (__u32)(long)res->lksb.sb_lvbptr;
+}
+#endif
+
+
+void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
+{
+       struct dlm_ls *ls;
+       struct dlm_user_args *ua;
+       struct dlm_user_proc *proc;
+
+       /* dlm_clear_proc_locks() sets ORPHAN/DEAD flag on each
+          lkb before dealing with it.  We need to check this
+          flag before taking ls_clear_proc_locks mutex because if
+          it's set, dlm_clear_proc_locks() holds the mutex. */
+
+       if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
+               /* log_print("user_add_ast skip1 %x", lkb->lkb_flags); */
+               return;
+       }
+
+       ls = lkb->lkb_resource->res_ls;
+       mutex_lock(&ls->ls_clear_proc_locks);
+
+       /* If ORPHAN/DEAD flag is set, it means the process is dead so an ast
+          can't be delivered.  For ORPHAN's, dlm_clear_proc_locks() freed
+          lkb->ua so we can't try to use it. */
+
+       if (lkb->lkb_flags & (DLM_IFL_ORPHAN | DLM_IFL_DEAD)) {
+               /* log_print("user_add_ast skip2 %x", lkb->lkb_flags); */
+               goto out;
+       }
+
+       DLM_ASSERT(lkb->lkb_astparam, dlm_print_lkb(lkb););
+       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       proc = ua->proc;
+
+       if (type == AST_BAST && ua->bastaddr == NULL)
+               goto out;
+
+       spin_lock(&proc->asts_spin);
+       if (!(lkb->lkb_ast_type & (AST_COMP | AST_BAST))) {
+               kref_get(&lkb->lkb_ref);
+               list_add_tail(&lkb->lkb_astqueue, &proc->asts);
+               lkb->lkb_ast_type |= type;
+               wake_up_interruptible(&proc->wait);
+       }
+
+       /* We want to copy the lvb to userspace when the completion
+          ast is read if the status is 0, the lock has an lvb and
+          lvb_ops says we should.  We could probably have set_lvb_lock()
+          set update_user_lvb instead and not need old_mode */
+
+       if ((lkb->lkb_ast_type & AST_COMP) &&
+           (lkb->lkb_lksb->sb_status == 0) &&
+           lkb->lkb_lksb->sb_lvbptr &&
+           dlm_lvb_operations[ua->old_mode + 1][lkb->lkb_grmode + 1])
+               ua->update_user_lvb = 1;
+       else
+               ua->update_user_lvb = 0;
+
+       spin_unlock(&proc->asts_spin);
+ out:
+       mutex_unlock(&ls->ls_clear_proc_locks);
+}
+
+static int device_user_lock(struct dlm_user_proc *proc,
+                           struct dlm_lock_params *params)
+{
+       struct dlm_ls *ls;
+       struct dlm_user_args *ua;
+       int error = -ENOMEM;
+
+       ls = dlm_find_lockspace_local(proc->lockspace);
+       if (!ls)
+               return -ENOENT;
+
+       if (!params->castaddr || !params->lksb) {
+               error = -EINVAL;
+               goto out;
+       }
+
+       ua = kzalloc(sizeof(struct dlm_user_args), GFP_KERNEL);
+       if (!ua)
+               goto out;
+       ua->proc = proc;
+       ua->user_lksb = params->lksb;
+       ua->castparam = params->castparam;
+       ua->castaddr = params->castaddr;
+       ua->bastparam = params->bastparam;
+       ua->bastaddr = params->bastaddr;
+
+       if (params->flags & DLM_LKF_CONVERT)
+               error = dlm_user_convert(ls, ua,
+                                        params->mode, params->flags,
+                                        params->lkid, params->lvb);
+       else {
+               error = dlm_user_request(ls, ua,
+                                        params->mode, params->flags,
+                                        params->name, params->namelen,
+                                        params->parent);
+               if (!error)
+                       error = ua->lksb.sb_lkid;
+       }
+ out:
+       dlm_put_lockspace(ls);
+       return error;
+}
+
+static int device_user_unlock(struct dlm_user_proc *proc,
+                             struct dlm_lock_params *params)
+{
+       struct dlm_ls *ls;
+       struct dlm_user_args *ua;
+       int error = -ENOMEM;
+
+       ls = dlm_find_lockspace_local(proc->lockspace);
+       if (!ls)
+               return -ENOENT;
+
+       ua = kzalloc(sizeof(struct dlm_user_args), GFP_KERNEL);
+       if (!ua)
+               goto out;
+       ua->proc = proc;
+       ua->user_lksb = params->lksb;
+       ua->castparam = params->castparam;
+       ua->castaddr = params->castaddr;
+
+       if (params->flags & DLM_LKF_CANCEL)
+               error = dlm_user_cancel(ls, ua, params->flags, params->lkid);
+       else
+               error = dlm_user_unlock(ls, ua, params->flags, params->lkid,
+                                       params->lvb);
+ out:
+       dlm_put_lockspace(ls);
+       return error;
+}
+
+static int device_create_lockspace(struct dlm_lspace_params *params)
+{
+       dlm_lockspace_t *lockspace;
+       struct dlm_ls *ls;
+       int error, len;
+
+       if (!capable(CAP_SYS_ADMIN))
+               return -EPERM;
+
+       error = dlm_new_lockspace(params->name, strlen(params->name),
+                                 &lockspace, 0, DLM_USER_LVB_LEN);
+       if (error)
+               return error;
+
+       ls = dlm_find_lockspace_local(lockspace);
+       if (!ls)
+               return -ENOENT;
+
+       error = -ENOMEM;
+       len = strlen(params->name) + strlen(name_prefix) + 2;
+       ls->ls_device.name = kzalloc(len, GFP_KERNEL);
+       if (!ls->ls_device.name)
+               goto fail;
+       snprintf((char *)ls->ls_device.name, len, "%s_%s", name_prefix,
+                params->name);
+       ls->ls_device.fops = &device_fops;
+       ls->ls_device.minor = MISC_DYNAMIC_MINOR;
+
+       error = misc_register(&ls->ls_device);
+       if (error) {
+               kfree(ls->ls_device.name);
+               goto fail;
+       }
+
+       error = ls->ls_device.minor;
+       dlm_put_lockspace(ls);
+       return error;
+
+ fail:
+       dlm_put_lockspace(ls);
+       dlm_release_lockspace(lockspace, 0);
+       return error;
+}
+
+static int device_remove_lockspace(struct dlm_lspace_params *params)
+{
+       dlm_lockspace_t *lockspace;
+       struct dlm_ls *ls;
+       int error;
+
+       if (!capable(CAP_SYS_ADMIN))
+               return -EPERM;
+
+       ls = dlm_find_lockspace_device(params->minor);
+       if (!ls)
+               return -ENOENT;
+
+       error = misc_deregister(&ls->ls_device);
+       if (error) {
+               dlm_put_lockspace(ls);
+               goto out;
+       }
+       kfree(ls->ls_device.name);
+
+       lockspace = ls->ls_local_handle;
+
+       /* dlm_release_lockspace waits for references to go to zero,
+          so all processes will need to close their device for the ls
+          before the release will procede */
+
+       dlm_put_lockspace(ls);
+       error = dlm_release_lockspace(lockspace, 0);
+out:
+       return error;
+}
+
+/* Check the user's version matches ours */
+static int check_version(struct dlm_write_request *req)
+{
+       if (req->version[0] != DLM_DEVICE_VERSION_MAJOR ||
+           (req->version[0] == DLM_DEVICE_VERSION_MAJOR &&
+            req->version[1] > DLM_DEVICE_VERSION_MINOR)) {
+
+               printk(KERN_DEBUG "dlm: process %s (%d) version mismatch "
+                      "user (%d.%d.%d) kernel (%d.%d.%d)\n",
+                      current->comm,
+                      current->pid,
+                      req->version[0],
+                      req->version[1],
+                      req->version[2],
+                      DLM_DEVICE_VERSION_MAJOR,
+                      DLM_DEVICE_VERSION_MINOR,
+                      DLM_DEVICE_VERSION_PATCH);
+               return -EINVAL;
+       }
+       return 0;
+}
+
+/*
+ * device_write
+ *
+ *   device_user_lock
+ *     dlm_user_request -> request_lock
+ *     dlm_user_convert -> convert_lock
+ *
+ *   device_user_unlock
+ *     dlm_user_unlock -> unlock_lock
+ *     dlm_user_cancel -> cancel_lock
+ *
+ *   device_create_lockspace
+ *     dlm_new_lockspace
+ *
+ *   device_remove_lockspace
+ *     dlm_release_lockspace
+ */
+
+/* a write to a lockspace device is a lock or unlock request, a write
+   to the control device is to create/remove a lockspace */
+
+static ssize_t device_write(struct file *file, const char __user *buf,
+                           size_t count, loff_t *ppos)
+{
+       struct dlm_user_proc *proc = file->private_data;
+       struct dlm_write_request *kbuf;
+       sigset_t tmpsig, allsigs;
+       int error;
+
+#ifdef CONFIG_COMPAT
+       if (count < sizeof(struct dlm_write_request32))
+#else
+       if (count < sizeof(struct dlm_write_request))
+#endif
+               return -EINVAL;
+
+       kbuf = kmalloc(count, GFP_KERNEL);
+       if (!kbuf)
+               return -ENOMEM;
+
+       if (copy_from_user(kbuf, buf, count)) {
+               error = -EFAULT;
+               goto out_free;
+       }
+
+       if (check_version(kbuf)) {
+               error = -EBADE;
+               goto out_free;
+       }
+
+#ifdef CONFIG_COMPAT
+       if (!kbuf->is64bit) {
+               struct dlm_write_request32 *k32buf;
+               k32buf = (struct dlm_write_request32 *)kbuf;
+               kbuf = kmalloc(count + (sizeof(struct dlm_write_request) -
+                              sizeof(struct dlm_write_request32)), GFP_KERNEL);
+               if (!kbuf)
+                       return -ENOMEM;
+
+               if (proc)
+                       set_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags);
+               compat_input(kbuf, k32buf);
+               kfree(k32buf);
+       }
+#endif
+
+       /* do we really need this? can a write happen after a close? */
+       if ((kbuf->cmd == DLM_USER_LOCK || kbuf->cmd == DLM_USER_UNLOCK) &&
+           test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
+               return -EINVAL;
+
+       sigfillset(&allsigs);
+       sigprocmask(SIG_BLOCK, &allsigs, &tmpsig);
+
+       error = -EINVAL;
+
+       switch (kbuf->cmd)
+       {
+       case DLM_USER_LOCK:
+               if (!proc) {
+                       log_print("no locking on control device");
+                       goto out_sig;
+               }
+               error = device_user_lock(proc, &kbuf->i.lock);
+               break;
+
+       case DLM_USER_UNLOCK:
+               if (!proc) {
+                       log_print("no locking on control device");
+                       goto out_sig;
+               }
+               error = device_user_unlock(proc, &kbuf->i.lock);
+               break;
+
+       case DLM_USER_CREATE_LOCKSPACE:
+               if (proc) {
+                       log_print("create/remove only on control device");
+                       goto out_sig;
+               }
+               error = device_create_lockspace(&kbuf->i.lspace);
+               break;
+
+       case DLM_USER_REMOVE_LOCKSPACE:
+               if (proc) {
+                       log_print("create/remove only on control device");
+                       goto out_sig;
+               }
+               error = device_remove_lockspace(&kbuf->i.lspace);
+               break;
+
+       default:
+               log_print("Unknown command passed to DLM device : %d\n",
+                         kbuf->cmd);
+       }
+
+ out_sig:
+       sigprocmask(SIG_SETMASK, &tmpsig, NULL);
+       recalc_sigpending();
+ out_free:
+       kfree(kbuf);
+       return error;
+}
+
+/* Every process that opens the lockspace device has its own "proc" structure
+   hanging off the open file that's used to keep track of locks owned by the
+   process and asts that need to be delivered to the process. */
+
+static int device_open(struct inode *inode, struct file *file)
+{
+       struct dlm_user_proc *proc;
+       struct dlm_ls *ls;
+
+       ls = dlm_find_lockspace_device(iminor(inode));
+       if (!ls)
+               return -ENOENT;
+
+       proc = kzalloc(sizeof(struct dlm_user_proc), GFP_KERNEL);
+       if (!proc) {
+               dlm_put_lockspace(ls);
+               return -ENOMEM;
+       }
+
+       proc->lockspace = ls->ls_local_handle;
+       INIT_LIST_HEAD(&proc->asts);
+       INIT_LIST_HEAD(&proc->locks);
+       spin_lock_init(&proc->asts_spin);
+       spin_lock_init(&proc->locks_spin);
+       init_waitqueue_head(&proc->wait);
+       file->private_data = proc;
+
+       return 0;
+}
+
+static int device_close(struct inode *inode, struct file *file)
+{
+       struct dlm_user_proc *proc = file->private_data;
+       struct dlm_ls *ls;
+       sigset_t tmpsig, allsigs;
+
+       ls = dlm_find_lockspace_local(proc->lockspace);
+       if (!ls)
+               return -ENOENT;
+
+       sigfillset(&allsigs);
+       sigprocmask(SIG_BLOCK, &allsigs, &tmpsig);
+
+       set_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags);
+
+       dlm_clear_proc_locks(ls, proc);
+
+       /* at this point no more lkb's should exist for this lockspace,
+          so there's no chance of dlm_user_add_ast() being called and
+          looking for lkb->ua->proc */
+
+       kfree(proc);
+       file->private_data = NULL;
+
+       dlm_put_lockspace(ls);
+       dlm_put_lockspace(ls);  /* for the find in device_open() */
+
+       /* FIXME: AUTOFREE: if this ls is no longer used do
+          device_remove_lockspace() */
+
+       sigprocmask(SIG_SETMASK, &tmpsig, NULL);
+       recalc_sigpending();
+
+       return 0;
+}
+
+static int copy_result_to_user(struct dlm_user_args *ua, int compat, int type,
+                              int bmode, char __user *buf, size_t count)
+{
+#ifdef CONFIG_COMPAT
+       struct dlm_lock_result32 result32;
+#endif
+       struct dlm_lock_result result;
+       void *resultptr;
+       int error=0;
+       int len;
+       int struct_len;
+
+       memset(&result, 0, sizeof(struct dlm_lock_result));
+       memcpy(&result.lksb, &ua->lksb, sizeof(struct dlm_lksb));
+       result.user_lksb = ua->user_lksb;
+
+       /* FIXME: dlm1 provides for the user's bastparam/addr to not be updated
+          in a conversion unless the conversion is successful.  See code
+          in dlm_user_convert() for updating ua from ua_tmp.  OpenVMS, though,
+          notes that a new blocking AST address and parameter are set even if
+          the conversion fails, so maybe we should just do that. */
+
+       if (type == AST_BAST) {
+               result.user_astaddr = ua->bastaddr;
+               result.user_astparam = ua->bastparam;
+               result.bast_mode = bmode;
+       } else {
+               result.user_astaddr = ua->castaddr;
+               result.user_astparam = ua->castparam;
+       }
+
+#ifdef CONFIG_COMPAT
+       if (compat)
+               len = sizeof(struct dlm_lock_result32);
+       else
+#endif
+               len = sizeof(struct dlm_lock_result);
+       struct_len = len;
+
+       /* copy lvb to userspace if there is one, it's been updated, and
+          the user buffer has space for it */
+
+       if (ua->update_user_lvb && ua->lksb.sb_lvbptr &&
+           count >= len + DLM_USER_LVB_LEN) {
+               if (copy_to_user(buf+len, ua->lksb.sb_lvbptr,
+                                DLM_USER_LVB_LEN)) {
+                       error = -EFAULT;
+                       goto out;
+               }
+
+               result.lvb_offset = len;
+               len += DLM_USER_LVB_LEN;
+       }
+
+       result.length = len;
+       resultptr = &result;
+#ifdef CONFIG_COMPAT
+       if (compat) {
+               compat_output(&result, &result32);
+               resultptr = &result32;
+       }
+#endif
+
+       if (copy_to_user(buf, resultptr, struct_len))
+               error = -EFAULT;
+       else
+               error = len;
+ out:
+       return error;
+}
+
+/* a read returns a single ast described in a struct dlm_lock_result */
+
+static ssize_t device_read(struct file *file, char __user *buf, size_t count,
+                          loff_t *ppos)
+{
+       struct dlm_user_proc *proc = file->private_data;
+       struct dlm_lkb *lkb;
+       struct dlm_user_args *ua;
+       DECLARE_WAITQUEUE(wait, current);
+       int error, type=0, bmode=0, removed = 0;
+
+#ifdef CONFIG_COMPAT
+       if (count < sizeof(struct dlm_lock_result32))
+#else
+       if (count < sizeof(struct dlm_lock_result))
+#endif
+               return -EINVAL;
+
+       /* do we really need this? can a read happen after a close? */
+       if (test_bit(DLM_PROC_FLAGS_CLOSING, &proc->flags))
+               return -EINVAL;
+
+       spin_lock(&proc->asts_spin);
+       if (list_empty(&proc->asts)) {
+               if (file->f_flags & O_NONBLOCK) {
+                       spin_unlock(&proc->asts_spin);
+                       return -EAGAIN;
+               }
+
+               add_wait_queue(&proc->wait, &wait);
+
+       repeat:
+               set_current_state(TASK_INTERRUPTIBLE);
+               if (list_empty(&proc->asts) && !signal_pending(current)) {
+                       spin_unlock(&proc->asts_spin);
+                       schedule();
+                       spin_lock(&proc->asts_spin);
+                       goto repeat;
+               }
+               set_current_state(TASK_RUNNING);
+               remove_wait_queue(&proc->wait, &wait);
+
+               if (signal_pending(current)) {
+                       spin_unlock(&proc->asts_spin);
+                       return -ERESTARTSYS;
+               }
+       }
+
+       if (list_empty(&proc->asts)) {
+               spin_unlock(&proc->asts_spin);
+               return -EAGAIN;
+       }
+
+       /* there may be both completion and blocking asts to return for
+          the lkb, don't remove lkb from asts list unless no asts remain */
+
+       lkb = list_entry(proc->asts.next, struct dlm_lkb, lkb_astqueue);
+
+       if (lkb->lkb_ast_type & AST_COMP) {
+               lkb->lkb_ast_type &= ~AST_COMP;
+               type = AST_COMP;
+       } else if (lkb->lkb_ast_type & AST_BAST) {
+               lkb->lkb_ast_type &= ~AST_BAST;
+               type = AST_BAST;
+               bmode = lkb->lkb_bastmode;
+       }
+
+       if (!lkb->lkb_ast_type) {
+               list_del(&lkb->lkb_astqueue);
+               removed = 1;
+       }
+       spin_unlock(&proc->asts_spin);
+
+       ua = (struct dlm_user_args *)lkb->lkb_astparam;
+       error = copy_result_to_user(ua,
+                               test_bit(DLM_PROC_FLAGS_COMPAT, &proc->flags),
+                               type, bmode, buf, count);
+
+       /* removes reference for the proc->asts lists added by
+          dlm_user_add_ast() and may result in the lkb being freed */
+       if (removed)
+               dlm_put_lkb(lkb);
+
+       return error;
+}
+
+static unsigned int device_poll(struct file *file, poll_table *wait)
+{
+       struct dlm_user_proc *proc = file->private_data;
+
+       poll_wait(file, &proc->wait, wait);
+
+       spin_lock(&proc->asts_spin);
+       if (!list_empty(&proc->asts)) {
+               spin_unlock(&proc->asts_spin);
+               return POLLIN | POLLRDNORM;
+       }
+       spin_unlock(&proc->asts_spin);
+       return 0;
+}
+
+static int ctl_device_open(struct inode *inode, struct file *file)
+{
+       file->private_data = NULL;
+       return 0;
+}
+
+static int ctl_device_close(struct inode *inode, struct file *file)
+{
+       return 0;
+}
+
+static struct file_operations device_fops = {
+       .open    = device_open,
+       .release = device_close,
+       .read    = device_read,
+       .write   = device_write,
+       .poll    = device_poll,
+       .owner   = THIS_MODULE,
+};
+
+static struct file_operations ctl_device_fops = {
+       .open    = ctl_device_open,
+       .release = ctl_device_close,
+       .write   = device_write,
+       .owner   = THIS_MODULE,
+};
+
+int dlm_user_init(void)
+{
+       int error;
+
+       ctl_device.name = "dlm-control";
+       ctl_device.fops = &ctl_device_fops;
+       ctl_device.minor = MISC_DYNAMIC_MINOR;
+
+       error = misc_register(&ctl_device);
+       if (error)
+               log_print("misc_register failed for control device");
+
+       return error;
+}
+
+void dlm_user_exit(void)
+{
+       misc_deregister(&ctl_device);
+}
+
 
--- /dev/null
+/*
+ * Copyright (C) 2006 Red Hat, Inc.  All rights reserved.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU General Public License v.2.
+ */
+
+#ifndef __USER_DOT_H__
+#define __USER_DOT_H__
+
+void dlm_user_add_ast(struct dlm_lkb *lkb, int type);
+int dlm_user_init(void);
+void dlm_user_exit(void);
+
+#endif