#include "iostat.h"
 
 #define NFSDBG_FACILITY                NFSDBG_VFS
-#define MAX_DIRECTIO_SIZE      (4096UL << PAGE_SHIFT)
 
 static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty);
 static kmem_cache_t *nfs_direct_cachep;
  */
 struct nfs_direct_req {
        struct kref             kref;           /* release manager */
+
+       /* I/O parameters */
        struct list_head        list;           /* nfs_read/write_data structs */
        struct file *           filp;           /* file descriptor */
        struct kiocb *          iocb;           /* controlling i/o request */
        struct inode *          inode;          /* target file of i/o */
        struct page **          pages;          /* pages in our buffer */
        unsigned int            npages;         /* count of pages */
-       atomic_t                complete,       /* i/os we're waiting for */
-                               count,          /* bytes actually processed */
+
+       /* completion state */
+       spinlock_t              lock;           /* protect completion state */
+       int                     outstanding;    /* i/os we're waiting for */
+       ssize_t                 count,          /* bytes actually processed */
                                error;          /* any reported error */
 };
 
-
 /**
  * nfs_direct_IO - NFS address space operation for direct I/O
  * @rw: direction (read or write)
        unsigned long page_count;
        size_t array_size;
 
-       /* set an arbitrary limit to prevent type overflow */
-       if (size > MAX_DIRECTIO_SIZE) {
-               *pages = NULL;
-               return -EFBIG;
-       }
-
        page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
        page_count -= user_addr >> PAGE_SHIFT;
 
        init_waitqueue_head(&dreq->wait);
        INIT_LIST_HEAD(&dreq->list);
        dreq->iocb = NULL;
-       atomic_set(&dreq->count, 0);
-       atomic_set(&dreq->error, 0);
+       spin_lock_init(&dreq->lock);
+       dreq->outstanding = 0;
+       dreq->count = 0;
+       dreq->error = 0;
 
        return dreq;
 }
  */
 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
 {
-       int result = -EIOCBQUEUED;
+       ssize_t result = -EIOCBQUEUED;
 
        /* Async requests don't wait here */
        if (dreq->iocb)
                goto out;
 
-       result = wait_event_interruptible(dreq->wait,
-                                       (atomic_read(&dreq->complete) == 0));
+       result = wait_event_interruptible(dreq->wait, (dreq->outstanding == 0));
 
        if (!result)
-               result = atomic_read(&dreq->error);
+               result = dreq->error;
        if (!result)
-               result = atomic_read(&dreq->count);
+               result = dreq->count;
 
 out:
        kref_put(&dreq->kref, nfs_direct_req_release);
        nfs_free_user_pages(dreq->pages, dreq->npages, 1);
 
        if (dreq->iocb) {
-               long res = atomic_read(&dreq->error);
+               long res = (long) dreq->error;
                if (!res)
-                       res = atomic_read(&dreq->count);
+                       res = (long) dreq->count;
                aio_complete(dreq->iocb, res, 0);
        } else
                wake_up(&dreq->wait);
 {
        struct list_head *list;
        struct nfs_direct_req *dreq;
-       unsigned int reads = 0;
        unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 
        dreq = nfs_direct_req_alloc();
                list_add(&data->pages, list);
 
                data->req = (struct nfs_page *) dreq;
-               reads++;
+               dreq->outstanding++;
                if (nbytes <= rsize)
                        break;
                nbytes -= rsize;
        }
        kref_get(&dreq->kref);
-       atomic_set(&dreq->complete, reads);
        return dreq;
 }
 
 
        if (nfs_readpage_result(task, data) != 0)
                return;
+
+       spin_lock(&dreq->lock);
+
        if (likely(task->tk_status >= 0))
-               atomic_add(data->res.count, &dreq->count);
+               dreq->count += data->res.count;
        else
-               atomic_set(&dreq->error, task->tk_status);
+               dreq->error = task->tk_status;
+
+       if (--dreq->outstanding) {
+               spin_unlock(&dreq->lock);
+               return;
+       }
 
-       if (unlikely(atomic_dec_and_test(&dreq->complete)))
-               nfs_direct_complete(dreq);
+       spin_unlock(&dreq->lock);
+       nfs_direct_complete(dreq);
 }
 
 static const struct rpc_call_ops nfs_read_direct_ops = {
 {
        struct list_head *list;
        struct nfs_direct_req *dreq;
-       unsigned int writes = 0;
        unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 
        dreq = nfs_direct_req_alloc();
                list_add(&data->pages, list);
 
                data->req = (struct nfs_page *) dreq;
-               writes++;
+               dreq->outstanding++;
                if (nbytes <= wsize)
                        break;
                nbytes -= wsize;
        }
        kref_get(&dreq->kref);
-       atomic_set(&dreq->complete, writes);
        return dreq;
 }
 
+/*
+ * NB: Any error reported by a completing RPC is recorded in
+ *     dreq->error; a later error overwrites an earlier one, so the
+ *     value eventually returned to the caller is the most recently
+ *     reported error.
+ */
 static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
 {
        struct nfs_write_data *data = calldata;
        if (unlikely(data->res.verf->committed != NFS_FILE_SYNC))
                status = -EIO;
 
+       spin_lock(&dreq->lock);
+
        if (likely(status >= 0))
-               atomic_add(data->res.count, &dreq->count);
+               dreq->count += data->res.count;
        else
-               atomic_set(&dreq->error, status);
+               dreq->error = status;
 
-       if (unlikely(atomic_dec_and_test(&dreq->complete))) {
-               nfs_end_data_update(data->inode);
-               nfs_direct_complete(dreq);
+       if (--dreq->outstanding) {
+               spin_unlock(&dreq->lock);
+               return;
        }
+
+       spin_unlock(&dreq->lock);
+
+       nfs_end_data_update(data->inode);
+       nfs_direct_complete(dreq);
 }
 
 static const struct rpc_call_ops nfs_write_direct_ops = {