int                     *continued_write,
        int                     *logoffsetp);
 STATIC void
-xlog_state_switch_iclogs(
-       struct xlog             *log,
-       struct xlog_in_core     *iclog,
-       int                     eventual_size);
-STATIC void
 xlog_grant_push_ail(
        struct xlog             *log,
        int                     need_bytes);
  * This routine will mark the current iclog in the ring as WANT_SYNC and move
  * the current iclog pointer to the next iclog in the ring.
  */
-STATIC void
+void
 xlog_state_switch_iclogs(
        struct xlog             *log,
        struct xlog_in_core     *iclog,
        return -EIO;
 }
 
+/*
+ * Force the log to a specific LSN.
+ *
+ * If an iclog with that lsn can be found:
+ *     If it is in the DIRTY state, just return.
+ *     If it is in the ACTIVE state, move the in-core log into the WANT_SYNC
+ *             state and go to sleep or return.
+ *     If it is in any other state, go to sleep or return.
+ *
+ * Synchronous forces are implemented with a wait queue.  All callers trying
+ * to force a given lsn to disk must wait on the queue attached to the
+ * specific in-core log.  When given in-core log finally completes its write
+ * to disk, that thread will wake up all threads waiting on the queue.
+ */
 static int
 xlog_force_lsn(
        struct xlog             *log,
 }
 
 /*
- * Force the in-core log to disk for a specific LSN.
+ * Force the log to a specific checkpoint sequence.
  *
- * Find in-core log with lsn.
- *     If it is in the DIRTY state, just return.
- *     If it is in the ACTIVE state, move the in-core log into the WANT_SYNC
- *             state and go to sleep or return.
- *     If it is in any other state, go to sleep or return.
- *
- * Synchronous forces are implemented with a wait queue.  All callers trying
- * to force a given lsn to disk must wait on the queue attached to the
- * specific in-core log.  When given in-core log finally completes its write
- * to disk, that thread will wake up all threads waiting on the queue.
+ * First force the CIL so that all the required changes have been flushed to the
+ * iclogs. If the CIL force completed it will return a commit LSN that indicates
+ * the iclog that needs to be flushed to stable storage. If the caller needs
+ * a synchronous log force, we will wait on the iclog with the LSN returned by
+ * xlog_cil_force_seq() to be completed.
  */
 int
 xfs_log_force_seq(
 
        xfs_csn_t               push_seq;
        struct bio              bio;
        DECLARE_COMPLETION_ONSTACK(bdev_flush);
+       bool                    push_commit_stable;
 
        new_ctx = kmem_zalloc(sizeof(*new_ctx), KM_NOFS);
        new_ctx->ticket = xlog_cil_ticket_alloc(log);
        spin_lock(&cil->xc_push_lock);
        push_seq = cil->xc_push_seq;
        ASSERT(push_seq <= ctx->sequence);
+       push_commit_stable = cil->xc_push_commit_stable;
+       cil->xc_push_commit_stable = false;
 
        /*
         * As we are about to switch to a new, empty CIL context, we no longer
         * The commit iclog must be written to stable storage to guarantee
         * journal IO vs metadata writeback IO is correctly ordered on stable
         * storage.
+        *
+        * If the push caller needs the commit to be immediately stable and the
+        * commit_iclog is not yet marked as XLOG_STATE_WANT_SYNC to indicate it
+        * will be written when released, switch it's state to WANT_SYNC right
+        * now.
         */
        ctx->commit_iclog->ic_flags |= XLOG_ICL_NEED_FUA;
+       if (push_commit_stable &&
+           ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
+               xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
        xlog_state_release_iclog(log, ctx->commit_iclog, preflush_tail_lsn);
 
        /* Not safe to reference ctx now! */
 /*
  * xlog_cil_push_now() is used to trigger an immediate CIL push to the sequence
  * number that is passed. When it returns, the work will be queued for
- * @push_seq, but it won't be completed. The caller is expected to do any
- * waiting for push_seq to complete if it is required.
+ * @push_seq, but it won't be completed.
+ *
+ * If the caller is performing a synchronous force, we will flush the workqueue
+ * to get previously queued work moving to minimise the wait time they will
+ * undergo waiting for all outstanding pushes to complete. The caller is
+ * expected to do the required waiting for push_seq to complete.
+ *
+ * If the caller is performing an async push, we need to ensure that the
+ * checkpoint is fully flushed out of the iclogs when we finish the push. If we
+ * don't do this, then the commit record may remain sitting in memory in an
+ * ACTIVE iclog. This then requires another full log force to push to disk,
+ * which defeats the purpose of having an async, non-blocking CIL force
+ * mechanism. Hence in this case we need to pass a flag to the push work to
+ * indicate it needs to flush the commit record itself.
  */
 static void
 xlog_cil_push_now(
        struct xlog     *log,
-       xfs_lsn_t       push_seq)
+       xfs_lsn_t       push_seq,
+       bool            async)
 {
        struct xfs_cil  *cil = log->l_cilp;
 
        ASSERT(push_seq && push_seq <= cil->xc_current_sequence);
 
        /* start on any pending background push to minimise wait time on it */
-       flush_work(&cil->xc_push_work);
+       if (!async)
+               flush_work(&cil->xc_push_work);
 
        /*
         * If the CIL is empty or we've already pushed the sequence then
        }
 
        cil->xc_push_seq = push_seq;
+       cil->xc_push_commit_stable = async;
        queue_work(log->l_mp->m_cil_workqueue, &cil->xc_push_work);
        spin_unlock(&cil->xc_push_lock);
 }
        xlog_cil_push_background(log);
 }
 
+/*
+ * Flush the CIL to stable storage but don't wait for it to complete. This
+ * requires the CIL push to ensure the commit record for the push hits the disk,
+ * but otherwise is no different to a push done from a log force.
+ */
+void
+xlog_cil_flush(
+       struct xlog     *log)
+{
+       xfs_csn_t       seq = log->l_cilp->xc_current_sequence;
+
+       trace_xfs_log_force(log->l_mp, seq, _RET_IP_);
+       xlog_cil_push_now(log, seq, true);
+}
+
 /*
  * Conditionally push the CIL based on the sequence passed in.
  *
- * We only need to push if we haven't already pushed the sequence
- * number given. Hence the only time we will trigger a push here is
- * if the push sequence is the same as the current context.
+ * We only need to push if we haven't already pushed the sequence number given.
+ * Hence the only time we will trigger a push here is if the push sequence is
+ * the same as the current context.
  *
  * We return the current commit lsn to allow the callers to determine if a
  * iclog flush is necessary following this call.
 
        ASSERT(sequence <= cil->xc_current_sequence);
 
+       if (!sequence)
+               sequence = cil->xc_current_sequence;
+       trace_xfs_log_force(log->l_mp, sequence, _RET_IP_);
+
        /*
         * check to see if we need to force out the current context.
         * xlog_cil_push() handles racing pushes for the same sequence,
         * so no need to deal with it here.
         */
 restart:
-       xlog_cil_push_now(log, sequence);
+       xlog_cil_push_now(log, sequence, false);
 
        /*
         * See if we can find a previous sequence still committing.
                         * It is still being pushed! Wait for the push to
                         * complete, then start again from the beginning.
                         */
+                       XFS_STATS_INC(log->l_mp, xs_log_force_sleep);
                        xlog_wait(&cil->xc_commit_wait, &cil->xc_push_lock);
                        goto restart;
                }
 
 #include "xfs_errortag.h"
 #include "xfs_error.h"
 #include "xfs_log.h"
+#include "xfs_log_priv.h"
 
 #ifdef DEBUG
 /*
 
        /*
         * If we encountered pinned items or did not finish writing out all
-        * buffers the last time we ran, force the log first and wait for it
-        * before pushing again.
+        * buffers the last time we ran, force a background CIL push to get the
+        * items unpinned in the near future. We do not wait on the CIL push as
+        * that could stall us for seconds if there is enough background IO
+        * load. Stalling for that long when the tail of the log is pinned and
+        * needs flushing will hard stop the transaction subsystem when log
+        * space runs out.
         */
        if (ailp->ail_log_flush && ailp->ail_last_pushed_lsn == 0 &&
            (!list_empty_careful(&ailp->ail_buf_list) ||
                ailp->ail_log_flush = 0;
 
                XFS_STATS_INC(mp, xs_push_ail_flush);
-               xfs_log_force(mp, XFS_LOG_SYNC);
+               xlog_cil_flush(mp->m_log);
        }
 
        spin_lock(&ailp->ail_lock);