struct mutex chunk_mutex;
        struct mutex volume_mutex;
 
+       /*
+        * this is taken to make sure we don't set block groups ro after
+        * the free space cache has been allocated on them
+        */
+       struct mutex ro_block_group_mutex;
+
        /* this is used during read/modify/write to make sure
         * no two ios are trying to mod the same stripe at the same
         * time
                         u64 bytenr, u64 num_bytes, u64 parent,
                         u64 root_objectid, u64 owner, u64 offset, int no_quota);
 
+int btrfs_start_dirty_block_groups(struct btrfs_trans_handle *trans,
+                                  struct btrfs_root *root);
 int btrfs_write_dirty_block_groups(struct btrfs_trans_handle *trans,
                                    struct btrfs_root *root);
 int btrfs_setup_space_cache(struct btrfs_trans_handle *trans,
 
        mutex_init(&fs_info->transaction_kthread_mutex);
        mutex_init(&fs_info->cleaner_mutex);
        mutex_init(&fs_info->volume_mutex);
+       mutex_init(&fs_info->ro_block_group_mutex);
        init_rwsem(&fs_info->commit_root_sem);
        init_rwsem(&fs_info->cleanup_work_sem);
        init_rwsem(&fs_info->subvol_sem);
 
                if (ret)
                        goto out_put;
 
-               ret = btrfs_truncate_free_space_cache(root, trans, inode);
+               ret = btrfs_truncate_free_space_cache(root, trans, NULL, inode);
                if (ret)
                        goto out_put;
        }
        return 0;
 }
 
-int btrfs_write_dirty_block_groups(struct btrfs_trans_handle *trans,
+/*
+ * transaction commit does final block group cache writeback during a
+ * critical section where nothing is allowed to change the FS.  This is
+ * required in order for the cache to actually match the block group,
+ * but can introduce a lot of latency into the commit.
+ *
+ * So, btrfs_start_dirty_block_groups is here to kick off block group
+ * cache IO.  There's a chance we'll have to redo some of it if the
+ * block group changes again during the commit, but it greatly reduces
+ * the commit latency by getting rid of the easy block groups while
+ * we're still allowing others to join the commit.
+ */
+int btrfs_start_dirty_block_groups(struct btrfs_trans_handle *trans,
                                   struct btrfs_root *root)
 {
        struct btrfs_block_group_cache *cache;
        struct btrfs_transaction *cur_trans = trans->transaction;
        int ret = 0;
        int should_put;
-       struct btrfs_path *path;
-       LIST_HEAD(io);
+       struct btrfs_path *path = NULL;
+       LIST_HEAD(dirty);
+       struct list_head *io = &cur_trans->io_bgs;
        int num_started = 0;
-       int num_waited = 0;
+       int loops = 0;
+
+       spin_lock(&cur_trans->dirty_bgs_lock);
+       if (!list_empty(&cur_trans->dirty_bgs)) {
+               list_splice_init(&cur_trans->dirty_bgs, &dirty);
+       }
+       spin_unlock(&cur_trans->dirty_bgs_lock);
 
-       if (list_empty(&cur_trans->dirty_bgs))
+again:
+       if (list_empty(&dirty)) {
+               btrfs_free_path(path);
                return 0;
+       }
+
+       /*
+        * make sure all the block groups on our dirty list actually
+        * exist
+        */
+       btrfs_create_pending_block_groups(trans, root);
+
+       if (!path) {
+               path = btrfs_alloc_path();
+               if (!path)
+                       return -ENOMEM;
+       }
+
+       while (!list_empty(&dirty)) {
+               cache = list_first_entry(&dirty,
+                                        struct btrfs_block_group_cache,
+                                        dirty_list);
+
+               /*
+                * cache_write_mutex is here only to save us from balance
+                * deleting this block group while we are writing out the
+                * cache
+                */
+               mutex_lock(&trans->transaction->cache_write_mutex);
+
+               /*
+                * this can happen if something re-dirties a block
+                * group that is already under IO.  Just wait for it to
+                * finish and then do it all again
+                */
+               if (!list_empty(&cache->io_list)) {
+                       list_del_init(&cache->io_list);
+                       btrfs_wait_cache_io(root, trans, cache,
+                                           &cache->io_ctl, path,
+                                           cache->key.objectid);
+                       btrfs_put_block_group(cache);
+               }
+
+
+               /*
+                * btrfs_wait_cache_io uses the cache->dirty_list to decide
+                * if it should update the cache_state.  Don't delete
+                * until after we wait.
+                *
+                * Since we're not running in the commit critical section
+                * we need the dirty_bgs_lock to protect from update_block_group
+                */
+               spin_lock(&cur_trans->dirty_bgs_lock);
+               list_del_init(&cache->dirty_list);
+               spin_unlock(&cur_trans->dirty_bgs_lock);
+
+               should_put = 1;
+
+               cache_save_setup(cache, trans, path);
+
+               if (cache->disk_cache_state == BTRFS_DC_SETUP) {
+                       cache->io_ctl.inode = NULL;
+                       ret = btrfs_write_out_cache(root, trans, cache, path);
+                       if (ret == 0 && cache->io_ctl.inode) {
+                               num_started++;
+                               should_put = 0;
+
+                               /*
+                                * the cache_write_mutex is protecting
+                                * the io_list
+                                */
+                               list_add_tail(&cache->io_list, io);
+                       } else {
+                               /*
+                                * if we failed to write the cache, the
+                                * generation will be bad and life goes on
+                                */
+                               ret = 0;
+                       }
+               }
+               if (!ret)
+                       ret = write_one_cache_group(trans, root, path, cache);
+               mutex_unlock(&trans->transaction->cache_write_mutex);
+
+               /* if its not on the io list, we need to put the block group */
+               if (should_put)
+                       btrfs_put_block_group(cache);
+
+               if (ret)
+                       break;
+       }
+
+       /*
+        * go through delayed refs for all the stuff we've just kicked off
+        * and then loop back (just once)
+        */
+       ret = btrfs_run_delayed_refs(trans, root, 0);
+       if (!ret && loops == 0) {
+               loops++;
+               spin_lock(&cur_trans->dirty_bgs_lock);
+               list_splice_init(&cur_trans->dirty_bgs, &dirty);
+               spin_unlock(&cur_trans->dirty_bgs_lock);
+               goto again;
+       }
+
+       btrfs_free_path(path);
+       return ret;
+}
+
+int btrfs_write_dirty_block_groups(struct btrfs_trans_handle *trans,
+                                  struct btrfs_root *root)
+{
+       struct btrfs_block_group_cache *cache;
+       struct btrfs_transaction *cur_trans = trans->transaction;
+       int ret = 0;
+       int should_put;
+       struct btrfs_path *path;
+       struct list_head *io = &cur_trans->io_bgs;
+       int num_started = 0;
 
        path = btrfs_alloc_path();
        if (!path)
                                            &cache->io_ctl, path,
                                            cache->key.objectid);
                        btrfs_put_block_group(cache);
