}
 
 /*
- * for readdir, we encode the directory frag and offset within that
- * frag into f_pos.
+ * for f_pos for readdir:
+ * - hash order:
+ *     (0xff << 52) | ((24 bits hash) << 28) |
+ *     (the nth entry has hash collision);
+ * - frag+name order;
+ *     ((frag value) << 28) | (the nth entry in frag);
  */
+#define OFFSET_BITS    28
+#define OFFSET_MASK    ((1 << OFFSET_BITS) - 1)
+#define HASH_ORDER     (0xffull << (OFFSET_BITS + 24))
+loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order)
+{
+       loff_t fpos = ((loff_t)high << 28) | (loff_t)off;
+       if (hash_order)
+               fpos |= HASH_ORDER;
+       return fpos;
+}
+
+static bool is_hash_order(loff_t p)
+{
+       return (p & HASH_ORDER) == HASH_ORDER;
+}
+
 static unsigned fpos_frag(loff_t p)
 {
-       return p >> 32;
+       return p >> OFFSET_BITS;
 }
+
+static unsigned fpos_hash(loff_t p)
+{
+       return ceph_frag_value(fpos_frag(p));
+}
+
 static unsigned fpos_off(loff_t p)
 {
-       return p & 0xffffffff;
+       return p & OFFSET_MASK;
 }
 
 static int fpos_cmp(loff_t l, loff_t r)
        u64 idx = 0;
        int err = 0;
 
-       dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos);
+       dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos);
 
        /* search start position */
        if (ctx->pos > 2) {
                spin_unlock(&dentry->d_lock);
 
                if (emit_dentry) {
-                       dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos,
+                       dout(" %llx dentry %p %pd %p\n", di->offset,
                             dentry, dentry, d_inode(dentry));
                        ctx->pos = di->offset;
                        if (!dir_emit(ctx, dentry->d_name.name,
        return err;
 }
 
+static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos)
+{
+       if (!fi->last_readdir)
+               return true;
+       if (is_hash_order(pos))
+               return !ceph_frag_contains_value(fi->frag, fpos_hash(pos));
+       else
+               return fi->frag != fpos_frag(pos);
+}
+
 static int ceph_readdir(struct file *file, struct dir_context *ctx)
 {
        struct ceph_file_info *fi = file->private_data;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
-       unsigned frag = fpos_frag(ctx->pos);
        int i;
        int err;
        u32 ftype;
                err = __dcache_readdir(file, ctx, shared_gen);
                if (err != -EAGAIN)
                        return err;
-               frag = fpos_frag(ctx->pos);
        } else {
                spin_unlock(&ci->i_ceph_lock);
        }
        /* proceed with a normal readdir */
 more:
        /* do we have the correct frag content buffered? */
-       if (fi->frag != frag || fi->last_readdir == NULL) {
+       if (need_send_readdir(fi, ctx->pos)) {
                struct ceph_mds_request *req;
+               unsigned frag;
                int op = ceph_snap(inode) == CEPH_SNAPDIR ?
                        CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
                        fi->last_readdir = NULL;
                }
 
+               if (is_hash_order(ctx->pos)) {
+                       frag = ceph_choose_frag(ci, fpos_hash(ctx->pos),
+                                               NULL, NULL);
+               } else {
+                       frag = fpos_frag(ctx->pos);
+               }
+
                dout("readdir fetching %llx.%llx frag %x offset '%s'\n",
                     ceph_vinop(inode), frag, fi->last_name);
                req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
                        ceph_mdsc_put_request(req);
                        return err;
                }
-               dout("readdir got and parsed readdir result=%d"
-                    " on frag %x, end=%d, complete=%d\n", err, frag,
+               dout("readdir got and parsed readdir result=%d on "
+                    "frag %x, end=%d, complete=%d, hash_order=%d\n",
+                    err, frag,
                     (int)req->r_reply_info.dir_end,
-                    (int)req->r_reply_info.dir_complete);
-
+                    (int)req->r_reply_info.dir_complete,
+                    (int)req->r_reply_info.hash_order);
 
-               /* note next offset and last dentry name */
                rinfo = &req->r_reply_info;
                if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                        frag = le32_to_cpu(rinfo->dir_dir->frag);
-                       fi->next_offset = req->r_readdir_offset;
-                       /* adjust ctx->pos to beginning of frag */
-                       ctx->pos = ceph_make_fpos(frag, fi->next_offset);
+                       if (!rinfo->hash_order) {
+                               fi->next_offset = req->r_readdir_offset;
+                               /* adjust ctx->pos to beginning of frag */
+                               ctx->pos = ceph_make_fpos(frag,
+                                                         fi->next_offset,
+                                                         false);
+                       }
                }
 
                fi->frag = frag;
                        fi->dir_release_count = 0;
                }
 
-               if (req->r_reply_info.dir_end) {
-                       kfree(fi->last_name);
-                       fi->last_name = NULL;
-                       fi->next_offset = 2;
-               } else {
+               /* note next offset and last dentry name */
+               if (rinfo->dir_nr > 0) {
                        struct ceph_mds_reply_dir_entry *rde =
                                        rinfo->dir_entries + (rinfo->dir_nr-1);
+                       unsigned next_offset = req->r_reply_info.dir_end ?
+                                       2 : (fpos_off(rde->offset) + 1);
                        err = note_last_dentry(fi, rde->name, rde->name_len,
-                                              fpos_off(rde->offset) + 1);
+                                              next_offset);
                        if (err)
                                return err;
+               } else if (req->r_reply_info.dir_end) {
+                       fi->next_offset = 2;
+                       /* keep last name */
                }
        }
 
        rinfo = &fi->last_readdir->r_reply_info;
        dout("readdir frag %x num %d pos %llx chunk first %llx\n",
-            frag, rinfo->dir_nr, ctx->pos,
+            fi->frag, rinfo->dir_nr, ctx->pos,
             rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
 
        i = 0;
                ctx->pos++;
        }
 
