                item->bytes_reserved = 0;
                item->delayed_node = node;
                RB_CLEAR_NODE(&item->rb_node);
+               INIT_LIST_HEAD(&item->log_list);
+               item->logged = false;
                refcount_set(&item->refs, 1);
        }
        return item;
 }
 
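+/*
+ * Collect the currently pending delayed insertion and deletion items of an
+ * inode's delayed node into the given lists, taking an extra reference on each
+ * collected item so it can be used after the node's mutex is released. Items
+ * that are already in a log list, or that were already logged, are skipped.
+ */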
+void btrfs_log_get_delayed_items(struct btrfs_inode *inode,
+                                struct list_head *ins_list,
+                                struct list_head *del_list)
+{
+       struct btrfs_delayed_node *node;
+       struct btrfs_delayed_item *item;
+
+       node = btrfs_get_delayed_node(inode);
+       if (!node)
+               return;
+
+       mutex_lock(&node->mutex);
+       item = __btrfs_first_delayed_insertion_item(node);
+       while (item) {
+               /*
+                * It's possible that the item is already in a log list. This
+                * can happen in case two tasks are trying to log the same
+                * directory. For example, if we have tasks A and B:
+                *
+                * Task A collected the delayed items into a log list while
+                * under the inode's log_mutex (at btrfs_log_inode()), but it
+                * only releases the items after logging the inodes they point
+                * to (if they are new inodes), which happens after unlocking
+                * the log mutex;
+                *
+                * Task B enters btrfs_log_inode() and acquires the log_mutex
+                * of the same directory inode, before task A releases the
+                * delayed items. This can happen for example because when
+                * logging some inode we also need to log its parent directory,
+                * so logging two files that have the same parent directory can
+                * lead to this.
+                *
+                * If this happens, just ignore delayed items already in a log
+                * list. All the tasks logging the directory are under a log
+                * transaction and whichever finishes first can not sync the log
+                * before the other completes and leaves the log transaction.
+                */
+               if (!item->logged && list_empty(&item->log_list)) {
+                       refcount_inc(&item->refs);
+                       list_add_tail(&item->log_list, ins_list);
+               }
+               item = __btrfs_next_delayed_item(item);
+       }
+
+       item = __btrfs_first_delayed_deletion_item(node);
+       while (item) {
+               /* The log_list may be non-empty, for the same reason as above. */
+               if (!item->logged && list_empty(&item->log_list)) {
+                       refcount_inc(&item->refs);
+                       list_add_tail(&item->log_list, del_list);
+               }
+               item = __btrfs_next_delayed_item(item);
+       }
+       mutex_unlock(&node->mutex);
+
+       /*
+        * We are called during inode logging, which means the inode is in use
+        * and can not be evicted before we finish logging the inode. So we never
+        * have the last reference on the delayed inode.
+        * Also, we don't use btrfs_release_delayed_node() because that would
+        * requeue the delayed inode (change its order in the list of prepared
+        * nodes) and we don't want to make such a change because we don't create or
+        * delete delayed items.
+        */
+       ASSERT(refcount_read(&node->refs) > 1);
+       refcount_dec(&node->refs);
+}
+
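+/*
+ * Undo btrfs_log_get_delayed_items(): mark every collected delayed item as
+ * logged, unlink it from its log list and drop the extra reference taken on
+ * it when it was collected.
+ */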
+void btrfs_log_put_delayed_items(struct btrfs_inode *inode,
+                                struct list_head *ins_list,
+                                struct list_head *del_list)
+{
+       struct btrfs_delayed_node *node;
+       struct btrfs_delayed_item *item;
+       struct btrfs_delayed_item *next;
+
+       node = btrfs_get_delayed_node(inode);
+       if (!node)
+               return;
+
+       mutex_lock(&node->mutex);
+
+       list_for_each_entry_safe(item, next, ins_list, log_list) {
+               item->logged = true;
+               list_del_init(&item->log_list);
+               if (refcount_dec_and_test(&item->refs))
+                       kfree(item);
+       }
+
+       list_for_each_entry_safe(item, next, del_list, log_list) {
+               item->logged = true;
+               list_del_init(&item->log_list);
+               if (refcount_dec_and_test(&item->refs))
+                       kfree(item);
+       }
+
+       mutex_unlock(&node->mutex);
+
+       /*
+        * We are called during inode logging, which means the inode is in use
+        * and can not be evicted before we finish logging the inode. So we never
+        * have the last reference on the delayed inode.
+        * Also, we don't use btrfs_release_delayed_node() because that would
+        * requeue the delayed inode (change its order in the list of prepared
+        * nodes) and we don't want to make such a change because we don't create or
+        * delete delayed items.
+        */
+       ASSERT(refcount_read(&node->refs) > 1);
+       refcount_dec(&node->refs);
+}
 
        return ret;
 }
 
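+/*
+ * Insert a batch of dir index items into the log tree, copying the data of
+ * each delayed item into its leaf slot, and then release the path.
+ */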
+static int insert_delayed_items_batch(struct btrfs_trans_handle *trans,
+                                     struct btrfs_root *log,
+                                     struct btrfs_path *path,
+                                     const struct btrfs_item_batch *batch,
+                                     const struct btrfs_delayed_item *first_item)
+{
+       const struct btrfs_delayed_item *curr = first_item;
+       int ret;
+
+       ret = btrfs_insert_empty_items(trans, log, path, batch);
+       if (ret)
+               return ret;
+
+       for (int i = 0; i < batch->nr; i++) {
+               char *data_ptr;
+
+               data_ptr = btrfs_item_ptr(path->nodes[0], path->slots[0], char);
+               write_extent_buffer(path->nodes[0], &curr->data,
+                                   (unsigned long)data_ptr, curr->data_len);
+               curr = list_next_entry(curr, log_list);
+               path->slots[0]++;
+       }
+
+       btrfs_release_path(path);
+
+       return 0;
+}
+
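+/*
+ * Log the dir index items corresponding to an inode's pending delayed
+ * insertion items, packing as many items as possible into each batch we
+ * insert in the log tree. Items with an index not greater than the inode's
+ * last_dir_index_offset were already logged and are skipped.
+ */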
+static int log_delayed_insertion_items(struct btrfs_trans_handle *trans,
+                                      struct btrfs_inode *inode,
+                                      struct btrfs_path *path,
+                                      const struct list_head *delayed_ins_list,
+                                      struct btrfs_log_ctx *ctx)
+{
+       /* 195 (4095 bytes of keys and sizes) fits in a single 4K page. */
+       const int max_batch_size = 195;
+       const int leaf_data_size = BTRFS_LEAF_DATA_SIZE(trans->fs_info);
+       const u64 ino = btrfs_ino(inode);
+       struct btrfs_root *log = inode->root->log_root;
+       struct btrfs_item_batch batch = {
+               .nr = 0,
+               .total_data_size = 0,
+       };
+       const struct btrfs_delayed_item *first = NULL;
+       const struct btrfs_delayed_item *curr;
+       char *ins_data;
+       struct btrfs_key *ins_keys;
+       u32 *ins_sizes;
+       u64 curr_batch_size = 0;
+       int batch_idx = 0;
+       int ret;
+
+       /* We are adding dir index items to the log tree. */
+       lockdep_assert_held(&inode->log_mutex);
+
+       /*
+        * We collect delayed items before copying index keys from the subvolume
+        * to the log tree. However just after we collected them, they may have
+        * been flushed (all of them or just some of them), and therefore we
+        * could have copied them from the subvolume tree to the log tree.
+        * So find the first delayed item that was not yet logged (they are
+        * sorted by index number).
+        */
+       list_for_each_entry(curr, delayed_ins_list, log_list) {
+               if (curr->index > inode->last_dir_index_offset) {
+                       first = curr;
+                       break;
+               }
+       }
+
+       /* Empty list or all delayed items were already logged. */
+       if (!first)
+               return 0;
+
+       ins_data = kmalloc(max_batch_size * sizeof(u32) +
+                          max_batch_size * sizeof(struct btrfs_key), GFP_NOFS);
+       if (!ins_data)
+               return -ENOMEM;
+       ins_sizes = (u32 *)ins_data;
+       batch.data_sizes = ins_sizes;
+       ins_keys = (struct btrfs_key *)(ins_data + max_batch_size * sizeof(u32));
+       batch.keys = ins_keys;
+
+       curr = first;
+       while (!list_entry_is_head(curr, delayed_ins_list, log_list)) {
+               const u32 curr_size = curr->data_len + sizeof(struct btrfs_item);
+
+               if (curr_batch_size + curr_size > leaf_data_size ||
+                   batch.nr == max_batch_size) {
+                       ret = insert_delayed_items_batch(trans, log, path,
+                                                        &batch, first);
+                       if (ret)
+                               goto out;
+                       batch_idx = 0;
+                       batch.nr = 0;
+                       batch.total_data_size = 0;
+                       curr_batch_size = 0;
+                       first = curr;
+               }
+
+               ins_sizes[batch_idx] = curr->data_len;
+               ins_keys[batch_idx].objectid = ino;
+               ins_keys[batch_idx].type = BTRFS_DIR_INDEX_KEY;
+               ins_keys[batch_idx].offset = curr->index;
+               curr_batch_size += curr_size;
+               batch.total_data_size += curr->data_len;
+               batch.nr++;
+               batch_idx++;
+               curr = list_next_entry(curr, log_list);
+       }
+
+       ASSERT(batch.nr >= 1);
+       ret = insert_delayed_items_batch(trans, log, path, &batch, first);
+
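+       /*
+        * Advance the inode's last logged dir index past the items we just
+        * inserted, so they are not logged again if this directory is logged
+        * once more in the current transaction.
+        */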
+       curr = list_last_entry(delayed_ins_list, struct btrfs_delayed_item,
+                              log_list);
+       inode->last_dir_index_offset = curr->index;
+out:
+       kfree(ins_data);
+
+       return ret;
+}
+
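+/*
+ * Log the pending delayed deletion items of an inode for the case where the
+ * directory was not logged before in the current transaction. Since the log
+ * tree can not have dir index items for the directory, we only need to add
+ * dir log range items, one for each group of consecutive deleted indexes.
+ */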
+static int log_delayed_deletions_full(struct btrfs_trans_handle *trans,
+                                     struct btrfs_inode *inode,
+                                     struct btrfs_path *path,
+                                     const struct list_head *delayed_del_list,
+                                     struct btrfs_log_ctx *ctx)
+{
+       const u64 ino = btrfs_ino(inode);
+       const struct btrfs_delayed_item *curr;
+
+       curr = list_first_entry(delayed_del_list, struct btrfs_delayed_item,
+                               log_list);
+
+       while (!list_entry_is_head(curr, delayed_del_list, log_list)) {
+               u64 first_dir_index = curr->index;
+               u64 last_dir_index;
+               const struct btrfs_delayed_item *next;
+               int ret;
+
+               /*
+                * Find a range of consecutive dir index items to delete. This
+                * way we log a single dir range item spanning several contiguous
+                * dir items instead of logging one range item per dir index item.
+                */
+               next = list_next_entry(curr, log_list);
+               while (!list_entry_is_head(next, delayed_del_list, log_list)) {
+                       if (next->index != curr->index + 1)
+                               break;
+                       curr = next;
+                       next = list_next_entry(next, log_list);
+               }
+
+               last_dir_index = curr->index;
+               ASSERT(last_dir_index >= first_dir_index);
+
+               ret = insert_dir_log_key(trans, inode->root->log_root, path,
+                                        ino, first_dir_index, last_dir_index);
+               if (ret)
+                       return ret;
+               curr = list_next_entry(curr, log_list);
+       }
+
+       return 0;
+}
+
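+/*
+ * With the path pointing to the dir index item that matches the first delayed
+ * deletion item, delete that item and all the following items in the same leaf
+ * that match the next delayed deletion items, with a single btrfs_del_items()
+ * call. last_ret is updated to the last delayed item whose dir index item was
+ * deleted.
+ */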
+static int batch_delete_dir_index_items(struct btrfs_trans_handle *trans,
+                                       struct btrfs_inode *inode,
+                                       struct btrfs_path *path,
+                                       struct btrfs_log_ctx *ctx,
+                                       const struct list_head *delayed_del_list,
+                                       const struct btrfs_delayed_item *first,
+                                       const struct btrfs_delayed_item **last_ret)
+{
+       const struct btrfs_delayed_item *next;
+       struct extent_buffer *leaf = path->nodes[0];
+       const int last_slot = btrfs_header_nritems(leaf) - 1;
+       int slot = path->slots[0] + 1;
+       const u64 ino = btrfs_ino(inode);
+
+       next = list_next_entry(first, log_list);
+
+       while (slot < last_slot &&
+              !list_entry_is_head(next, delayed_del_list, log_list)) {
+               struct btrfs_key key;
+
+               btrfs_item_key_to_cpu(leaf, &key, slot);
+               if (key.objectid != ino ||
+                   key.type != BTRFS_DIR_INDEX_KEY ||
+                   key.offset != next->index)
+                       break;
+
+               slot++;
+               *last_ret = next;
+               next = list_next_entry(next, log_list);
+       }
+
+       return btrfs_del_items(trans, inode->root->log_root, path,
+                              path->slots[0], slot - path->slots[0]);
+}
+
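+/*
+ * Log the pending delayed deletion items of an inode for the case where the
+ * directory was logged before in the current transaction. For each batch of
+ * consecutive deletion items, either delete the matching dir index items from
+ * the log tree, if they were logged before, or add a dir log range item
+ * covering the batch.
+ */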
+static int log_delayed_deletions_incremental(struct btrfs_trans_handle *trans,
+                                            struct btrfs_inode *inode,
+                                            struct btrfs_path *path,
+                                            const struct list_head *delayed_del_list,
+                                            struct btrfs_log_ctx *ctx)
+{
+       struct btrfs_root *log = inode->root->log_root;
+       const struct btrfs_delayed_item *curr;
+       u64 last_range_start;
+       u64 last_range_end = 0;
+       struct btrfs_key key;
+
+       key.objectid = btrfs_ino(inode);
+       key.type = BTRFS_DIR_INDEX_KEY;
+       curr = list_first_entry(delayed_del_list, struct btrfs_delayed_item,
+                               log_list);
+
+       while (!list_entry_is_head(curr, delayed_del_list, log_list)) {
+               const struct btrfs_delayed_item *last = curr;
+               u64 first_dir_index = curr->index;
+               u64 last_dir_index;
+               bool deleted_items = false;
+               int ret;
+
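+               /*
+                * Check if this dir index item was copied to the log tree
+                * before. If it was, delete it and any following contiguous
+                * items directly from the log, otherwise add a dir log range
+                * item below.
+                */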
+               key.offset = curr->index;
+               ret = btrfs_search_slot(trans, log, &key, path, -1, 1);
+               if (ret < 0) {
+                       return ret;
+               } else if (ret == 0) {
+                       ret = batch_delete_dir_index_items(trans, inode, path, ctx,
+                                                          delayed_del_list, curr,
+                                                          &last);
+                       if (ret)
+                               return ret;
+                       deleted_items = true;
+               }
+
+               btrfs_release_path(path);
+
+               /*
+                * If we deleted items from the leaf, it means we have a range
+                * item logging their range, so no need to add one or update an
+                * existing one. Otherwise we have to log a dir range item.
+                */
+               if (deleted_items)
+                       goto next_batch;
+
+               last_dir_index = last->index;
+               ASSERT(last_dir_index >= first_dir_index);
+               /*
+                * If this range starts right after where the previous one ends,
+                * then we want to reuse the previous range item and change its
+                * end offset to the end of this range. This is just to minimize
+                * leaf space usage, by avoiding adding a new range item.
+                */
+               if (last_range_end != 0 && first_dir_index == last_range_end + 1)
+                       first_dir_index = last_range_start;
+
+               ret = insert_dir_log_key(trans, log, path, key.objectid,
+                                        first_dir_index, last_dir_index);
+               if (ret)
+                       return ret;
+
+               last_range_start = first_dir_index;
+               last_range_end = last_dir_index;
+next_batch:
+               curr = list_next_entry(last, log_list);
+       }
+
+       return 0;
+}
+
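+/*
+ * Log the dir index deletions that are still pending as delayed items of the
+ * inode, using the incremental approach if the directory was already logged
+ * in the current transaction and the full approach otherwise.
+ */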
+static int log_delayed_deletion_items(struct btrfs_trans_handle *trans,
+                                     struct btrfs_inode *inode,
+                                     struct btrfs_path *path,
+                                     const struct list_head *delayed_del_list,
+                                     struct btrfs_log_ctx *ctx)
+{
+       /*
+        * We are deleting dir index items from the log tree or adding range
+        * items to it.
+        */
+       lockdep_assert_held(&inode->log_mutex);
+
+       if (list_empty(delayed_del_list))
+               return 0;
+
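+       /*
+        * If the directory was logged before in the current transaction, the
+        * log tree may already contain dir index items for it, so besides
+        * adding dir log range items we may also have to delete index items
+        * from the log tree. Otherwise only range items need to be added.
+        */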
+       if (ctx->logged_before)
+               return log_delayed_deletions_incremental(trans, inode, path,
+                                                        delayed_del_list, ctx);
+
+       return log_delayed_deletions_full(trans, inode, path, delayed_del_list,
+                                         ctx);
+}
+
+/*
+ * Similar logic as for log_new_dir_dentries(), but it iterates over the delayed
+ * items instead of the subvolume tree.
+ */
+static int log_new_delayed_dentries(struct btrfs_trans_handle *trans,
+                                   struct btrfs_inode *inode,
+                                   const struct list_head *delayed_ins_list,
+                                   struct btrfs_log_ctx *ctx)
+{
+       const bool orig_log_new_dentries = ctx->log_new_dentries;
+       struct btrfs_fs_info *fs_info = trans->fs_info;
+       struct btrfs_delayed_item *item;
+       int ret = 0;
+
+       /*
+        * The log mutex is not needed here, and not holding it also avoids
+        * potential deadlocks or lockdep annotations due to nesting of delayed
+        * inode mutexes and log mutexes.
+        */
+       lockdep_assert_not_held(&inode->log_mutex);
+
+       ASSERT(!ctx->logging_new_delayed_dentries);
+       ctx->logging_new_delayed_dentries = true;
+
+       list_for_each_entry(item, delayed_ins_list, log_list) {
+               struct btrfs_dir_item *dir_item;
+               struct inode *di_inode;
+               struct btrfs_key key;
+               int log_mode = LOG_INODE_EXISTS;
+
+               dir_item = (struct btrfs_dir_item *)item->data;
+               btrfs_disk_key_to_cpu(&key, &dir_item->location);
+
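+               /*
+                * Skip dentries that point to a subvolume's root, as they
+                * don't refer to an inode in the current root.
+                */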
+               if (key.type == BTRFS_ROOT_ITEM_KEY)
+                       continue;
+
+               di_inode = btrfs_iget(fs_info->sb, key.objectid, inode->root);
+               if (IS_ERR(di_inode)) {
+                       ret = PTR_ERR(di_inode);
+                       break;
+               }
+
+               if (!need_log_inode(trans, BTRFS_I(di_inode))) {
+                       btrfs_add_delayed_iput(di_inode);
+                       continue;
+               }
+
+               if (btrfs_stack_dir_type(dir_item) == BTRFS_FT_DIR)
+                       log_mode = LOG_INODE_ALL;
+
+               ctx->log_new_dentries = false;
+               ret = btrfs_log_inode(trans, BTRFS_I(di_inode), log_mode, ctx);
+
+               if (!ret && ctx->log_new_dentries)
+                       ret = log_new_dir_dentries(trans, BTRFS_I(di_inode), ctx);
+
+               btrfs_add_delayed_iput(di_inode);
+
+               if (ret)
+                       break;
+       }
+
+       ctx->log_new_dentries = orig_log_new_dentries;
+       ctx->logging_new_delayed_dentries = false;
+
+       return ret;
+}
+
 /* log a single inode in the tree log.
  * At least one parent directory for this inode must exist in the tree
  * or be logged already.
        bool need_log_inode_item = true;
        bool xattrs_logged = false;
        bool inode_item_dropped = true;
+       bool full_dir_logging = false;
+       LIST_HEAD(delayed_ins_list);
+       LIST_HEAD(delayed_del_list);
 
        path = btrfs_alloc_path();
        if (!path)
                max_key.type = (u8)-1;
        max_key.offset = (u64)-1;
 
+       if (S_ISDIR(inode->vfs_inode.i_mode) && inode_only == LOG_INODE_ALL)
+               full_dir_logging = true;
+
        /*
-        * Only run delayed items if we are a directory. We want to make sure
-        * all directory indexes hit the fs/subvolume tree so we can find them
-        * and figure out which index ranges have to be logged.
+        * If we are logging a directory while we are logging dentries of the
+        * delayed items of some other inode, then we need to flush the delayed
+        * items of this directory and not log the delayed items directly. This
+        * is to prevent more than one level of recursion into btrfs_log_inode()
+        * by having something like this:
+        *
+        *     $ mkdir -p a/b/c/d/e/f/g/h/...
+        *     $ xfs_io -c "fsync" a
+        *
+        * Where all directories in the path did not exist before and are
+        * created in the current transaction.
+        * So in such a case we directly log the delayed items of the main
+        * directory ("a") without flushing them first, while for each of its
+        * subdirectories we flush their delayed items before logging them.
+        * This prevents a potential unbounded recursion like this:
+        *
+        * btrfs_log_inode()
+        *   log_new_delayed_dentries()
+        *      btrfs_log_inode()
+        *        log_new_delayed_dentries()
+        *          btrfs_log_inode()
+        *            log_new_delayed_dentries()
+        *              (...)
+        *
+        * We have thresholds for the maximum number of delayed items to have in
+        * memory, and once they are hit, the items are flushed asynchronously.
+        * However the limit is quite high, so let's prevent deep levels of
+        * recursion from happening by limiting the maximum depth to 1.
         */
-       if (S_ISDIR(inode->vfs_inode.i_mode)) {
+       if (full_dir_logging && ctx->logging_new_delayed_dentries) {
                ret = btrfs_commit_inode_delayed_items(trans, inode);
                if (ret)
                        goto out;
         * to know the file was moved from A to B, so logging just A would
         * result in losing the file after a log replay.
         */
-       if (S_ISDIR(inode->vfs_inode.i_mode) &&
-           inode_only == LOG_INODE_ALL &&
-           inode->last_unlink_trans >= trans->transid) {
+       if (full_dir_logging && inode->last_unlink_trans >= trans->transid) {
                btrfs_set_log_full_commit(trans);
                ret = BTRFS_LOG_FORCE_COMMIT;
                goto out_unlock;
        if (ret)
                goto out_unlock;
 
+       /*
+        * If we are logging a directory in full mode, collect the delayed items
+        * before iterating the subvolume tree, so that we don't miss any new
+        * dir index items in case they get flushed while or right after we are
+        * iterating the subvolume tree.
+        */
+       if (full_dir_logging && !ctx->logging_new_delayed_dentries)
+               btrfs_log_get_delayed_items(inode, &delayed_ins_list,
+                                           &delayed_del_list);
+
        ret = copy_inode_items_to_log(trans, inode, &min_key, &max_key,
                                      path, dst_path, logged_isize,
                                      inode_only, ctx,
                write_unlock(&em_tree->lock);
        }
 
-       if (inode_only == LOG_INODE_ALL && S_ISDIR(inode->vfs_inode.i_mode)) {
+       if (full_dir_logging) {
                ret = log_directory_changes(trans, inode, path, dst_path, ctx);
                if (ret)
                        goto out_unlock;
+               ret = log_delayed_insertion_items(trans, inode, path,
+                                                 &delayed_ins_list, ctx);
+               if (ret)
+                       goto out_unlock;
+               ret = log_delayed_deletion_items(trans, inode, path,
+                                                &delayed_del_list, ctx);
+               if (ret)
+                       goto out_unlock;
        }
 
        spin_lock(&inode->lock);
        else
                ret = log_conflicting_inodes(trans, inode->root, ctx);
 
+       if (full_dir_logging && !ctx->logging_new_delayed_dentries) {
+               if (!ret)
+                       ret = log_new_delayed_dentries(trans, inode,
+                                                      &delayed_ins_list, ctx);
+
+               btrfs_log_put_delayed_items(inode, &delayed_ins_list,
+                                           &delayed_del_list);
+       }
+
        return ret;
 }