-                       num_waited++;
                }
 
+               /*
+                * don't remove from the dirty list until after we've waited
+                * on any pending IO
+                */
                list_del_init(&cache->dirty_list);
                should_put = 1;
 
-               if (cache->disk_cache_state == BTRFS_DC_CLEAR)
-                       cache_save_setup(cache, trans, path);
+               cache_save_setup(cache, trans, path);
 
                if (!ret)
                        ret = btrfs_run_delayed_refs(trans, root, (unsigned long) -1);
                        if (ret == 0 && cache->io_ctl.inode) {
                                num_started++;
                                should_put = 0;
-                               list_add_tail(&cache->io_list, &io);
+                               list_add_tail(&cache->io_list, io);
                        } else {
                                /*
                                 * if we failed to write the cache, the
                        btrfs_put_block_group(cache);
        }
 
-       while (!list_empty(&io)) {
-               cache = list_first_entry(&io, struct btrfs_block_group_cache,
+       while (!list_empty(io)) {
+               cache = list_first_entry(io, struct btrfs_block_group_cache,
                                         io_list);
                list_del_init(&cache->io_list);
-               num_waited++;
                btrfs_wait_cache_io(root, trans, cache,
                                    &cache->io_ctl, path, cache->key.objectid);
                btrfs_put_block_group(cache);
                if (!alloc && cache->cached == BTRFS_CACHE_NO)
                        cache_block_group(cache, 1);
 
-               spin_lock(&trans->transaction->dirty_bgs_lock);
-               if (list_empty(&cache->dirty_list)) {
-                       list_add_tail(&cache->dirty_list,
-                                     &trans->transaction->dirty_bgs);
-                               trans->transaction->num_dirty_bgs++;
-                       btrfs_get_block_group(cache);
-               }
-               spin_unlock(&trans->transaction->dirty_bgs_lock);
-
                byte_in_group = bytenr - cache->key.objectid;
                WARN_ON(byte_in_group > cache->key.offset);
 
                                spin_unlock(&info->unused_bgs_lock);
                        }
                }
+
+               spin_lock(&trans->transaction->dirty_bgs_lock);
+               if (list_empty(&cache->dirty_list)) {
+                       list_add_tail(&cache->dirty_list,
+                                     &trans->transaction->dirty_bgs);
+                               trans->transaction->num_dirty_bgs++;
+                       btrfs_get_block_group(cache);
+               }
+               spin_unlock(&trans->transaction->dirty_bgs_lock);
+
                btrfs_put_block_group(cache);
                total -= num_bytes;
                bytenr += num_bytes;
 
        BUG_ON(cache->ro);
 
+again:
        trans = btrfs_join_transaction(root);
        if (IS_ERR(trans))
                return PTR_ERR(trans);
 
+       /*
+        * we're not allowed to set block groups readonly after the dirty
+        * block groups cache has started writing.  If it already started,
+        * back off and let this transaction commit
+        */
+       mutex_lock(&root->fs_info->ro_block_group_mutex);
+       if (trans->transaction->dirty_bg_run) {
+               u64 transid = trans->transid;
+
+               mutex_unlock(&root->fs_info->ro_block_group_mutex);
+               btrfs_end_transaction(trans, root);
+
+               ret = btrfs_wait_for_commit(root, transid);
+               if (ret)
+                       return ret;
+               goto again;
+       }
+
+
        ret = set_block_group_ro(cache, 0);
        if (!ret)
                goto out;
                alloc_flags = update_block_group_flags(root, cache->flags);
                check_system_chunk(trans, root, alloc_flags);
        }
