*/
        int (*cpo_is_under_lock)(const struct lu_env *env,
                                 const struct cl_page_slice *slice,
-                                struct cl_io *io);
+                                struct cl_io *io, pgoff_t *max);
 
        /**
         * Optional debugging helper. Prints given page slice.
 }
 
 void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
-                      struct cl_object *obj,
+                      struct cl_object *obj, pgoff_t index,
                       const struct cl_page_operations *ops);
 void cl_lock_slice_add(struct cl_lock *lock, struct cl_lock_slice *slice,
                       struct cl_object *obj,
 int cl_page_is_vmlocked(const struct lu_env *env, const struct cl_page *pg);
 void cl_page_export(const struct lu_env *env, struct cl_page *pg, int uptodate);
 int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
-                         struct cl_page *page);
+                         struct cl_page *page, pgoff_t *max_index);
 loff_t cl_offset(const struct cl_object *obj, pgoff_t idx);
 pgoff_t cl_index(const struct cl_object *obj, loff_t offset);
 int cl_page_size(const struct cl_object *obj);
 
                  const struct cl_lock_operations *lkops);
 int ccc_object_glimpse(const struct lu_env *env,
                       const struct cl_object *obj, struct ost_lvb *lvb);
-int ccc_page_is_under_lock(const struct lu_env *env,
-                          const struct cl_page_slice *slice, struct cl_io *io);
 int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice);
 int ccc_transient_page_prep(const struct lu_env *env,
                            const struct cl_page_slice *slice,
 
  *
  */
 
-int ccc_page_is_under_lock(const struct lu_env *env,
-                          const struct cl_page_slice *slice,
-                          struct cl_io *io)
-{
-       struct ccc_io   *cio  = ccc_env_io(env);
-       struct cl_lock_descr *desc = &ccc_env_info(env)->cti_descr;
-       struct cl_page       *page = slice->cpl_page;
-
-       int result;
-
-       if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
-           io->ci_type == CIT_FAULT) {
-               if (cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED) {
-                       result = -EBUSY;
-               } else {
-                       desc->cld_start = ccc_index(cl2ccc_page(slice));
-                       desc->cld_end   = ccc_index(cl2ccc_page(slice));
-                       desc->cld_obj   = page->cp_obj;
-                       desc->cld_mode  = CLM_READ;
-                       result = cl_queue_match(&io->ci_lockset.cls_done,
-                                               desc) ? -EBUSY : 0;
-               }
-       } else {
-               result = 0;
-       }
-       return result;
-}
-
 int ccc_fail(const struct lu_env *env, const struct cl_page_slice *slice)
 {
        /*
 
        RA_STAT_EOF,
        RA_STAT_MAX_IN_FLIGHT,
        RA_STAT_WRONG_GRAB_PAGE,
+       RA_STAT_FAILED_REACH_END,
        _NR_RA_STAT,
 };
 
 int ll_readpage(struct file *file, struct page *page);
 void ll_readahead_init(struct inode *inode, struct ll_readahead_state *ras);
 int ll_readahead(const struct lu_env *env, struct cl_io *io,
-                struct ll_readahead_state *ras, struct address_space *mapping,
-                struct cl_page_list *queue, int flags);
+                struct cl_page_list *queue, struct ll_readahead_state *ras,
+                bool hit);
 int vvp_io_write_commit(const struct lu_env *env, struct cl_io *io);
 struct ll_cl_context *ll_cl_init(struct file *file, struct page *vmpage);
 void ll_cl_fini(struct ll_cl_context *lcc);
                struct ll_readahead_state *ras, unsigned long index,
                unsigned hit);
 void ll_ra_count_put(struct ll_sb_info *sbi, unsigned long len);
-void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which);
+void ll_ra_stats_inc(struct inode *inode, enum ra_stat which);
 
 /* llite/llite_rmtacl.c */
 #ifdef CONFIG_FS_POSIX_ACL
 
        [RA_STAT_EOF] = "read-ahead to EOF",
        [RA_STAT_MAX_IN_FLIGHT] = "hit max r-a issue",
        [RA_STAT_WRONG_GRAB_PAGE] = "wrong page from grab_cache_page",
+       [RA_STAT_FAILED_REACH_END] = "failed to reach end",
 };
 
 int ldebugfs_register_mountpoint(struct dentry *parent,
 
  */
 static unsigned long ll_ra_count_get(struct ll_sb_info *sbi,
                                     struct ra_io_arg *ria,
-                                    unsigned long pages)
+                                    unsigned long pages, unsigned long min)
 {
        struct ll_ra_info *ra = &sbi->ll_ra_info;
        long ret;
        }
 
 out:
+       if (ret < min) {
+               /* exceed the RA limit so the current read is fully covered */
+               atomic_add(min - ret, &ra->ra_cur_pages);
+               ret = min;
+       }
        return ret;
 }
 
        lprocfs_counter_incr(sbi->ll_ra_stats, which);
 }
 
-void ll_ra_stats_inc(struct address_space *mapping, enum ra_stat which)
+void ll_ra_stats_inc(struct inode *inode, enum ra_stat which)
 {
-       struct ll_sb_info *sbi = ll_i2sbi(mapping->host);
+       struct ll_sb_info *sbi = ll_i2sbi(inode);
 
        ll_ra_stats_inc_sbi(sbi, which);
 }
 
 static int cl_read_ahead_page(const struct lu_env *env, struct cl_io *io,
                              struct cl_page_list *queue, struct cl_page *page,
-                             struct cl_object *clob)
+                             struct cl_object *clob, pgoff_t *max_index)
 {
        struct page *vmpage = page->cp_vmpage;
        struct ccc_page *cp;
        lu_ref_add(&page->cp_reference, "ra", current);
        cp = cl2ccc_page(cl_object_page_slice(clob, page));
        if (!cp->cpg_defer_uptodate && !PageUptodate(vmpage)) {
-               rc = cl_page_is_under_lock(env, io, page);
-               if (rc == -EBUSY) {
+               CDEBUG(D_READA, "page index %lu, max_index: %lu\n",
+                      ccc_index(cp), *max_index);
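+               /* *max_index caches the end of the lock found by the
+                * previous lookup; only re-check when this page falls
+                * past the range already known to be covered
+                */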
+               if (*max_index == 0 || ccc_index(cp) > *max_index)
+                       rc = cl_page_is_under_lock(env, io, page, max_index);
+               if (rc == 0) {
                        cp->cpg_defer_uptodate = 1;
                        cp->cpg_ra_used = 0;
                        cl_page_list_add(queue, page);
  */
 static int ll_read_ahead_page(const struct lu_env *env, struct cl_io *io,
                              struct cl_page_list *queue,
-                             pgoff_t index, struct address_space *mapping)
+                             pgoff_t index, pgoff_t *max_index)
 {
+       struct cl_object *clob  = io->ci_obj;
+       struct inode     *inode = ccc_object_inode(clob);
        struct page      *vmpage;
-       struct cl_object *clob  = ll_i2info(mapping->host)->lli_clob;
        struct cl_page   *page;
        enum ra_stat      which = _NR_RA_STAT; /* keep gcc happy */
        int            rc    = 0;
        const char       *msg   = NULL;
 
-       vmpage = grab_cache_page_nowait(mapping, index);
+       vmpage = grab_cache_page_nowait(inode->i_mapping, index);
        if (vmpage) {
                /* Check if vmpage was truncated or reclaimed */
-               if (vmpage->mapping == mapping) {
+               if (vmpage->mapping == inode->i_mapping) {
                        page = cl_page_find(env, clob, vmpage->index,
                                            vmpage, CPT_CACHEABLE);
                        if (!IS_ERR(page)) {
                                rc = cl_read_ahead_page(env, io, queue,
-                                                       page, clob);
+                                                       page, clob, max_index);
                                if (rc == -ENOLCK) {
                                        which = RA_STAT_FAILED_MATCH;
                                        msg   = "lock match failed";
                msg   = "g_c_p_n failed";
        }
        if (msg) {
-               ll_ra_stats_inc(mapping, which);
+               ll_ra_stats_inc(inode, which);
                CDEBUG(D_READA, "%s\n", msg);
        }
        return rc;
                               struct cl_io *io, struct cl_page_list *queue,
                               struct ra_io_arg *ria,
                               unsigned long *reserved_pages,
-                              struct address_space *mapping,
                               unsigned long *ra_end)
 {
-       int rc, count = 0, stride_ria;
-       unsigned long page_idx;
+       int rc, count = 0;
+       bool stride_ria;
+       pgoff_t page_idx;
+       pgoff_t max_index = 0;
 
        LASSERT(ria);
        RIA_DEBUG(ria);
                if (ras_inside_ra_window(page_idx, ria)) {
                        /* If the page is inside the read-ahead window*/
                        rc = ll_read_ahead_page(env, io, queue,
-                                               page_idx, mapping);
+                                               page_idx, &max_index);
                        if (rc == 1) {
                                (*reserved_pages)--;
                                count++;
 }
 
 int ll_readahead(const struct lu_env *env, struct cl_io *io,
-                struct ll_readahead_state *ras, struct address_space *mapping,
-                struct cl_page_list *queue, int flags)
+                struct cl_page_list *queue, struct ll_readahead_state *ras,
+                bool hit)
 {
        struct vvp_io *vio = vvp_env_io(env);
        struct vvp_thread_info *vti = vvp_env_info(env);
        struct cl_attr *attr = ccc_env_thread_attr(env);
        unsigned long start = 0, end = 0, reserved;
-       unsigned long ra_end, len;
+       unsigned long ra_end, len, mlen = 0;
        struct inode *inode;
        struct ll_ra_read *bead;
        struct ra_io_arg *ria = &vti->vti_ria;
-       struct ll_inode_info *lli;
        struct cl_object *clob;
        int ret = 0;
        __u64 kms;
 
-       inode = mapping->host;
-       lli = ll_i2info(inode);
-       clob = lli->lli_clob;
+       clob = io->ci_obj;
+       inode = ccc_object_inode(clob);
 
        memset(ria, 0, sizeof(*ria));
 
                return ret;
        kms = attr->cat_kms;
        if (kms == 0) {
-               ll_ra_stats_inc(mapping, RA_STAT_ZERO_LEN);
+               ll_ra_stats_inc(inode, RA_STAT_ZERO_LEN);
                return 0;
        }
 
        spin_unlock(&ras->ras_lock);
 
        if (end == 0) {
-               ll_ra_stats_inc(mapping, RA_STAT_ZERO_WINDOW);
+               ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
                return 0;
        }
        len = ria_page_count(ria);
-       if (len == 0)
+       if (len == 0) {
+               ll_ra_stats_inc(inode, RA_STAT_ZERO_WINDOW);
                return 0;
+       }
+
+       CDEBUG(D_READA, DFID ": ria: %lu/%lu, bead: %lu/%lu, hit: %d\n",
+              PFID(lu_object_fid(&clob->co_lu)),
+              ria->ria_start, ria->ria_end,
+              !bead ? 0 : bead->lrr_start,
+              !bead ? 0 : bead->lrr_count,
+              hit);
+
+       /* at least extend the read-ahead window to cover the current read */
+       if (!hit && bead &&
+           bead->lrr_start + bead->lrr_count > ria->ria_start) {
+               /* extend to the end of the current read window */
+               mlen = bead->lrr_start + bead->lrr_count - ria->ria_start;
+               /* trim to RPC boundary */
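+               /* e.g. assuming 1 MiB RPCs and 4 KiB pages
+                * (PTLRPC_MAX_BRW_PAGES == 256): a missed read covering
+                * pages [240, 600) with ria_start == 250 gives
+                * mlen == 350, but only 256 - (250 & 255) == 6 pages
+                * remain in this RPC, so mlen is trimmed to 6
+                */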
+               start = ria->ria_start & (PTLRPC_MAX_BRW_PAGES - 1);
+               mlen = min(mlen, PTLRPC_MAX_BRW_PAGES - start);
+       }
 
-       reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len);
+       reserved = ll_ra_count_get(ll_i2sbi(inode), ria, len, mlen);
        if (reserved < len)
-               ll_ra_stats_inc(mapping, RA_STAT_MAX_IN_FLIGHT);
+               ll_ra_stats_inc(inode, RA_STAT_MAX_IN_FLIGHT);
 
-       CDEBUG(D_READA, "reserved page %lu ra_cur %d ra_max %lu\n", reserved,
+       CDEBUG(D_READA, "reserved pages %lu/%lu/%lu, ra_cur %d, ra_max %lu\n",
+              reserved, len, mlen,
               atomic_read(&ll_i2sbi(inode)->ll_ra_info.ra_cur_pages),
               ll_i2sbi(inode)->ll_ra_info.ra_max_pages);
 
-       ret = ll_read_ahead_pages(env, io, queue,
-                                 ria, &reserved, mapping, &ra_end);
+       ret = ll_read_ahead_pages(env, io, queue, ria, &reserved, &ra_end);
 
        if (reserved != 0)
                ll_ra_count_put(ll_i2sbi(inode), reserved);
 
        if (ra_end == end + 1 && ra_end == (kms >> PAGE_CACHE_SHIFT))
-               ll_ra_stats_inc(mapping, RA_STAT_EOF);
+               ll_ra_stats_inc(inode, RA_STAT_EOF);
 
        /* if we didn't get to the end of the region we reserved from
         * the ras we need to go back and update the ras so that the
               ra_end, end, ria->ria_end);
 
        if (ra_end != end + 1) {
+               ll_ra_stats_inc(inode, RA_STAT_FAILED_REACH_END);
                spin_lock(&ras->ras_lock);
                if (ra_end < ras->ras_next_readahead &&
                    index_in_window(ra_end, ras->ras_window_start, 0,
        ras->ras_last_readpage = index;
        ras_set_start(inode, ras, index);
 
-       if (stride_io_mode(ras))
+       if (stride_io_mode(ras)) {
                /* Stride read-ahead is sensitive to the read offset, so
                 * we use the original offset here instead of
                 * ras_window_start, which is RPC aligned.
                 */
                ras->ras_next_readahead = max(index, ras->ras_next_readahead);
-       else
-               ras->ras_next_readahead = max(ras->ras_window_start,
-                                             ras->ras_next_readahead);
+       } else {
+               if (ras->ras_next_readahead < ras->ras_window_start)
+                       ras->ras_next_readahead = ras->ras_window_start;
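+               /* a miss restarts read-ahead right after this page */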
+               if (!hit)
+                       ras->ras_next_readahead = index + 1;
+       }
        RAS_CDEBUG(ras);
 
        /* Trigger RA in the mmap case where ras_consecutive_requests
 
                            const struct cl_page_slice *slice)
 {
        struct cl_io          *io     = ios->cis_io;
-       struct cl_object          *obj    = slice->cpl_obj;
        struct ccc_page    *cp     = cl2ccc_page(slice);
        struct cl_page      *page   = slice->cpl_page;
-       struct inode          *inode  = ccc_object_inode(obj);
+       struct inode          *inode  = ccc_object_inode(slice->cpl_obj);
        struct ll_sb_info        *sbi    = ll_i2sbi(inode);
        struct ll_file_data       *fd     = cl2ccc_io(env, ios)->cui_fd;
        struct ll_readahead_state *ras    = &fd->fd_ras;
-       struct page             *vmpage = cp->cpg_page;
        struct cl_2queue          *queue  = &io->ci_queue;
-       int rc;
-
-       CLOBINVRNT(env, obj, ccc_object_invariant(obj));
-       LASSERT(slice->cpl_obj == obj);
 
        if (sbi->ll_ra_info.ra_max_pages_per_file &&
            sbi->ll_ra_info.ra_max_pages)
                ras_update(sbi, inode, ras, ccc_index(cp),
                           cp->cpg_defer_uptodate);
 
-       /* Sanity check whether the page is protected by a lock. */
-       rc = cl_page_is_under_lock(env, io, page);
-       if (rc != -EBUSY) {
-               CL_PAGE_HEADER(D_WARNING, env, page, "%s: %d\n",
-                              rc == -ENODATA ? "without a lock" :
-                              "match failed", rc);
-               if (rc != -ENODATA)
-                       return rc;
-       }
-
        if (cp->cpg_defer_uptodate) {
                cp->cpg_ra_used = 1;
                cl_page_export(env, page, 1);
         * Add page into the queue even when it is marked uptodate above.
         * This will unlock it automatically as part of cl_page_list_disown().
         */
        cl_page_list_add(&queue->c2_qin, page);
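+       /* cpg_defer_uptodate means a previous read-ahead filled this
+        * page, i.e. the current read was a cache hit
+        */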
        if (sbi->ll_ra_info.ra_max_pages_per_file &&
            sbi->ll_ra_info.ra_max_pages)
-               ll_readahead(env, io, ras,
-                            vmpage->mapping, &queue->c2_qin, fd->fd_flags);
+               ll_readahead(env, io, &queue->c2_qin, ras,
+                            cp->cpg_defer_uptodate);
 
        return 0;
 }
 
        LASSERT(PageLocked(vmpage));
 
        if (cpg->cpg_defer_uptodate && !cpg->cpg_ra_used)
-               ll_ra_stats_inc(vmpage->mapping, RA_STAT_DISCARDED);
+               ll_ra_stats_inc(vmpage->mapping->host, RA_STAT_DISCARDED);
 
        ll_invalidate_page(vmpage);
 }
        return result;
 }
 
+static int vvp_page_is_under_lock(const struct lu_env *env,
+                                 const struct cl_page_slice *slice,
+                                 struct cl_io *io, pgoff_t *max_index)
+{
+       if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE ||
+           io->ci_type == CIT_FAULT) {
+               struct ccc_io *cio = ccc_env_io(env);
+
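+               /* a group lock covers the whole file, so every page
+                * is known to be under lock
+                */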
+               if (unlikely(cio->cui_fd->fd_flags & LL_FILE_GROUP_LOCKED))
+                       *max_index = CL_PAGE_EOF;
+       }
+       return 0;
+}
+
 static int vvp_page_print(const struct lu_env *env,
                          const struct cl_page_slice *slice,
                          void *cookie, lu_printer_t printer)
        .cpo_is_vmlocked   = vvp_page_is_vmlocked,
        .cpo_fini         = vvp_page_fini,
        .cpo_print       = vvp_page_print,
-       .cpo_is_under_lock = ccc_page_is_under_lock,
+       .cpo_is_under_lock = vvp_page_is_under_lock,
        .io = {
                [CRT_READ] = {
                        .cpo_prep       = vvp_page_prep_read,
        .cpo_fini         = vvp_transient_page_fini,
        .cpo_is_vmlocked   = vvp_transient_page_is_vmlocked,
        .cpo_print       = vvp_page_print,
-       .cpo_is_under_lock = ccc_page_is_under_lock,
+       .cpo_is_under_lock      = vvp_page_is_under_lock,
        .io = {
                [CRT_READ] = {
                        .cpo_prep       = ccc_transient_page_prep,
 
        CLOBINVRNT(env, obj, ccc_object_invariant(obj));
 
-       cpg->cpg_cl.cpl_index = index;
        cpg->cpg_page = vmpage;
        page_cache_get(vmpage);
 
                atomic_inc(&page->cp_ref);
                SetPagePrivate(vmpage);
                vmpage->private = (unsigned long)page;
-               cl_page_slice_add(page, &cpg->cpg_cl, obj, &vvp_page_ops);
+               cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
+                                 &vvp_page_ops);
        } else {
                struct ccc_object *clobj = cl2ccc(obj);
 
                LASSERT(!inode_trylock(clobj->cob_inode));
-               cl_page_slice_add(page, &cpg->cpg_cl, obj,
+               cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
                                  &vvp_transient_page_ops);
                clobj->cob_transient_pages++;
        }
 
                                         struct lovsub_lock *sub);
 struct lov_io_sub *lov_page_subio(const struct lu_env *env, struct lov_io *lio,
                                  const struct cl_page_slice *slice);
+int lov_page_stripe(const struct cl_page *page);
 
 #define lov_foreach_target(lov, var)               \
        for (var = 0; var < lov_targets_nr(lov); ++var)
 
                          u64 start, u64 end,
                          u64 *obd_start, u64 *obd_end);
 int lov_stripe_number(struct lov_stripe_md *lsm, u64 lov_off);
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
+                        int stripe);
 
 /* lov_qos.c */
 #define LOV_USES_ASSIGNED_STRIPE       0
 
  *
  */
 
-static int lov_page_stripe(const struct cl_page *page)
+int lov_page_stripe(const struct cl_page *page)
 {
        struct lovsub_object *subobj;
        const struct cl_page_slice *slice;
 
        return lov_size;
 }
 
+/**
+ * Compute the file-level page index from a stripe-level page offset.
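+ *
+ * For example, assuming two 1 MiB stripes and 4 KiB pages (256 pages
+ * per stripe): stripe 1 holds file chunks 1, 3, 5, ... so its
+ * stripe-level page 300 (chunk 1 within the stripe, offset 44) maps
+ * to file chunk 3, i.e. file-level page 3 * 256 + 44 = 812.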
+ */
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
+                        int stripe)
+{
+       loff_t offset;
+
+       offset = lov_stripe_size(lsm, stripe_index << PAGE_CACHE_SHIFT,
+                                stripe);
+       return offset >> PAGE_CACHE_SHIFT;
+}
+
 /* we have an offset in file backed by an lov and want to find out where
  * that offset lands in our given stripe of the file.  for the easy
  * case where the offset is within the stripe, we just have to scale the
 
  * Lov page operations.
  *
  */
-static int lov_page_print(const struct lu_env *env,
-                         const struct cl_page_slice *slice,
-                         void *cookie, lu_printer_t printer)
+
+/**
+ * Adjust @max_index according to the raid0 layout. @max_index is the
+ * maximum page index covered by an underlying DLM lock.
+ * This function converts @max_index from stripe level to file level, and
+ * makes sure it does not extend beyond the current stripe.
+ */
+static int lov_raid0_page_is_under_lock(const struct lu_env *env,
+                                       const struct cl_page_slice *slice,
+                                       struct cl_io *unused,
+                                       pgoff_t *max_index)
+{
+       struct lov_object *loo = cl2lov(slice->cpl_obj);
+       struct lov_layout_raid0 *r0 = lov_r0(loo);
+       pgoff_t index = *max_index;
+       unsigned int pps; /* pages per stripe */
+
+       CDEBUG(D_READA, "*max_index = %lu, nr = %d\n", index, r0->lo_nr);
+       if (index == 0) /* the page is not covered by any lock */
+               return 0;
+
+       if (r0->lo_nr == 1) /* single stripe file */
+               return 0;
+
+       /* max_index is stripe level, convert it into file level */
+       if (index != CL_PAGE_EOF) {
+               int stripeno = lov_page_stripe(slice->cpl_page);
+               *max_index = lov_stripe_pgoff(loo->lo_lsm, index, stripeno);
+       }
+
+       /* calculate the end of the current stripe */
+       pps = loo->lo_lsm->lsm_stripe_size >> PAGE_CACHE_SHIFT;
+       index = ((slice->cpl_index + pps) & ~(pps - 1)) - 1;
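+       /* e.g. for 1 MiB stripes and 4 KiB pages (pps == 256, a power
+        * of two, as the mask above requires): cpl_index == 1000 lies
+        * in stripe chunk [768, 1023], so index becomes 1023
+        */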
+
+       /* never exceed the end of the stripe */
+       *max_index = min_t(pgoff_t, *max_index, index);
+       return 0;
+}
+
+static int lov_raid0_page_print(const struct lu_env *env,
+                               const struct cl_page_slice *slice,
+                               void *cookie, lu_printer_t printer)
 {
        struct lov_page *lp = cl2lov_page(slice);
 
-       return (*printer)(env, cookie, LUSTRE_LOV_NAME"-page@%p\n", lp);
+       return (*printer)(env, cookie, LUSTRE_LOV_NAME "-page@%p, raid0\n", lp);
 }
 
-static const struct cl_page_operations lov_page_ops = {
-       .cpo_print  = lov_page_print
+static const struct cl_page_operations lov_raid0_page_ops = {
+       .cpo_is_under_lock = lov_raid0_page_is_under_lock,
+       .cpo_print  = lov_raid0_page_print
 };
 
 int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj,
        rc = lov_stripe_offset(loo->lo_lsm, offset, stripe, &suboff);
        LASSERT(rc == 0);
 
-       cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_page_ops);
+       cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_raid0_page_ops);
 
        sub = lov_sub_get(env, lio, stripe);
        if (IS_ERR(sub))
        return rc;
 }
 
-static int lov_page_empty_print(const struct lu_env *env,
+static int lov_empty_page_print(const struct lu_env *env,
                                const struct cl_page_slice *slice,
                                void *cookie, lu_printer_t printer)
 {
 }
 
 static const struct cl_page_operations lov_empty_page_ops = {
-       .cpo_print = lov_page_empty_print
+       .cpo_print = lov_empty_page_print
 };
 
 int lov_page_init_empty(const struct lu_env *env, struct cl_object *obj,
        struct lov_page *lpg = cl_object_page_slice(obj, page);
        void *addr;
 
-       cl_page_slice_add(page, &lpg->lps_cl, obj, &lov_empty_page_ops);
+       cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_empty_page_ops);
        addr = kmap(page->cp_vmpage);
        memset(addr, 0, cl_page_size(obj));
        kunmap(page->cp_vmpage);
 
 };
 
 int lovsub_page_init(const struct lu_env *env, struct cl_object *obj,
-                    struct cl_page *page, pgoff_t ind)
+                    struct cl_page *page, pgoff_t index)
 {
        struct lovsub_page *lsb = cl_object_page_slice(obj, page);
 
-       cl_page_slice_add(page, &lsb->lsb_cl, obj, &lovsub_page_ops);
+       cl_page_slice_add(page, &lsb->lsb_cl, obj, index, &lovsub_page_ops);
        return 0;
 }
 
 
                                break;
                }
        }
-       if (result == 0)
+       if (result == 0 && queue->c2_qin.pl_nr > 0)
                result = cl_io_submit_rw(env, io, CRT_READ, queue);
        /*
         * Unlock unsent pages in case of error.
 
        __result;                                                      \
 })
 
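+/*
+ * Bottom-to-top variant of CL_PAGE_INVOKE(): walks the page layers in
+ * reverse order (osc before lov before vvp), so that a method such as
+ * cpo_is_under_lock() can have the osc layer look up the covering DLM
+ * lock before the upper layers adjust the reported index. A positive
+ * return value stops the scan and is reported as success.
+ */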
+#define CL_PAGE_INVOKE_REVERSE(_env, _page, _op, _proto, ...)          \
+({                                                                     \
+       const struct lu_env        *__env  = (_env);                    \
+       struct cl_page             *__page = (_page);                   \
+       const struct cl_page_slice *__scan;                             \
+       int                         __result;                           \
+       ptrdiff_t                   __op   = (_op);                     \
+       int                       (*__method)_proto;                    \
+                                                                       \
+       __result = 0;                                                   \
+       list_for_each_entry_reverse(__scan, &__page->cp_layers,         \
+                                       cpl_linkage) {                  \
+               __method = *(void **)((char *)__scan->cpl_ops + __op); \
+               if (__method) {                                         \
+                       __result = (*__method)(__env, __scan, ## __VA_ARGS__); \
+                       if (__result != 0)                              \
+                               break;                                  \
+               }                                                       \
+       }                                                               \
+       if (__result > 0)                                               \
+               __result = 0;                                           \
+       __result;                                                       \
+})
+
 #define CL_PAGE_INVOID(_env, _page, _op, _proto, ...)             \
 do {                                                               \
        const struct lu_env     *__env  = (_env);                   \
  * \see cl_page_operations::cpo_is_under_lock()
  */
 int cl_page_is_under_lock(const struct lu_env *env, struct cl_io *io,
-                         struct cl_page *page)
+                         struct cl_page *page, pgoff_t *max_index)
 {
        int rc;
 
        PINVRNT(env, page, cl_page_invariant(page));
 
-       rc = CL_PAGE_INVOKE(env, page, CL_PAGE_OP(cpo_is_under_lock),
-                           (const struct lu_env *,
-                            const struct cl_page_slice *, struct cl_io *),
-                           io);
-       PASSERT(env, page, rc != 0);
+       rc = CL_PAGE_INVOKE_REVERSE(env, page, CL_PAGE_OP(cpo_is_under_lock),
+                                   (const struct lu_env *,
+                                    const struct cl_page_slice *,
+                                     struct cl_io *, pgoff_t *),
+                                   io, max_index);
        return rc;
 }
 EXPORT_SYMBOL(cl_page_is_under_lock);
  * \see cl_lock_slice_add(), cl_req_slice_add(), cl_io_slice_add()
  */
 void cl_page_slice_add(struct cl_page *page, struct cl_page_slice *slice,
-                      struct cl_object *obj,
+                      struct cl_object *obj, pgoff_t index,
                       const struct cl_page_operations *ops)
 {
        list_add_tail(&slice->cpl_linkage, &page->cp_layers);
        slice->cpl_obj  = obj;
+       slice->cpl_index = index;
        slice->cpl_ops  = ops;
        slice->cpl_page = page;
 }
 
 
        page_cache_get(page->cp_vmpage);
        mutex_init(&ep->ep_lock);
-       cl_page_slice_add(page, &ep->ep_cl, obj, &echo_page_ops);
+       cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
        atomic_inc(&eco->eo_npages);
        return 0;
 }
 
 
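+/*
+ * Look up a DLM lock covering this page at the osc (stripe) layer. If one
+ * is found, *max_index is set to the last stripe-level page index the
+ * lock covers; lov_raid0_page_is_under_lock() later converts that into a
+ * file-level index.
+ */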
 static int osc_page_is_under_lock(const struct lu_env *env,
                                  const struct cl_page_slice *slice,
-                                 struct cl_io *unused)
+                                 struct cl_io *unused, pgoff_t *max_index)
 {
        struct osc_page *opg = cl2osc_page(slice);
        struct cl_lock *lock;
        int result = -ENODATA;
 
+       *max_index = 0;
        lock = cl_lock_at_pgoff(env, slice->cpl_obj, osc_index(opg),
                                NULL, 1, 0);
        if (lock) {
+               *max_index = lock->cll_descr.cld_end;
                cl_lock_put(env, lock);
-               result = -EBUSY;
+               result = 0;
        }
        return result;
 }
 
        opg->ops_from = 0;
        opg->ops_to = PAGE_CACHE_SIZE;
-       opg->ops_cl.cpl_index = index;
 
        result = osc_prep_async_page(osc, opg, page->cp_vmpage,
                                     cl_offset(obj, index));
                struct osc_io *oio = osc_env_io(env);
 
                opg->ops_srvlock = osc_io_srvlock(oio);
-               cl_page_slice_add(page, &opg->ops_cl, obj, &osc_page_ops);
+               cl_page_slice_add(page, &opg->ops_cl, obj, index,
+                                 &osc_page_ops);
        }
        /*
         * Cannot assert osc_page_protected() here as read-ahead