{
        struct file *file = iocb->ki_filp;
        struct gfs2_inode *ip = GFS2_I(file->f_mapping->host);
-       size_t count = iov_iter_count(to);
+       size_t prev_count = 0, window_size = 0;
+       size_t written = 0;
        ssize_t ret;
 
-       if (!count)
+       /*
+        * In this function, we disable page faults when we're holding the
+        * inode glock while doing I/O.  If a page fault occurs, we indicate
+        * that the inode glock may be dropped, fault in the pages manually,
+        * and retry.
+        *
+        * Unlike generic_file_read_iter, for reads, iomap_dio_rw can trigger
+        * physical as well as manual page faults, and we need to disable both
+        * kinds.
+        *
+        * For direct I/O, gfs2 takes the inode glock in deferred mode.  This
+        * locking mode is compatible with other deferred holders, so multiple
+        * processes and nodes can do direct I/O to a file at the same time.
+        * There's no guarantee that reads or writes will be atomic.  Any
+        * coordination among readers and writers needs to happen externally.
+        */
+
+       if (!iov_iter_count(to))
                return 0; /* skip atime */
 
        gfs2_holder_init(ip->i_gl, LM_ST_DEFERRED, 0, gh);
+retry:
        ret = gfs2_glock_nq(gh);
        if (ret)
                goto out_uninit;
+retry_under_glock:
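+       /*
+        * With page faults disabled and ->nofault set, a non-resident
+        * user page makes iomap_dio_rw return a short result (or -EFAULT)
+        * instead of faulting while we hold the glock.
+        */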
+       pagefault_disable();
+       to->nofault = true;
+       ret = iomap_dio_rw(iocb, to, &gfs2_iomap_ops, NULL,
+                          IOMAP_DIO_PARTIAL, written);
+       to->nofault = false;
+       pagefault_enable();
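+       /*
+        * Remember the progress made so far; it is passed back to
+        * iomap_dio_rw as done_before when the request is retried.
+        */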
+       if (ret > 0)
+               written = ret;
 
-       ret = iomap_dio_rw(iocb, to, &gfs2_iomap_ops, NULL, 0, 0);
-       gfs2_glock_dq(gh);
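+       /*
+        * If the request stopped short because of a missing page, fault
+        * in a window of pages manually.  The glock is allowed to be
+        * demoted while we do that, so afterwards either retry under the
+        * glock or, if it was dropped, re-acquire it first.
+        */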
+       if (should_fault_in_pages(ret, to, &prev_count, &window_size)) {
+               size_t leftover;
+
+               gfs2_holder_allow_demote(gh);
+               leftover = fault_in_iov_iter_writeable(to, window_size);
+               gfs2_holder_disallow_demote(gh);
+               if (leftover != window_size) {
+                       if (!gfs2_holder_queued(gh))
+                               goto retry;
+                       goto retry_under_glock;
+               }
+       }
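+       /* The glock may have been dropped during a demote above. */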
+       if (gfs2_holder_queued(gh))
+               gfs2_glock_dq(gh);
 out_uninit:
        gfs2_holder_uninit(gh);
-       return ret;
+       if (ret < 0)
+               return ret;
+       return written;
 }
 
 static ssize_t gfs2_file_direct_write(struct kiocb *iocb, struct iov_iter *from,
        struct file *file = iocb->ki_filp;
        struct inode *inode = file->f_mapping->host;
        struct gfs2_inode *ip = GFS2_I(inode);
-       size_t len = iov_iter_count(from);
-       loff_t offset = iocb->ki_pos;
+       size_t prev_count = 0, window_size = 0;
+       size_t read = 0;
        ssize_t ret;
 
+       /*
+        * In this function, we disable page faults when we're holding the
+        * inode glock while doing I/O.  If a page fault occurs, we indicate
+        * that the inode glock may be dropped, fault in the pages manually,
+        * and retry.
+        *
+        * For writes, iomap_dio_rw only triggers manual page faults, so we
+        * don't need to disable physical ones.
+        */
+
        /*
         * Deferred lock, even if its a write, since we do no allocation on
         * this path. All we need to change is the atime, and this lock mode
         * ensures that other nodes have flushed their buffered read caches
         * (i.e. their page cache entries for this inode). We do not,
         * unfortunately, have the option of only flushing a range like the
         * VFS does.
         */
        gfs2_holder_init(ip->i_gl, LM_ST_DEFERRED, 0, gh);
+retry:
        ret = gfs2_glock_nq(gh);
        if (ret)
                goto out_uninit;
-
+retry_under_glock:
        /* Silently fall back to buffered I/O when writing beyond EOF */
-       if (offset + len > i_size_read(&ip->i_inode))
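+       /* ki_pos and the remaining count can change when we retry. */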
+       if (iocb->ki_pos + iov_iter_count(from) > i_size_read(&ip->i_inode))
                goto out;
 
-       ret = iomap_dio_rw(iocb, from, &gfs2_iomap_ops, NULL, 0, 0);
+       from->nofault = true;
+       ret = iomap_dio_rw(iocb, from, &gfs2_iomap_ops, NULL,
+                          IOMAP_DIO_PARTIAL, read);
+       from->nofault = false;
+
        if (ret == -ENOTBLK)
                ret = 0;
+       if (ret > 0)
+               read = ret;
+
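+       /*
+        * As in gfs2_file_direct_read: fault in the user pages manually
+        * with the glock demotable, then retry under the glock if we
+        * still hold it, or from the top if it was dropped.
+        */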
+       if (should_fault_in_pages(ret, from, &prev_count, &window_size)) {
+               size_t leftover;
+
+               gfs2_holder_allow_demote(gh);
+               leftover = fault_in_iov_iter_readable(from, window_size);
+               gfs2_holder_disallow_demote(gh);
+               if (leftover != window_size) {
+                       if (!gfs2_holder_queued(gh))
+                               goto retry;
+                       goto retry_under_glock;
+               }
+       }
 out:
-       gfs2_glock_dq(gh);
+       if (gfs2_holder_queued(gh))
+               gfs2_glock_dq(gh);
 out_uninit:
        gfs2_holder_uninit(gh);
-       return ret;
+       if (ret < 0)
+               return ret;
+       return read;
 }
 
 static ssize_t gfs2_file_read_iter(struct kiocb *iocb, struct iov_iter *to)