+       mutex_unlock(&root->fs_info->ro_block_group_mutex);
 
        btrfs_end_transaction(trans, root);
        return ret;
                goto out;
        }
 
+       /*
+        * get the inode first so any iput calls done for the io_list
+        * aren't the final iput (no unlinks allowed now)
+        */
        inode = lookup_free_space_inode(tree_root, block_group, path);
+
+       mutex_lock(&trans->transaction->cache_write_mutex);
+       /*
+        * make sure our free spache cache IO is done before remove the
+        * free space inode
+        */
+       spin_lock(&trans->transaction->dirty_bgs_lock);
+       if (!list_empty(&block_group->io_list)) {
+               list_del_init(&block_group->io_list);
+
+               WARN_ON(!IS_ERR(inode) && inode != block_group->io_ctl.inode);
+
+               spin_unlock(&trans->transaction->dirty_bgs_lock);
+               btrfs_wait_cache_io(root, trans, block_group,
+                                   &block_group->io_ctl, path,
+                                   block_group->key.objectid);
+               btrfs_put_block_group(block_group);
+               spin_lock(&trans->transaction->dirty_bgs_lock);
+       }
+
+       if (!list_empty(&block_group->dirty_list)) {
+               list_del_init(&block_group->dirty_list);
+               btrfs_put_block_group(block_group);
+       }
+       spin_unlock(&trans->transaction->dirty_bgs_lock);
+       mutex_unlock(&trans->transaction->cache_write_mutex);
+
        if (!IS_ERR(inode)) {
                ret = btrfs_orphan_add(trans, inode);
                if (ret) {
 
        spin_lock(&trans->transaction->dirty_bgs_lock);
        if (!list_empty(&block_group->dirty_list)) {
-               list_del_init(&block_group->dirty_list);
-               btrfs_put_block_group(block_group);
+               WARN_ON(1);
+       }
+       if (!list_empty(&block_group->io_list)) {
+               WARN_ON(1);
        }
        spin_unlock(&trans->transaction->dirty_bgs_lock);
-
        btrfs_remove_free_space_cache(block_group);
 
        spin_lock(&block_group->space_info->lock);
 
 
 int btrfs_truncate_free_space_cache(struct btrfs_root *root,
                                    struct btrfs_trans_handle *trans,
+                                   struct btrfs_block_group_cache *block_group,
                                    struct inode *inode)
 {
        int ret = 0;
+       struct btrfs_path *path = btrfs_alloc_path();
+
+       if (!path) {
+               ret = -ENOMEM;
+               goto fail;
+       }
+
+       if (block_group) {
+               mutex_lock(&trans->transaction->cache_write_mutex);
+               if (!list_empty(&block_group->io_list)) {
+                       list_del_init(&block_group->io_list);
+
+                       btrfs_wait_cache_io(root, trans, block_group,
+                                           &block_group->io_ctl, path,
+                                           block_group->key.objectid);
+                       btrfs_put_block_group(block_group);
+               }
+
+               /*
+                * now that we've truncated the cache away, its no longer
+                * setup or written
+                */
+               spin_lock(&block_group->lock);
+               block_group->disk_cache_state = BTRFS_DC_CLEAR;
+               spin_unlock(&block_group->lock);
+       }
+       btrfs_free_path(path);
 
        btrfs_i_size_write(inode, 0);
        truncate_pagecache(inode, 0);
        ret = btrfs_truncate_inode_items(trans, root, inode,
                                         0, BTRFS_EXTENT_DATA_KEY);
        if (ret) {
+               mutex_unlock(&trans->transaction->cache_write_mutex);
                btrfs_abort_transaction(trans, root, ret);
                return ret;
        }
 
        ret = btrfs_update_inode(trans, root, inode);
+
+       if (block_group)
+               mutex_unlock(&trans->transaction->cache_write_mutex);
+
+fail:
        if (ret)
                btrfs_abort_transaction(trans, root, ret);
 
 {
        int ret;
        struct btrfs_free_cluster *cluster = NULL;
+       struct btrfs_free_cluster *cluster_locked = NULL;
        struct rb_node *node = rb_first(&ctl->free_space_offset);
        struct btrfs_trim_range *trim_entry;
 
        }
 
        if (!node && cluster) {
+               cluster_locked = cluster;
+               spin_lock(&cluster_locked->lock);
                node = rb_first(&cluster->root);
                cluster = NULL;
        }
                node = rb_next(node);
                if (!node && cluster) {
                        node = rb_first(&cluster->root);
+                       cluster_locked = cluster;
+                       spin_lock(&cluster_locked->lock);
                        cluster = NULL;
                }
        }
+       if (cluster_locked) {
+               spin_unlock(&cluster_locked->lock);
+               cluster_locked = NULL;
+       }
 
        /*
         * Make sure we don't miss any range that was removed from our rbtree
 
        return 0;
 fail:
+       if (cluster_locked)
+               spin_unlock(&cluster_locked->lock);
        return -ENOSPC;
 }
 
        int ret;
        struct inode *inode = io_ctl->inode;
 
+       if (!inode)
+               return 0;
+
        root = root->fs_info->tree_root;
 
        /* Flush the dirty pages in the cache file. */
        btrfs_update_inode(trans, root, inode);
 
        if (block_group) {
+               /* the dirty list is protected by the dirty_bgs_lock */
+               spin_lock(&trans->transaction->dirty_bgs_lock);
+
+               /* the disk_cache_state is protected by the block group lock */
                spin_lock(&block_group->lock);
 
                /*
                 * only mark this as written if we didn't get put back on
-                * the dirty list while waiting for IO.
+                * the dirty list while waiting for IO.   Otherwise our
+                * cache state won't be right, and we won't get written again
                 */
                if (!ret && list_empty(&block_group->dirty_list))
                        block_group->disk_cache_state = BTRFS_DC_WRITTEN;
                        block_group->disk_cache_state = BTRFS_DC_ERROR;
 
                spin_unlock(&block_group->lock);
+               spin_unlock(&trans->transaction->dirty_bgs_lock);
                io_ctl->inode = NULL;
                iput(inode);
        }
 
        mutex_lock(&ctl->cache_writeout_mutex);
        /* Write out the extent entries in the free space cache */
+       spin_lock(&ctl->tree_lock);
        ret = write_cache_extent_entries(io_ctl, ctl,
                                         block_group, &entries, &bitmaps,
                                         &bitmap_list);
+       spin_unlock(&ctl->tree_lock);
        if (ret) {
                mutex_unlock(&ctl->cache_writeout_mutex);
                goto out_nospc;
         * Some spaces that are freed in the current transaction are pinned,
         * they will be added into free space cache after the transaction is
         * committed, we shouldn't lose them.
+        *
+        * If this changes while we are working we'll get added back to
+        * the dirty list and redo it.  No locking needed
         */
        ret = write_pinned_extent_entries(root, block_group, io_ctl, &entries);
        if (ret) {
         * locked while doing it because a concurrent trim can be manipulating
         * or freeing the bitmap.
         */
+       spin_lock(&ctl->tree_lock);
        ret = write_bitmap_entries(io_ctl, &bitmap_list);
+       spin_unlock(&ctl->tree_lock);
        mutex_unlock(&ctl->cache_writeout_mutex);
        if (ret)
                goto out_nospc;
                spin_unlock(&block_group->lock);
                return 0;
        }
-
-       if (block_group->delalloc_bytes) {
-               block_group->disk_cache_state = BTRFS_DC_WRITTEN;
-               spin_unlock(&block_group->lock);
-               return 0;
-       }
        spin_unlock(&block_group->lock);
 
        inode = lookup_free_space_inode(root, block_group, path);
 
                                       struct btrfs_block_rsv *rsv);
 int btrfs_truncate_free_space_cache(struct btrfs_root *root,
                                    struct btrfs_trans_handle *trans,
+                                   struct btrfs_block_group_cache *block_group,
                                    struct inode *inode);
 int load_free_space_cache(struct btrfs_fs_info *fs_info,
                          struct btrfs_block_group_cache *block_group);
 
        }
 
        if (i_size_read(inode) > 0) {
-               ret = btrfs_truncate_free_space_cache(root, trans, inode);
+               ret = btrfs_truncate_free_space_cache(root, trans, NULL, inode);
                if (ret) {
                        if (ret != -ENOSPC)
                                btrfs_abort_transaction(trans, root, ret);
 
 }
 
 static int delete_block_group_cache(struct btrfs_fs_info *fs_info,
-                                   struct inode *inode, u64 ino)
+                                   struct btrfs_block_group_cache *block_group,
+                                   struct inode *inode,
+                                   u64 ino)
 {
        struct btrfs_key key;
        struct btrfs_root *root = fs_info->tree_root;
                goto out;
        }
 
-       ret = btrfs_truncate_free_space_cache(root, trans, inode);
+       ret = btrfs_truncate_free_space_cache(root, trans, block_group, inode);
 
        btrfs_end_transaction(trans, root);
        btrfs_btree_balance_dirty(root);
         */
        if (ref_root == BTRFS_ROOT_TREE_OBJECTID) {
                ret = delete_block_group_cache(rc->extent_root->fs_info,
+                                              rc->block_group,
                                               NULL, ref_objectid);
                if (ret != -ENOENT)
                        return ret;
        btrfs_free_path(path);
 
        if (!IS_ERR(inode))
-               ret = delete_block_group_cache(fs_info, inode, 0);
+               ret = delete_block_group_cache(fs_info, rc->block_group, inode, 0);
        else
                ret = PTR_ERR(inode);
 
 
        atomic_set(&cur_trans->use_count, 2);
        cur_trans->have_free_bgs = 0;
        cur_trans->start_time = get_seconds();
+       cur_trans->dirty_bg_run = 0;
 
        cur_trans->delayed_refs.href_root = RB_ROOT;
        atomic_set(&cur_trans->delayed_refs.num_entries, 0);
        INIT_LIST_HEAD(&cur_trans->switch_commits);
        INIT_LIST_HEAD(&cur_trans->pending_ordered);
        INIT_LIST_HEAD(&cur_trans->dirty_bgs);
+       INIT_LIST_HEAD(&cur_trans->io_bgs);
+       mutex_init(&cur_trans->cache_write_mutex);
        cur_trans->num_dirty_bgs = 0;
        spin_lock_init(&cur_trans->dirty_bgs_lock);
        list_add_tail(&cur_trans->list, &fs_info->trans_list);
 {
        struct btrfs_fs_info *fs_info = root->fs_info;
        struct list_head *dirty_bgs = &trans->transaction->dirty_bgs;
+       struct list_head *io_bgs = &trans->transaction->io_bgs;
        struct list_head *next;
        struct extent_buffer *eb;
        int ret;
                        return ret;
        }
 
-       while (!list_empty(dirty_bgs)) {
+       while (!list_empty(dirty_bgs) || !list_empty(io_bgs)) {
                ret = btrfs_write_dirty_block_groups(trans, root);
                if (ret)
                        return ret;
                return ret;
        }
 
+       if (!cur_trans->dirty_bg_run) {
+               int run_it = 0;
+
+               /* this mutex is also taken before trying to set
+                * block groups readonly.  We need to make sure
+                * that nobody has set a block group readonly
+                * after a extents from that block group have been
+                * allocated for cache files.  btrfs_set_block_group_ro
+                * will wait for the transaction to commit if it
+                * finds dirty_bg_run = 1
+                *
+                * The dirty_bg_run flag is also used to make sure only
+                * one process starts all the block group IO.  It wouldn't
+                * hurt to have more than one go through, but there's no
+                * real advantage to it either.
+                */
+               mutex_lock(&root->fs_info->ro_block_group_mutex);
+               if (!cur_trans->dirty_bg_run) {
+                       run_it = 1;
+                       cur_trans->dirty_bg_run = 1;
+               }
+               mutex_unlock(&root->fs_info->ro_block_group_mutex);
+
+               if (run_it)
+                       ret = btrfs_start_dirty_block_groups(trans, root);
+       }
+       if (ret) {
+               btrfs_end_transaction(trans, root);
+               return ret;
+       }
+
        spin_lock(&root->fs_info->trans_lock);
        list_splice(&trans->ordered, &cur_trans->pending_ordered);
        if (cur_trans->state >= TRANS_STATE_COMMIT_START) {
 
        assert_qgroups_uptodate(trans);
        ASSERT(list_empty(&cur_trans->dirty_bgs));
+       ASSERT(list_empty(&cur_trans->io_bgs));
        update_super_roots(root);
 
        btrfs_set_super_log_root(root->fs_info->super_copy, 0);
 
        struct list_head pending_ordered;
        struct list_head switch_commits;
        struct list_head dirty_bgs;
+       struct list_head io_bgs;
        u64 num_dirty_bgs;
+
+       /*
+        * we need to make sure block group deletion doesn't race with
+        * free space cache writeout.  This mutex keeps them from stomping
+        * on each other
+        */
+       struct mutex cache_write_mutex;
        spinlock_t dirty_bgs_lock;
        struct btrfs_delayed_ref_root delayed_refs;
        int aborted;
+       int dirty_bg_run;
 };
 
 #define __TRANS_FREEZABLE      (1U << 0)