Fix a corruption that can happen when we have (two or more) outstanding
aio's to an overlapping unaligned region.  Ext4
(
e9e3bcecf44c04b9e6b505fd8e2eb9cea58fb94d) and xfs recently had to fix
similar issues.
In our case what happens is that we can have an outstanding aio on a region
and if a write comes in with some bytes overlapping the original aio we may
decide to read that region into a page before continuing (typically because
of buffered-io fallback).  Since we have no ordering guarantees with the
aio, we can read stale or bad data into the page and then write it back out.
If the i/o is page and block aligned, then we avoid this issue as there
won't be any need to read data from disk.
I took the same approach as Eric in the ext4 patch and introduced some
serialization of unaligned async direct i/o.  I don't expect this to have an
effect on the most common cases of AIO.  Unaligned aio will be slower
though, but that's far more acceptable than data corruption.
Signed-off-by: Mark Fasheh <mfasheh@suse.com>
Signed-off-by: Joel Becker <jlbec@evilplan.org>
 {
        struct inode *inode = iocb->ki_filp->f_path.dentry->d_inode;
        int level;
+       wait_queue_head_t *wq = ocfs2_ioend_wq(inode);
 
        /* this io's submitter should not have unlocked this before we could */
        BUG_ON(!ocfs2_iocb_is_rw_locked(iocb));
                ocfs2_iocb_clear_sem_locked(iocb);
        }
 
+       if (ocfs2_iocb_is_unaligned_aio(iocb)) {
+               ocfs2_iocb_clear_unaligned_aio(iocb);
+
+               if (atomic_dec_and_test(&OCFS2_I(inode)->ip_unaligned_aio) &&
+                   waitqueue_active(wq)) {
+                       wake_up_all(wq);
+               }
+       }
+
        ocfs2_iocb_clear_rw_locked(iocb);
 
        level = ocfs2_iocb_rw_locked_level(iocb);
 
        OCFS2_IOCB_RW_LOCK = 0,
        OCFS2_IOCB_RW_LOCK_LEVEL,
        OCFS2_IOCB_SEM,
+       OCFS2_IOCB_UNALIGNED_IO,
        OCFS2_IOCB_NUM_LOCKS
 };
 
        clear_bit(OCFS2_IOCB_SEM, (unsigned long *)&iocb->private)
 #define ocfs2_iocb_is_sem_locked(iocb) \
        test_bit(OCFS2_IOCB_SEM, (unsigned long *)&iocb->private)
+
+#define ocfs2_iocb_set_unaligned_aio(iocb) \
+       set_bit(OCFS2_IOCB_UNALIGNED_IO, (unsigned long *)&iocb->private)
+#define ocfs2_iocb_clear_unaligned_aio(iocb) \
+       clear_bit(OCFS2_IOCB_UNALIGNED_IO, (unsigned long *)&iocb->private)
+#define ocfs2_iocb_is_unaligned_aio(iocb) \
+       test_bit(OCFS2_IOCB_UNALIGNED_IO, (unsigned long *)&iocb->private)
+
+#define OCFS2_IOEND_WQ_HASH_SZ 37
+#define ocfs2_ioend_wq(v)   (&ocfs2__ioend_wq[((unsigned long)(v)) %\
+                                           OCFS2_IOEND_WQ_HASH_SZ])
+extern wait_queue_head_t ocfs2__ioend_wq[OCFS2_IOEND_WQ_HASH_SZ];
+
 #endif /* OCFS2_FILE_H */
 
        return ret;
 }
 
+static void ocfs2_aiodio_wait(struct inode *inode)
+{
+       wait_queue_head_t *wq = ocfs2_ioend_wq(inode);
+
+       wait_event(*wq, (atomic_read(&OCFS2_I(inode)->ip_unaligned_aio) == 0));
+}
+
+static int ocfs2_is_io_unaligned(struct inode *inode, size_t count, loff_t pos)
+{
+       int blockmask = inode->i_sb->s_blocksize - 1;
+       loff_t final_size = pos + count;
+
+       if ((pos & blockmask) || (final_size & blockmask))
+               return 1;
+       return 0;
+}
+
 static int ocfs2_prepare_inode_for_refcount(struct inode *inode,
                                            struct file *file,
                                            loff_t pos, size_t count,
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
        int full_coherency = !(osb->s_mount_opt &
                               OCFS2_MOUNT_COHERENCY_BUFFERED);
+       int unaligned_dio = 0;
 
        trace_ocfs2_file_aio_write(inode, file, file->f_path.dentry,
                (unsigned long long)OCFS2_I(inode)->ip_blkno,
                goto out;
        }
 
+       if (direct_io && !is_sync_kiocb(iocb))
+               unaligned_dio = ocfs2_is_io_unaligned(inode, iocb->ki_left,
+                                                     *ppos);
+
        /*
         * We can't complete the direct I/O as requested, fall back to
         * buffered I/O.
                goto relock;
        }
 
+       if (unaligned_dio) {
+               /*
+                * Wait on previous unaligned aio to complete before
+                * proceeding.
+                */
+               ocfs2_aiodio_wait(inode);
+
+               /* Mark the iocb as needing a decrement in ocfs2_dio_end_io */
+               atomic_inc(&OCFS2_I(inode)->ip_unaligned_aio);
+               ocfs2_iocb_set_unaligned_aio(iocb);
+       }
+
        /*
         * To later detect whether a journal commit for sync writes is
         * necessary, we sample i_size, and cluster count here.
        if ((ret == -EIOCBQUEUED) || (!ocfs2_iocb_is_rw_locked(iocb))) {
                rw_level = -1;
                have_alloc_sem = 0;
+               unaligned_dio = 0;
        }
 
+       if (unaligned_dio)
+               atomic_dec(&OCFS2_I(inode)->ip_unaligned_aio);
+
 out:
        if (rw_level != -1)
                ocfs2_rw_unlock(inode, rw_level);
 
        /* protects extended attribute changes on this inode */
        struct rw_semaphore             ip_xattr_sem;
 
+       /* Number of outstanding AIO's which are not page aligned */
+       atomic_t                        ip_unaligned_aio;
+
        /* These fields are protected by ip_lock */
        spinlock_t                      ip_lock;
        u32                             ip_open_count;
 
 #include "ocfs1_fs_compat.h"
 
 #include "alloc.h"
+#include "aops.h"
 #include "blockcheck.h"
 #include "dlmglue.h"
 #include "export.h"
        return 0;
 }
 
+wait_queue_head_t ocfs2__ioend_wq[OCFS2_IOEND_WQ_HASH_SZ];
+
 static int __init ocfs2_init(void)
 {
-       int status;
+       int status, i;
 
        ocfs2_print_version();
 
+       for (i = 0; i < OCFS2_IOEND_WQ_HASH_SZ; i++)
+               init_waitqueue_head(&ocfs2__ioend_wq[i]);
+
        status = init_ocfs2_uptodate_cache();
        if (status < 0) {
                mlog_errno(status);
        ocfs2_extent_map_init(&oi->vfs_inode);
        INIT_LIST_HEAD(&oi->ip_io_markers);
        oi->ip_dir_start_lookup = 0;
-
+       atomic_set(&oi->ip_unaligned_aio, 0);
        init_rwsem(&oi->ip_alloc_sem);
        init_rwsem(&oi->ip_xattr_sem);
        mutex_init(&oi->ip_io_mutex);