-       if (fi->last_name) {
+       if (fi->next_offset > 2) {
                ceph_mdsc_put_request(fi->last_readdir);
                fi->last_readdir = NULL;
                goto more;
        }
 
        /* more frags? */
-       if (!ceph_frag_is_rightmost(frag)) {
-               frag = ceph_frag_next(frag);
-               ctx->pos = ceph_make_fpos(frag, 2);
+       if (!ceph_frag_is_rightmost(fi->frag)) {
+               unsigned frag = ceph_frag_next(fi->frag);
+               if (is_hash_order(ctx->pos)) {
+                       loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag),
+                                                       fi->next_offset, true);
+                       if (new_pos > ctx->pos)
+                               ctx->pos = new_pos;
+                       /* keep last_name */
+               } else {
+                       ctx->pos = ceph_make_fpos(frag, fi->next_offset, false);
+                       kfree(fi->last_name);
+                       fi->last_name = NULL;
+               }
                dout("readdir next frag is %x\n", frag);
                goto more;
        }
 static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
 {
        struct ceph_mds_reply_info_parsed *rinfo;
+       loff_t chunk_offset;
        if (new_pos == 0)
                return true;
-       if (fpos_frag(new_pos) != fi->frag)
+       if (is_hash_order(new_pos)) {
+               /* no need to reset last_name for a forward seek when
+                * dentries are sotred in hash order */
+       } else if (fi->frag |= fpos_frag(new_pos)) {
                return true;
+       }
        rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
        if (!rinfo || !rinfo->dir_nr)
                return true;
-       return new_pos < rinfo->dir_entries[0].offset;;
+       chunk_offset = rinfo->dir_entries[0].offset;
+       return new_pos < chunk_offset ||
+              is_hash_order(new_pos) != is_hash_order(chunk_offset);
 }
 
 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
        }
 
        if (offset >= 0) {
+               if (need_reset_readdir(fi, offset)) {
+                       dout("dir_llseek dropping %p content\n", file);
+                       reset_readdir(fi);
+               } else if (is_hash_order(offset) && offset > file->f_pos) {
+                       /* for hash offset, we don't know if a forward seek
+                        * is within same frag */
+                       fi->dir_release_count = 0;
+                       fi->readdir_cache_idx = -1;
+               }
+
                if (offset != file->f_pos) {
                        file->f_pos = offset;
                        file->f_version = 0;
                        fi->flags &= ~CEPH_F_ATEND;
                }
                retval = offset;
-
-               if (need_reset_readdir(fi, offset)) {
-                       dout("dir_llseek dropping %p content\n", file);
-                       reset_readdir(fi);
-               }
        }
 out:
        inode_unlock(inode);
 
                             struct ceph_mds_session *session)
 {
        struct dentry *parent = req->r_dentry;
+       struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
        struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;
        struct qstr dname;
        struct dentry *dn;
        int err = 0, skipped = 0, ret, i;
        struct inode *snapdir = NULL;
        struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
-       struct ceph_dentry_info *di;
        u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+       u32 last_hash = 0;
+       u32 fpos_offset;
        struct ceph_readdir_cache_control cache_ctl = {};
 
        if (req->r_aborted)
                return readdir_prepopulate_inodes_only(req, session);
 
+       if (rinfo->hash_order && req->r_path2) {
+               last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+                                         req->r_path2, strlen(req->r_path2));
+               last_hash = ceph_frag_value(last_hash);
+       }
+
        if (rinfo->dir_dir &&
            le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                dout("readdir_prepopulate got new frag %x -> %x\n",
                     frag, le32_to_cpu(rinfo->dir_dir->frag));
                frag = le32_to_cpu(rinfo->dir_dir->frag);
-               req->r_readdir_offset = 2;
+               if (!rinfo->hash_order)
+                       req->r_readdir_offset = 2;
        }
 
        if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) {
        if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {
                /* note dir version at start of readdir so we can tell
                 * if any dentries get dropped */
-               struct ceph_inode_info *ci = ceph_inode(d_inode(parent));
                req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);
                req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);
                req->r_readdir_cache_idx = 0;
        }
 
        cache_ctl.index = req->r_readdir_cache_idx;
+       fpos_offset = req->r_readdir_offset;
 
        /* FIXME: release caps/leases if error occurs */
        for (i = 0; i < rinfo->dir_nr; i++) {
                vino.ino = le64_to_cpu(rde->inode.in->ino);
                vino.snap = le64_to_cpu(rde->inode.in->snapid);
 
+               if (rinfo->hash_order) {
+                       u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
+                                                rde->name, rde->name_len);
+                       hash = ceph_frag_value(hash);
+                       if (hash != last_hash)
+                               fpos_offset = 2;
+                       last_hash = hash;
+                       rde->offset = ceph_make_fpos(hash, fpos_offset++, true);
+               } else {
+                       rde->offset = ceph_make_fpos(frag, fpos_offset++, false);
+               }
+
 retry_lookup:
                dn = d_lookup(parent, &dname);
                dout("d_lookup on parent=%p name=%.*s got %p\n",
                        dn = realdn;
                }
 
-               di = dn->d_fsdata;
-               di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
-               rde->offset = di->offset;
+               ceph_dentry(dn)->offset = rde->offset;
 
                update_dentry_lease(dn, rde->lease, req->r_session,
                                    req->r_request_started);