        struct btrfs_workers generic_worker;
        struct btrfs_workers workers;
        struct btrfs_workers delalloc_workers;
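+       /* workers that flush delalloc inodes in the background */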
+       struct btrfs_workers flush_workers;
        struct btrfs_workers endio_workers;
        struct btrfs_workers endio_meta_workers;
        struct btrfs_workers endio_meta_write_workers;
 int btrfs_lookup_csums_range(struct btrfs_root *root, u64 start, u64 end,
                             struct list_head *list, int search_commit);
 /* inode.c */
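+/*
+ * used to flush the delalloc pages of one inode from a worker thread.
+ * wait: wait on the inode's ordered extents instead of just flushing
+ * delay_iput: drop the inode reference via btrfs_add_delayed_iput()
+ * completion: signalled once the work has finished
+ */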
+struct btrfs_delalloc_work {
+       struct inode *inode;
+       int wait;
+       int delay_iput;
+       struct completion completion;
+       struct list_head list;
+       struct btrfs_work work;
+};
+
+struct btrfs_delalloc_work *btrfs_alloc_delalloc_work(struct inode *inode,
+                                                   int wait, int delay_iput);
+void btrfs_wait_and_free_delalloc_work(struct btrfs_delalloc_work *work);
+
 struct extent_map *btrfs_get_extent_fiemap(struct inode *inode, struct page *page,
                                           size_t pg_offset, u64 start, u64 len,
                                           int create);
 
                           fs_info->thread_pool_size,
                           &fs_info->generic_worker);
 
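+       /*
+        * dedicated pool for the btrfs_delalloc_work items that write
+        * back delalloc pages off the caller's context
+        */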
+       btrfs_init_workers(&fs_info->flush_workers, "flush_delalloc",
+                          fs_info->thread_pool_size,
+                          &fs_info->generic_worker);
+
        btrfs_init_workers(&fs_info->submit_workers, "submit",
                           min_t(u64, fs_devices->num_devices,
                           fs_info->thread_pool_size),
        ret |= btrfs_start_workers(&fs_info->delayed_workers);
        ret |= btrfs_start_workers(&fs_info->caching_workers);
        ret |= btrfs_start_workers(&fs_info->readahead_workers);
+       ret |= btrfs_start_workers(&fs_info->flush_workers);
        if (ret) {
                err = -ENOMEM;
                goto fail_sb_buffer;
        btrfs_stop_workers(&fs_info->submit_workers);
        btrfs_stop_workers(&fs_info->delayed_workers);
        btrfs_stop_workers(&fs_info->caching_workers);
+       btrfs_stop_workers(&fs_info->flush_workers);
 fail_alloc:
 fail_iput:
        btrfs_mapping_tree_free(&fs_info->mapping_tree);
        btrfs_stop_workers(&fs_info->delayed_workers);
        btrfs_stop_workers(&fs_info->caching_workers);
        btrfs_stop_workers(&fs_info->readahead_workers);
+       btrfs_stop_workers(&fs_info->flush_workers);
 
 #ifdef CONFIG_BTRFS_FS_CHECK_INTEGRITY
        if (btrfs_test_opt(root, CHECK_INTEGRITY))
 
 static struct extent_io_ops btrfs_extent_io_ops;
 
 static struct kmem_cache *btrfs_inode_cachep;
+static struct kmem_cache *btrfs_delalloc_work_cachep;
 struct kmem_cache *btrfs_trans_handle_cachep;
 struct kmem_cache *btrfs_transaction_cachep;
 struct kmem_cache *btrfs_path_cachep;
                kmem_cache_destroy(btrfs_path_cachep);
        if (btrfs_free_space_cachep)
                kmem_cache_destroy(btrfs_free_space_cachep);
+       if (btrfs_delalloc_work_cachep)
+               kmem_cache_destroy(btrfs_delalloc_work_cachep);
 }
 
 int btrfs_init_cachep(void)
        if (!btrfs_free_space_cachep)
                goto fail;
 
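+       /* slab cache for the work items queued on flush_workers */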
+       btrfs_delalloc_work_cachep = kmem_cache_create("btrfs_delalloc_work",
+                       sizeof(struct btrfs_delalloc_work), 0,
+                       SLAB_RECLAIM_ACCOUNT | SLAB_MEM_SPREAD,
+                       NULL);
+       if (!btrfs_delalloc_work_cachep)
+               goto fail;
+
        return 0;
 fail:
        btrfs_destroy_cachep();
        return ret;
 }
 
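+/*
+ * worker callback: flush (or wait on) one inode's delalloc pages, drop
+ * our inode reference and signal the completion
+ */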
+static void btrfs_run_delalloc_work(struct btrfs_work *work)
+{
+       struct btrfs_delalloc_work *delalloc_work;
+
+       delalloc_work = container_of(work, struct btrfs_delalloc_work,
+                                    work);
+       if (delalloc_work->wait)
+               btrfs_wait_ordered_range(delalloc_work->inode, 0, (u64)-1);
+       else
+               filemap_flush(delalloc_work->inode->i_mapping);
+
+       if (delalloc_work->delay_iput)
+               btrfs_add_delayed_iput(delalloc_work->inode);
+       else
+               iput(delalloc_work->inode);
+       complete(&delalloc_work->completion);
+}
+
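+/*
+ * allocate and fill a delalloc work item; the caller is responsible
+ * for queueing it on fs_info->flush_workers.  Returns NULL on
+ * allocation failure.
+ */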
+struct btrfs_delalloc_work *btrfs_alloc_delalloc_work(struct inode *inode,
+                                                   int wait, int delay_iput)
+{
+       struct btrfs_delalloc_work *work;
+
+       work = kmem_cache_zalloc(btrfs_delalloc_work_cachep, GFP_NOFS);
+       if (!work)
+               return NULL;
+
+       init_completion(&work->completion);
+       INIT_LIST_HEAD(&work->list);
+       work->inode = inode;
+       work->wait = wait;
+       work->delay_iput = delay_iput;
+       work->work.func = btrfs_run_delalloc_work;
+
+       return work;
+}
+
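+/*
+ * wait for a queued delalloc work item to finish and give it back to
+ * the slab cache
+ */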
+void btrfs_wait_and_free_delalloc_work(struct btrfs_delalloc_work *work)
+{
+       wait_for_completion(&work->completion);
+       kmem_cache_free(btrfs_delalloc_work_cachep, work);
+}
+
 /*
  * some fairly slow code that needs optimization. This walks the list
  * of all the inodes with pending delalloc and forces them to disk.
        struct list_head *head = &root->fs_info->delalloc_inodes;
        struct btrfs_inode *binode;
        struct inode *inode;
+       struct btrfs_delalloc_work *work, *next;
+       struct list_head works;
+       int ret = 0;
 
        if (root->fs_info->sb->s_flags & MS_RDONLY)
                return -EROFS;
 
+       INIT_LIST_HEAD(&works);
+
        spin_lock(&root->fs_info->delalloc_lock);
        while (!list_empty(head)) {
                binode = list_entry(head->next, struct btrfs_inode,
                        list_del_init(&binode->delalloc_inodes);
                spin_unlock(&root->fs_info->delalloc_lock);
                if (inode) {
-                       filemap_flush(inode->i_mapping);
-                       if (delay_iput)
-                               btrfs_add_delayed_iput(inode);
-                       else
-                               iput(inode);
+                       work = btrfs_alloc_delalloc_work(inode, 0, delay_iput);
+                       if (!work) {
+                               /* drop the reference we took on the inode */
+                               if (delay_iput)
+                                       btrfs_add_delayed_iput(inode);
+                               else
+                                       iput(inode);
+                               ret = -ENOMEM;
+                               goto out;
+                       }
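+                       /*
+                        * keep the work on our list so we can wait for
+                        * it in the out path, then queue the flush
+                        */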
+                       list_add_tail(&work->list, &works);
+                       btrfs_queue_worker(&root->fs_info->flush_workers,
+                                          &work->work);
                }
                cond_resched();
                spin_lock(&root->fs_info->delalloc_lock);
                    atomic_read(&root->fs_info->async_delalloc_pages) == 0));
        }
        atomic_dec(&root->fs_info->async_submit_draining);
-       return 0;
+out:
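+       /* wait for everything we queued, even if we are erroring out */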
+       list_for_each_entry_safe(work, next, &works, list) {
+               list_del_init(&work->list);
+               btrfs_wait_and_free_delalloc_work(work);
+       }
+       return ret;
 }
 
 static int btrfs_symlink(struct inode *dir, struct dentry *dentry,