struct btrfs_block_group_cache *cache;
        struct btrfs_transaction *cur_trans = trans->transaction;
        int ret = 0;
+       int should_put;
        struct btrfs_path *path;
+       LIST_HEAD(io);
+       int num_started = 0;
+       int num_waited = 0;
 
        if (list_empty(&cur_trans->dirty_bgs))
                return 0;
                cache = list_first_entry(&cur_trans->dirty_bgs,
                                         struct btrfs_block_group_cache,
                                         dirty_list);
+
+               /*
+                * This can happen if cache_save_setup() re-dirties a block
+                * group that is already under IO.  Just wait for it to
+                * finish and then write it out again below.
+                */
+               if (!list_empty(&cache->io_list)) {
+                       list_del_init(&cache->io_list);
+                       btrfs_wait_cache_io(root, trans, cache,
+                                           &cache->io_ctl, path,
+                                           cache->key.objectid);
+                       btrfs_put_block_group(cache);
+                       num_waited++;
+               }
+
                list_del_init(&cache->dirty_list);
+               should_put = 1;
+
                if (cache->disk_cache_state == BTRFS_DC_CLEAR)
                        cache_save_setup(cache, trans, path);
+
                if (!ret)
-                       ret = btrfs_run_delayed_refs(trans, root,
-                                                    (unsigned long) -1);
-               if (!ret && cache->disk_cache_state == BTRFS_DC_SETUP)
-                       btrfs_write_out_cache(root, trans, cache, path);
+                       ret = btrfs_run_delayed_refs(trans, root, (unsigned long) -1);
+
+               if (!ret && cache->disk_cache_state == BTRFS_DC_SETUP) {
+                       /*
+                        * btrfs_write_out_cache() leaves io_ctl.inode set
+                        * iff it started IO that must be waited on later.
+                        */
+                       cache->io_ctl.inode = NULL;
+                       ret = btrfs_write_out_cache(root, trans, cache, path);
+                       if (ret == 0 && cache->io_ctl.inode) {
+                               num_started++;
+                               should_put = 0;
+                               list_add_tail(&cache->io_list, &io);
+                       } else {
+                               /*
+                                * If we failed to write the cache, the
+                                * generation will be bad, so the cache file
+                                * is simply ignored and rebuilt later; life
+                                * goes on.
+                                */
+                               ret = 0;
+                       }
+               }
                if (!ret)
                        ret = write_one_cache_group(trans, root, path, cache);
+
+               /* if it's not on the io list, we need to put the block group */
+               if (should_put)
+                       btrfs_put_block_group(cache);
+       }
+
+       while (!list_empty(&io)) {
+               cache = list_first_entry(&io, struct btrfs_block_group_cache,
+                                        io_list);
+               list_del_init(&cache->io_list);
+               num_waited++;
+               btrfs_wait_cache_io(root, trans, cache,
+                                   &cache->io_ctl, path, cache->key.objectid);
                btrfs_put_block_group(cache);
        }
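
The loop above is the heart of the change: instead of writing one cache file and waiting for it before moving on to the next block group, the first pass starts the write for every dirty group and parks the started ones on the local io list (keeping the block group reference, hence should_put = 0), and only the second pass waits, so all of the cache IO is in flight at once. A minimal userspace sketch of that start-all-then-wait shape, with hypothetical submit()/wait_done() stubs standing in for btrfs_write_out_cache() and btrfs_wait_cache_io():

	#include <stdio.h>

	struct bg {
		int id;
		int in_flight;
	};

	/* hypothetical stand-in for btrfs_write_out_cache(): start the IO */
	static void submit(struct bg *bg)
	{
		bg->in_flight = 1;
		printf("started write for group %d\n", bg->id);
	}

	/* hypothetical stand-in for btrfs_wait_cache_io(): wait for one IO */
	static void wait_done(struct bg *bg)
	{
		bg->in_flight = 0;
		printf("waited on group %d\n", bg->id);
	}

	int main(void)
	{
		struct bg groups[3] = { {0, 0}, {1, 0}, {2, 0} };
		int i;

		/* stage one: submit everything so the writes overlap */
		for (i = 0; i < 3; i++)
			submit(&groups[i]);

		/* stage two: one wait pass after all IO has been started */
		for (i = 0; i < 3; i++)
			if (groups[i].in_flight)
				wait_done(&groups[i]);
		return 0;
	}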
 
        INIT_LIST_HEAD(&cache->bg_list);
        INIT_LIST_HEAD(&cache->ro_list);
        INIT_LIST_HEAD(&cache->dirty_list);
+       INIT_LIST_HEAD(&cache->io_list);
        btrfs_init_free_space_ctl(cache);
        atomic_set(&cache->trimming, 0);
 
 
        key.objectid = BTRFS_FREE_SPACE_OBJECTID;
        key.offset = offset;
        key.type = 0;
-
        ret = btrfs_insert_empty_item(trans, root, path, &key,
                                      sizeof(struct btrfs_free_space_header));
        if (ret < 0) {
                btrfs_release_path(path);
                return ret;
        }
+
        leaf = path->nodes[0];
        header = btrfs_item_ptr(leaf, path->slots[0],
                                struct btrfs_free_space_header);
        io_ctl->num_pages = num_pages;
        io_ctl->root = root;
        io_ctl->check_crcs = check_crcs;
+       /* stash the inode so the wait side can find it later */
+       io_ctl->inode = inode;
 
        return 0;
 }
 static void io_ctl_free(struct btrfs_io_ctl *io_ctl)
 {
        kfree(io_ctl->pages);
+       /* NULL out so WARN_ON(io_ctl->pages) can catch a stale reuse */
+       io_ctl->pages = NULL;
 }
 
 static void io_ctl_unmap_page(struct btrfs_io_ctl *io_ctl)
                             GFP_NOFS);
 }
 
+int btrfs_wait_cache_io(struct btrfs_root *root,
+                       struct btrfs_trans_handle *trans,
+                       struct btrfs_block_group_cache *block_group,
+                       struct btrfs_io_ctl *io_ctl,
+                       struct btrfs_path *path, u64 offset)
+{
+       int ret;
+       struct inode *inode = io_ctl->inode;
+
+       root = root->fs_info->tree_root;
+
+       /* Flush the dirty pages in the cache file. */
+       ret = flush_dirty_cache(inode);
+       if (ret)
+               goto out;
+
+       /* Update the cache item to tell everyone this cache file is valid. */
+       ret = update_cache_item(trans, root, inode, path, offset,
+                               io_ctl->entries, io_ctl->bitmaps);
+out:
+       io_ctl_free(io_ctl);
+       if (ret) {
+               invalidate_inode_pages2(inode->i_mapping);
+               BTRFS_I(inode)->generation = 0;
+               if (block_group) {
+#ifdef DEBUG
+                       btrfs_err(root->fs_info,
+                               "failed to write free space cache for block group %llu",
+                               block_group->key.objectid);
+#endif
+               }
+       }
+       btrfs_update_inode(trans, root, inode);
+
+       if (block_group) {
+               spin_lock(&block_group->lock);
+
+               /*
+                * only mark this as written if we didn't get put back on
+                * the dirty list while waiting for IO.
+                */
+               if (!ret && list_empty(&block_group->dirty_list))
+                       block_group->disk_cache_state = BTRFS_DC_WRITTEN;
+               else if (ret)
+                       block_group->disk_cache_state = BTRFS_DC_ERROR;
+
+               spin_unlock(&block_group->lock);
+               io_ctl->inode = NULL;
+               iput(inode);
+       }
+
+       return ret;
+}
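+
The disk_cache_state update at the end of btrfs_wait_cache_io() is the subtle part: the block group may have been re-dirtied while its cache write was in flight, in which case the file just written is already stale and must not be marked valid. A small model of that three-way decision, where the hypothetical redirtied flag stands in for !list_empty(&block_group->dirty_list) (in the kernel code the state is simply left untouched in the re-dirtied case; DC_SETUP here just models "still needs writeout"):

	#include <stdio.h>

	enum dc_state { DC_SETUP, DC_WRITTEN, DC_ERROR };

	/* model of the state chosen once the cache IO has been waited on */
	static enum dc_state state_after_wait(int ret, int redirtied)
	{
		if (!ret && !redirtied)
			return DC_WRITTEN;	/* IO succeeded, nothing re-queued */
		if (ret)
			return DC_ERROR;	/* write failed, cache file invalid */
		return DC_SETUP;		/* re-dirtied: next pass rewrites it */
	}

	int main(void)
	{
		printf("clean=%d failed=%d redirtied=%d\n",
		       state_after_wait(0, 0), state_after_wait(-5, 0),
		       state_after_wait(0, 1));
		return 0;
	}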
+
 /**
  * __btrfs_write_out_cache - write out cached info to an inode
  * @root - the root the inode belongs to
 static int __btrfs_write_out_cache(struct btrfs_root *root, struct inode *inode,
                                   struct btrfs_free_space_ctl *ctl,
                                   struct btrfs_block_group_cache *block_group,
+                                  struct btrfs_io_ctl *io_ctl,
                                   struct btrfs_trans_handle *trans,
                                   struct btrfs_path *path, u64 offset)
 {
        struct extent_state *cached_state = NULL;
-       struct btrfs_io_ctl io_ctl;
        LIST_HEAD(bitmap_list);
        int entries = 0;
        int bitmaps = 0;
        int ret;
+       int must_iput = 0;
 
        if (!i_size_read(inode))
                return -1;
 
-       ret = io_ctl_init(&io_ctl, inode, root, 1);
+       WARN_ON(io_ctl->pages);
+       ret = io_ctl_init(io_ctl, inode, root, 1);
        if (ret)
                return -1;
 
                        up_write(&block_group->data_rwsem);
                        BTRFS_I(inode)->generation = 0;
                        ret = 0;
+                       must_iput = 1;
                        goto out;
                }
                spin_unlock(&block_group->lock);
        }
 
        /* Lock all pages first so we can lock the extent safely. */
-       io_ctl_prepare_pages(&io_ctl, inode, 0);
+       io_ctl_prepare_pages(io_ctl, inode, 0);
 
        lock_extent_bits(&BTRFS_I(inode)->io_tree, 0, i_size_read(inode) - 1,
                         0, &cached_state);
 
-       io_ctl_set_generation(&io_ctl, trans->transid);
+       io_ctl_set_generation(io_ctl, trans->transid);
 
        mutex_lock(&ctl->cache_writeout_mutex);
        /* Write out the extent entries in the free space cache */
-       ret = write_cache_extent_entries(&io_ctl, ctl,
+       ret = write_cache_extent_entries(io_ctl, ctl,
                                         block_group, &entries, &bitmaps,
                                         &bitmap_list);
        if (ret) {
         * they will be added into free space cache after the transaction is
         * committed, we shouldn't lose them.
         */
-       ret = write_pinned_extent_entries(root, block_group, &io_ctl, &entries);
+       ret = write_pinned_extent_entries(root, block_group, io_ctl, &entries);
        if (ret) {
                mutex_unlock(&ctl->cache_writeout_mutex);
                goto out_nospc;
         * locked while doing it because a concurrent trim can be manipulating
         * or freeing the bitmap.
         */
-       ret = write_bitmap_entries(&io_ctl, &bitmap_list);
+       ret = write_bitmap_entries(io_ctl, &bitmap_list);
        mutex_unlock(&ctl->cache_writeout_mutex);
        if (ret)
                goto out_nospc;
 
        /* Zero out the rest of the pages just to make sure */
-       io_ctl_zero_remaining_pages(&io_ctl);
+       io_ctl_zero_remaining_pages(io_ctl);
 
        /* Everything is written out, now we dirty the pages in the file. */
-       ret = btrfs_dirty_pages(root, inode, io_ctl.pages, io_ctl.num_pages,
+       ret = btrfs_dirty_pages(root, inode, io_ctl->pages, io_ctl->num_pages,
                                0, i_size_read(inode), &cached_state);
        if (ret)
                goto out_nospc;
         * Release the pages and unlock the extent, we will flush
         * them out later
         */
-       io_ctl_drop_pages(&io_ctl);
+       io_ctl_drop_pages(io_ctl);
 
        unlock_extent_cached(&BTRFS_I(inode)->io_tree, 0,
                             i_size_read(inode) - 1, &cached_state, GFP_NOFS);
 
-       /* Flush the dirty pages in the cache file. */
-       ret = flush_dirty_cache(inode);
+       /*
+        * At this point the pages are under IO and we're happy.  The caller
+        * is responsible for waiting on them and updating the cache and the
+        * inode.
+        */
+       io_ctl->entries = entries;
+       io_ctl->bitmaps = bitmaps;
+
+       ret = btrfs_fdatawrite_range(inode, 0, (u64)-1);
        if (ret)
                goto out;
 
-       /* Update the cache item to tell everyone this cache file is valid. */
-       ret = update_cache_item(trans, root, inode, path, offset,
-                               entries, bitmaps);
+       return 0;
+
 out:
-       io_ctl_free(&io_ctl);
+       io_ctl->inode = NULL;
+       io_ctl_free(io_ctl);
        if (ret) {
                invalidate_inode_pages2(inode->i_mapping);
                BTRFS_I(inode)->generation = 0;
        }
        btrfs_update_inode(trans, root, inode);
+       if (must_iput)
+               iput(inode);
        return ret;
 
 out_nospc:
-       cleanup_write_cache_enospc(inode, &io_ctl, &cached_state, &bitmap_list);
+       cleanup_write_cache_enospc(inode, io_ctl, &cached_state, &bitmap_list);
 
        if (block_group && (block_group->flags & BTRFS_BLOCK_GROUP_DATA))
                up_write(&block_group->data_rwsem);
        struct btrfs_free_space_ctl *ctl = block_group->free_space_ctl;
        struct inode *inode;
        int ret = 0;
-       enum btrfs_disk_cache_state dcs = BTRFS_DC_WRITTEN;
 
        root = root->fs_info->tree_root;
 
        if (IS_ERR(inode))
                return 0;
 
-       ret = __btrfs_write_out_cache(root, inode, ctl, block_group, trans,
+       ret = __btrfs_write_out_cache(root, inode, ctl, block_group,
+                                     &block_group->io_ctl, trans,
                                      path, block_group->key.objectid);
        if (ret) {
-               dcs = BTRFS_DC_ERROR;
-               ret = 0;
 #ifdef DEBUG
                btrfs_err(root->fs_info,
                        "failed to write free space cache for block group %llu",
                        block_group->key.objectid);
 #endif
+               spin_lock(&block_group->lock);
+               block_group->disk_cache_state = BTRFS_DC_ERROR;
+               spin_unlock(&block_group->lock);
+
+               block_group->io_ctl.inode = NULL;
+               iput(inode);
        }
 
-       spin_lock(&block_group->lock);
-       block_group->disk_cache_state = dcs;
-       spin_unlock(&block_group->lock);
-       iput(inode);
+       /*
+        * If ret == 0, the caller is expected to call btrfs_wait_cache_io()
+        * to wait for the IO and put the inode.
+        */
+
        return ret;
 }
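
Since btrfs_write_out_cache() can now return 0 with the cache pages still under IO, every caller has to pair it with btrfs_wait_cache_io() before trusting the cache file. A condensed sketch of that pairing, using only functions from this patch (it mirrors the dirty_bgs loop above, with the batching, locking and error handling trimmed):

	cache->io_ctl.inode = NULL;
	ret = btrfs_write_out_cache(root, trans, cache, path);
	if (!ret && cache->io_ctl.inode) {
		/* pages are in flight: flush them, update the cache item,
		 * and drop the inode reference taken by the writer
		 */
		ret = btrfs_wait_cache_io(root, trans, cache, &cache->io_ctl,
					  path, cache->key.objectid);
	}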
 
 {
        struct btrfs_free_space_ctl *ctl = root->free_ino_ctl;
        int ret;
+       struct btrfs_io_ctl io_ctl;
 
        if (!btrfs_test_opt(root, INODE_MAP_CACHE))
                return 0;
 
-       ret = __btrfs_write_out_cache(root, inode, ctl, NULL, trans, path, 0);
+       ret = __btrfs_write_out_cache(root, inode, ctl, NULL, &io_ctl,
+                                     trans, path, 0);
+       if (!ret)
+               ret = btrfs_wait_cache_io(root, trans, NULL, &io_ctl, path, 0);
        if (ret) {
                btrfs_delalloc_release_metadata(inode, inode->i_size);
 #ifdef DEBUG