 unsigned int    nr_bvecs;
 };
 
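+/*
+ * Batch of buffered reads or writes that have been punted to async
+ * context. Requests that are sequential to one already on the list can
+ * be appended to it instead of queueing another work item.
+ */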
+struct async_list {
+       spinlock_t              lock;
+       atomic_t                cnt;
+       struct list_head        list;
+
+       struct file             *file;
+       off_t                   io_end;
+       size_t                  io_pages;
+};
+
 struct io_ring_ctx {
        struct {
                struct percpu_ref       refs;
                struct list_head        cancel_list;
        } ____cacheline_aligned_in_smp;
 
+       struct async_list       pending_async[2];
+
 #if defined(CONFIG_UNIX)
        struct socket           *ring_sock;
 #endif
 #define REQ_F_FORCE_NONBLOCK   1       /* inline submission attempt */
 #define REQ_F_IOPOLL_COMPLETED 2       /* polled IO has completed */
 #define REQ_F_FIXED_FILE       4       /* ctx owns file */
+#define REQ_F_SEQ_PREV         8       /* sequential with previous */
        u64                     user_data;
        u64                     error;
 
 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 {
        struct io_ring_ctx *ctx;
+       int i;
 
        ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
        if (!ctx)
        init_completion(&ctx->ctx_done);
        mutex_init(&ctx->uring_lock);
        init_waitqueue_head(&ctx->wait);
+       for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
+               spin_lock_init(&ctx->pending_async[i].lock);
+               INIT_LIST_HEAD(&ctx->pending_async[i].list);
+               atomic_set(&ctx->pending_async[i].cnt, 0);
+       }
        spin_lock_init(&ctx->completion_lock);
        INIT_LIST_HEAD(&ctx->poll_list);
        INIT_LIST_HEAD(&ctx->cancel_list);
        return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
 }
 
+/*
+ * Make a note of the last file/offset/direction we punted to async
+ * context. We'll use this information to see if we can piggy back a
+ * sequential request onto the previous one, if it still hasn't been
+ * completed by the async worker.
+ */
+static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
+{
+       struct async_list *async_list = &req->ctx->pending_async[rw];
+       struct kiocb *kiocb = &req->rw;
+       struct file *filp = kiocb->ki_filp;
+       off_t io_end = kiocb->ki_pos + len;
+
+       if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
+               unsigned long max_pages;
+
+               /* Use 8x RA size as a decent limiter for both reads/writes */
+               max_pages = filp->f_ra.ra_pages;
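+               /*
+                * ra_pages can be 0; fall back to the system default,
+                * converting VM_MAX_READAHEAD from KiB to pages.
+                */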
+               if (!max_pages)
+                       max_pages = VM_MAX_READAHEAD >> (PAGE_SHIFT - 10);
+               max_pages *= 8;
+
+               /* If max pages are exceeded, reset the state */
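+               /* len is in bytes, shift it down to pages */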
+               len >>= PAGE_SHIFT;
+               if (async_list->io_pages + len <= max_pages) {
+                       req->flags |= REQ_F_SEQ_PREV;
+                       async_list->io_pages += len;
+               } else {
+                       io_end = 0;
+                       async_list->io_pages = 0;
+               }
+       }
+
+       /* New file? Reset state. */
+       if (async_list->file != filp) {
+               async_list->io_pages = 0;
+               async_list->file = filp;
+       }
+       async_list->io_end = io_end;
+}
+
 static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
                       bool force_nonblock, struct io_submit_state *state)
 {
        struct kiocb *kiocb = &req->rw;
        struct iov_iter iter;
        struct file *file;
+       size_t iov_count;
        ssize_t ret;
 
        ret = io_prep_rw(req, s, force_nonblock, state);
        if (ret)
                goto out_fput;
 
-       ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_iter_count(&iter));
+       iov_count = iov_iter_count(&iter);
+       ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
        if (!ret) {
                ssize_t ret2;
 
                /* Catch -EAGAIN return for forced non-blocking submission */
                ret2 = call_read_iter(file, kiocb, &iter);
-               if (!force_nonblock || ret2 != -EAGAIN)
+               if (!force_nonblock || ret2 != -EAGAIN) {
                        io_rw_done(kiocb, ret2);
-               else
+               } else {
+                       /*
+                        * If ->needs_lock is true, we're already in async
+                        * context.
+                        */
+                       if (!s->needs_lock)
+                               io_async_list_note(READ, req, iov_count);
                        ret = -EAGAIN;
+               }
        }
        kfree(iovec);
 out_fput:
        struct kiocb *kiocb = &req->rw;
        struct iov_iter iter;
        struct file *file;
+       size_t iov_count;
        ssize_t ret;
 
        ret = io_prep_rw(req, s, force_nonblock, state);
        if (ret)
                return ret;
-       /* Hold on to the file for -EAGAIN */
-       if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
-               return -EAGAIN;
 
        ret = -EBADF;
        file = kiocb->ki_filp;
        if (ret)
                goto out_fput;
 
-       ret = rw_verify_area(WRITE, file, &kiocb->ki_pos,
-                               iov_iter_count(&iter));
+       iov_count = iov_iter_count(&iter);
+
+       ret = -EAGAIN;
+       if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
+               /* If ->needs_lock is true, we're already in async context. */
+               if (!s->needs_lock)
+                       io_async_list_note(WRITE, req, iov_count);
+               goto out_free;
+       }
+
+       ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
        if (!ret) {
                /*
                 * Open-code file_start_write here to grab freeze protection,
                kiocb->ki_flags |= IOCB_WRITE;
                io_rw_done(kiocb, call_write_iter(file, kiocb, &iter));
        }
+out_free:
        kfree(iovec);
 out_fput:
-       if (unlikely(ret))
+       /* Hold on to the file for -EAGAIN */
+       if (unlikely(ret && ret != -EAGAIN))
                io_fput(req);
        return ret;
 }
        return 0;
 }
 
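+/*
+ * Map an sqe to the pending_async list (READ or WRITE) that its punted
+ * work would be added to. Opcodes that aren't batched return NULL.
+ */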
+static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
+                                                const struct io_uring_sqe *sqe)
+{
+       switch (sqe->opcode) {
+       case IORING_OP_READV:
+       case IORING_OP_READ_FIXED:
+               return &ctx->pending_async[READ];
+       case IORING_OP_WRITEV:
+       case IORING_OP_WRITE_FIXED:
+               return &ctx->pending_async[WRITE];
+       default:
+               return NULL;
+       }
+}
+
 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
 {
        u8 opcode = READ_ONCE(sqe->opcode);
 static void io_sq_wq_submit_work(struct work_struct *work)
 {
        struct io_kiocb *req = container_of(work, struct io_kiocb, work);
-       struct sqe_submit *s = &req->submit;
-       const struct io_uring_sqe *sqe = s->sqe;
        struct io_ring_ctx *ctx = req->ctx;
+       struct mm_struct *cur_mm = NULL;
+       struct async_list *async_list;
+       LIST_HEAD(req_list);
        mm_segment_t old_fs;
-       bool needs_user;
        int ret;
 
-        /* Ensure we clear previously set forced non-block flag */
-       req->flags &= ~REQ_F_FORCE_NONBLOCK;
-       req->rw.ki_flags &= ~IOCB_NOWAIT;
+       async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
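+       /*
+        * Submit the original request, then keep draining requests that
+        * other submitters appended to our async list while we were busy.
+        */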
+restart:
+       do {
+               struct sqe_submit *s = &req->submit;
+               const struct io_uring_sqe *sqe = s->sqe;
+
+               /* Ensure we clear previously set forced non-block flag */
+               req->flags &= ~REQ_F_FORCE_NONBLOCK;
+               req->rw.ki_flags &= ~IOCB_NOWAIT;
+
+               ret = 0;
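+               /*
+                * Requests that touch user memory need the submitter's mm;
+                * grab it once and keep it for the rest of the batch.
+                */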
+               if (io_sqe_needs_user(sqe) && !cur_mm) {
+                       if (!mmget_not_zero(ctx->sqo_mm)) {
+                               ret = -EFAULT;
+                       } else {
+                               cur_mm = ctx->sqo_mm;
+                               use_mm(cur_mm);
+                               old_fs = get_fs();
+                               set_fs(USER_DS);
+                       }
+               }
+
+               if (!ret) {
+                       s->has_user = cur_mm != NULL;
+                       s->needs_lock = true;
+                       do {
+                               ret = __io_submit_sqe(ctx, req, s, false, NULL);
+                               /*
+                                * We can get EAGAIN for polled IO even though
+                                * we're forcing a sync submission from here,
+                                * since we can't wait for request slots on the
+                                * block side.
+                                */
+                               if (ret != -EAGAIN)
+                                       break;
+                               cond_resched();
+                       } while (1);
+               }
+               if (ret) {
+                       io_cqring_add_event(ctx, sqe->user_data, ret, 0);
+                       io_free_req(req);
+               }
 
-       s->needs_lock = true;
-       s->has_user = false;
+               /* async context always uses a copy of the sqe */
+               kfree(sqe);
+
+               if (!async_list)
+                       break;
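+               /* Run anything already spliced into our local batch first */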
+               if (!list_empty(&req_list)) {
+                       req = list_first_entry(&req_list, struct io_kiocb,
+                                               list);
+                       list_del(&req->list);
+                       continue;
+               }
+               if (list_empty(&async_list->list))
+                       break;
+
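+               /*
+                * Local batch is empty; grab the shared list under the
+                * lock, re-checking that it's still non-empty first.
+                */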
+               req = NULL;
+               spin_lock(&async_list->lock);
+               if (list_empty(&async_list->list)) {
+                       spin_unlock(&async_list->lock);
+                       break;
+               }
+               list_splice_init(&async_list->list, &req_list);
+               spin_unlock(&async_list->lock);
+
+               req = list_first_entry(&req_list, struct io_kiocb, list);
+               list_del(&req->list);
+       } while (req);
 
        /*
-        * If we're doing IO to fixed buffers, we don't need to get/set
-        * user context
+        * Rare case of racing with a submitter. If we find the count has
+        * dropped to zero AND we have pending work items, then restart
+        * the processing. This is a tiny race window.
         */
-       needs_user = io_sqe_needs_user(s->sqe);
-       if (needs_user) {
-               if (!mmget_not_zero(ctx->sqo_mm)) {
-                       ret = -EFAULT;
-                       goto err;
+       if (async_list) {
+               ret = atomic_dec_return(&async_list->cnt);
+               while (!ret && !list_empty(&async_list->list)) {
+                       spin_lock(&async_list->lock);
+                       atomic_inc(&async_list->cnt);
+                       list_splice_init(&async_list->list, &req_list);
+                       spin_unlock(&async_list->lock);
+
+                       if (!list_empty(&req_list)) {
+                               req = list_first_entry(&req_list,
+                                                       struct io_kiocb, list);
+                               list_del(&req->list);
+                               goto restart;
+                       }
+                       ret = atomic_dec_return(&async_list->cnt);
                }
-               use_mm(ctx->sqo_mm);
-               old_fs = get_fs();
-               set_fs(USER_DS);
-               s->has_user = true;
        }
 
-       do {
-               ret = __io_submit_sqe(ctx, req, s, false, NULL);
-               /*
-                * We can get EAGAIN for polled IO even though we're forcing
-                * a sync submission from here, since we can't wait for
-                * request slots on the block side.
-                */
-               if (ret != -EAGAIN)
-                       break;
-               cond_resched();
-       } while (1);
-
-       if (needs_user) {
+       if (cur_mm) {
                set_fs(old_fs);
-               unuse_mm(ctx->sqo_mm);
-               mmput(ctx->sqo_mm);
-       }
-err:
-       if (ret) {
-               io_cqring_add_event(ctx, sqe->user_data, ret, 0);
-               io_free_req(req);
+               unuse_mm(cur_mm);
+               mmput(cur_mm);
        }
+}
 
-       /* async context always use a copy of the sqe */
-       kfree(sqe);
+/*
+ * See if we can piggy back onto previously submitted work that is still
+ * running. We currently only allow this if the new request is sequential
+ * to the previous one we punted.
+ */
+static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
+{
+       bool ret = false;
+
+       if (!list)
+               return false;
+       if (!(req->flags & REQ_F_SEQ_PREV))
+               return false;
+       if (!atomic_read(&list->cnt))
+               return false;
+
+       ret = true;
+       spin_lock(&list->lock);
+       list_add_tail(&req->list, &list->list);
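+       /*
+        * The worker may have dropped cnt to zero and exited between our
+        * check above and the list_add; if so, take the request back off
+        * and have the caller queue it as normal work instead.
+        */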
+       if (!atomic_read(&list->cnt)) {
+               list_del_init(&req->list);
+               ret = false;
+       }
+       spin_unlock(&list->lock);
+       return ret;
 }
 
 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
 
                sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
                if (sqe_copy) {
+                       struct async_list *list;
+
                        memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
                        s->sqe = sqe_copy;
 
                        memcpy(&req->submit, s, sizeof(*s));
-                       INIT_WORK(&req->work, io_sq_wq_submit_work);
-                       queue_work(ctx->sqo_wq, &req->work);
+                       list = io_async_list_from_sqe(ctx, s->sqe);
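+                       /*
+                        * Append to an in-flight work item if we can;
+                        * otherwise bump the list count and queue a new
+                        * work item ourselves.
+                        */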
+                       if (!io_add_to_prev_work(list, req)) {
+                               if (list)
+                                       atomic_inc(&list->cnt);
+                               INIT_WORK(&req->work, io_sq_wq_submit_work);
+                               queue_work(ctx->sqo_wq, &req->work);
+                       }
                        ret = 0;
                }
        }