#include "../internal.h"
 
+#define IOEND_BATCH_SIZE       4096
+
 /*
  * Structure allocated for each folio when block size < folio size
  * to track sub-folio uptodate status and I/O completions.
  * state, release holds on bios, and finally free up memory.  Do not use the
  * ioend after this.
  */
-static void
+static u32
 iomap_finish_ioend(struct iomap_ioend *ioend, int error)
 {
        struct inode *inode = ioend->io_inode;
        u64 start = bio->bi_iter.bi_sector;
        loff_t offset = ioend->io_offset;
        bool quiet = bio_flagged(bio, BIO_QUIET);
+       u32 folio_count = 0;
 
        for (bio = &ioend->io_inline_bio; bio; bio = next) {
                struct folio_iter fi;
                        next = bio->bi_private;
 
                /* walk all folios in bio, ending page IO on them */
-               bio_for_each_folio_all(fi, bio)
+               bio_for_each_folio_all(fi, bio) {
                        iomap_finish_folio_write(inode, fi.folio, fi.length,
                                        error);
+                       folio_count++;
+               }
                bio_put(bio);
        }
        /* The ioend has been freed by bio_put() */
 "%s: writeback error on inode %lu, offset %lld, sector %llu",
                        inode->i_sb->s_id, inode->i_ino, offset, start);
        }
+       return folio_count;
 }
 
+/*
+ * Ioend completion routine for merged bios. This can only be called from task
+ * contexts as merged ioends can be of unbound length. Hence we have to break up
+ * the writeback completions into manageable chunks to avoid long scheduler
+ * holdoffs. We aim to keep scheduler holdoffs down below 10ms so that we get
+ * good batch processing throughput without creating adverse scheduler latency
+ * conditions.
+ */
 void
 iomap_finish_ioends(struct iomap_ioend *ioend, int error)
 {
        struct list_head tmp;
+       u32 completions;
+
+       might_sleep();
 
        list_replace_init(&ioend->io_list, &tmp);
-       iomap_finish_ioend(ioend, error);
+       completions = iomap_finish_ioend(ioend, error);
 
        while (!list_empty(&tmp)) {
+               if (completions > IOEND_BATCH_SIZE * 8) {
+                       cond_resched();
+                       completions = 0;
+               }
                ioend = list_first_entry(&tmp, struct iomap_ioend, io_list);
                list_del_init(&ioend->io_list);
-               iomap_finish_ioend(ioend, error);
+               completions += iomap_finish_ioend(ioend, error);
        }
 }
 EXPORT_SYMBOL_GPL(iomap_finish_ioends);
                return false;
        if (ioend->io_offset + ioend->io_size != next->io_offset)
                return false;
+       /*
+        * Do not merge physically discontiguous ioends. The filesystem
+        * completion functions will have to iterate the physical
+        * discontiguities even if we merge the ioends at a logical level, so
+        * we don't gain anything by merging physical discontiguities here.
+        *
+        * We cannot use bio->bi_iter.bi_sector here as it is modified during
+        * submission so does not point to the start sector of the bio at
+        * completion.
+        */
+       if (ioend->io_sector + (ioend->io_size >> 9) != next->io_sector)
+               return false;
        return true;
 }
 
        ioend->io_flags = wpc->iomap.flags;
        ioend->io_inode = inode;
        ioend->io_size = 0;
+       ioend->io_folios = 0;
        ioend->io_offset = offset;
        ioend->io_bio = bio;
+       ioend->io_sector = sector;
        return ioend;
 }
 
                return false;
        if (sector != bio_end_sector(wpc->ioend->io_bio))
                return false;
+       /*
+        * Limit ioend bio chain lengths to minimise IO completion latency. This
+        * also prevents long tight loops ending page writeback on all the
+        * folios in the ioend.
+        */
+       if (wpc->ioend->io_folios >= IOEND_BATCH_SIZE)
+               return false;
        return true;
 }
 
                                 &submit_list);
                count++;
        }
+       if (count)
+               wpc->ioend->io_folios++;
 
        WARN_ON_ONCE(!wpc->ioend && !list_empty(&submit_list));
        WARN_ON_ONCE(!folio_test_locked(folio));
 
        memalloc_nofs_restore(nofs_flag);
 }
 
-/* Finish all pending io completions. */
+/*
+ * Finish all pending IO completions that require transactional modifications.
+ *
+ * We try to merge physical and logically contiguous ioends before completion to
+ * minimise the number of transactions we need to perform during IO completion.
+ * Both unwritten extent conversion and COW remapping need to iterate and modify
+ * one physical extent at a time, so we gain nothing by merging physically
+ * discontiguous extents here.
+ *
+ * The ioend chain length that we can be processing here is largely unbound in
+ * length and we may have to perform significant amounts of work on each ioend
+ * to complete it. Hence we have to be careful about holding the CPU for too
+ * long in this loop.
+ */
 void
 xfs_end_io(
        struct work_struct      *work)
                list_del_init(&ioend->io_list);
                iomap_ioend_try_merge(ioend, &tmp);
                xfs_end_ioend(ioend);
+               cond_resched();
        }
 }