#define GFP_TRY        (__GFP_HIGHMEM | __GFP_NOWARN)
 
-static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
+/*
+ * some helper functions to deal with single linked page lists,
+ * page->private being our "next" pointer.
+ */
+
+/* If at least n pages are linked at head, get n pages off.
+ * Otherwise, don't modify head, and return NULL.
+ * Locking is the responsibility of the caller.
+ */
+static struct page *page_chain_del(struct page **head, int n)
+{
+       struct page *page;
+       struct page *tmp;
+
+       BUG_ON(!n);
+       BUG_ON(!head);
+
+       page = *head;
+       while (page) {
+               tmp = page_chain_next(page);
+               if (--n == 0)
+                       break; /* found sufficient pages */
+               if (tmp == NULL)
+                       /* insufficient pages, don't use any of them. */
+                       return NULL;
+               page = tmp;
+       }
+
+       /* add end of list marker for the returned list */
+       set_page_private(page, 0);
+       /* actual return value, and adjustment of head */
+       page = *head;
+       *head = tmp;
+       return page;
+}
+
+/* may be used outside of locks to find the tail of a (usually short)
+ * "private" page chain, before adding it back to a global chain head
+ * with page_chain_add() under a spinlock. */
+static struct page *page_chain_tail(struct page *page, int *len)
+{
+       struct page *tmp;
+       int i = 1;
+       while ((tmp = page_chain_next(page)))
+               ++i, page = tmp;
+       if (len)
+               *len = i;
+       return page;
+}
+
+static int page_chain_free(struct page *page)
+{
+       struct page *tmp;
+       int i = 0;
+       page_chain_for_each_safe(page, tmp) {
+               put_page(page);
+               ++i;
+       }
+       return i;
+}
+
+static void page_chain_add(struct page **head,
+               struct page *chain_first, struct page *chain_last)
+{
+#if 1
+       struct page *tmp;
+       tmp = page_chain_tail(chain_first, NULL);
+       BUG_ON(tmp != chain_last);
+#endif
+
+       /* add chain to head */
+       set_page_private(chain_last, (unsigned long)*head);
+       *head = chain_first;
+}
+
+static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
 {
        struct page *page = NULL;
+       struct page *tmp = NULL;
+       int i = 0;
 
        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
-       if (drbd_pp_vacant > 0) {
+       if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
-               page = drbd_pp_pool;
-               if (page) {
-                       drbd_pp_pool = (struct page *)page_private(page);
-                       set_page_private(page, 0); /* just to be polite */
-                       drbd_pp_vacant--;
-               }
+               page = page_chain_del(&drbd_pp_pool, number);
+               if (page)
+                       drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
+               if (page)
+                       return page;
        }
+
        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
-       if (!page)
-               page = alloc_page(GFP_TRY);
-       if (page)
-               atomic_inc(&mdev->pp_in_use);
-       return page;
+       for (i = 0; i < number; i++) {
+               tmp = alloc_page(GFP_TRY);
+               if (!tmp)
+                       break;
+               set_page_private(tmp, (unsigned long)page);
+               page = tmp;
+       }
+
+       if (i == number)
+               return page;
+
+       /* Not enough pages immediately available this time.
+        * No need to jump around here, drbd_pp_alloc will retry this
+        * function "soon". */
+       if (page) {
+               tmp = page_chain_tail(page, NULL);
+               spin_lock(&drbd_pp_lock);
+               page_chain_add(&drbd_pp_pool, page, tmp);
+               drbd_pp_vacant += i;
+               spin_unlock(&drbd_pp_lock);
+       }
+       return NULL;
 }
 
 /* kick lower level device, if we have more than (arbitrary number)
 
        list_for_each_safe(le, tle, &mdev->net_ee) {
                e = list_entry(le, struct drbd_epoch_entry, w.list);
-               if (drbd_bio_has_active_page(e->private_bio))
+               if (drbd_ee_has_active_page(e))
                        break;
                list_move(le, to_be_freed);
        }
 }
 
 /**
- * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
+ * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
  * @mdev:      DRBD device.
- * @retry:     whether or not to retry allocation forever (or until signalled)
+ * @number:    number of pages requested
+ * @retry:     whether to retry, if not enough pages are available right now
+ *
+ * Tries to allocate number pages, first from our own page pool, then from
+ * the kernel, unless this allocation would exceed the max_buffers setting.
+ * Possibly retry until DRBD frees sufficient pages somewhere else.
  *
- * Tries to allocate a page, first from our own page pool, then from the
- * kernel, unless this allocation would exceed the max_buffers setting.
- * If @retry is non-zero, retry until DRBD frees a page somewhere else.
+ * Returns a page chain linked via page->private.
  */
-static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
+static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
 {
        struct page *page = NULL;
        DEFINE_WAIT(wait);
 
-       if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-               page = drbd_pp_first_page_or_try_alloc(mdev);
-               if (page)
-                       return page;
-       }
+       /* Yes, we may run up to @number over max_buffers. If we
+        * follow it strictly, the admin will get it wrong anyways. */
+       if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
+               page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 
-       for (;;) {
+       while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 
                drbd_kick_lo_and_reclaim_net(mdev);
 
                if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-                       page = drbd_pp_first_page_or_try_alloc(mdev);
+                       page = drbd_pp_first_pages_or_try_alloc(mdev, number);
                        if (page)
                                break;
                }
        }
        finish_wait(&drbd_pp_wait, &wait);
 
+       if (page)
+               atomic_add(number, &mdev->pp_in_use);
        return page;
 }
 
 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
- * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
+ * Is also used from inside an other spin_lock_irq(&mdev->req_lock);
+ * Either links the page chain back to the global pool,
+ * or returns all pages to the system. */
 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
 {
-       int free_it;
-
-       spin_lock(&drbd_pp_lock);
-       if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-               free_it = 1;
-       } else {
-               set_page_private(page, (unsigned long)drbd_pp_pool);
-               drbd_pp_pool = page;
-               drbd_pp_vacant++;
-               free_it = 0;
-       }
-       spin_unlock(&drbd_pp_lock);
-
-       atomic_dec(&mdev->pp_in_use);
-
-       if (free_it)
-               __free_page(page);
-
-       wake_up(&drbd_pp_wait);
-}
-
-static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
-{
-       struct page *p_to_be_freed = NULL;
-       struct page *page;
-       struct bio_vec *bvec;
        int i;
-
-       spin_lock(&drbd_pp_lock);
-       __bio_for_each_segment(bvec, bio, i, 0) {
-               if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-                       set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
-                       p_to_be_freed = bvec->bv_page;
-               } else {
-                       set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
-                       drbd_pp_pool = bvec->bv_page;
-                       drbd_pp_vacant++;
-               }
-       }
-       spin_unlock(&drbd_pp_lock);
-       atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
-
-       while (p_to_be_freed) {
-               page = p_to_be_freed;
-               p_to_be_freed = (struct page *)page_private(page);
-               set_page_private(page, 0); /* just to be polite */
-               put_page(page);
+       if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
+               i = page_chain_free(page);
+       else {
+               struct page *tmp;
+               tmp = page_chain_tail(page, &i);
+               spin_lock(&drbd_pp_lock);
+               page_chain_add(&drbd_pp_pool, page, tmp);
+               drbd_pp_vacant += i;
+               spin_unlock(&drbd_pp_lock);
        }
-
+       atomic_sub(i, &mdev->pp_in_use);
+       i = atomic_read(&mdev->pp_in_use);
+       if (i < 0)
+               dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
        wake_up(&drbd_pp_wait);
 }
 
                                     unsigned int data_size,
                                     gfp_t gfp_mask) __must_hold(local)
 {
-       struct request_queue *q;
        struct drbd_epoch_entry *e;
        struct page *page;
-       struct bio *bio;
-       unsigned int ds;
+       unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 
        if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
                return NULL;
                return NULL;
        }
 
-       bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
-       if (!bio) {
-               if (!(gfp_mask & __GFP_NOWARN))
-                       dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
-               goto fail1;
-       }
-
-       bio->bi_bdev = mdev->ldev->backing_bdev;
-       bio->bi_sector = sector;
-
-       ds = data_size;
-       while (ds) {
-               page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
-               if (!page) {
-                       if (!(gfp_mask & __GFP_NOWARN))
-                               dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
-                       goto fail2;
-               }
-               if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
-                       drbd_pp_free(mdev, page);
-                       dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
-                           "data_size=%u,ds=%u) failed\n",
-                           (unsigned long long)sector, data_size, ds);
-
-                       q = bdev_get_queue(bio->bi_bdev);
-                       if (q->merge_bvec_fn) {
-                               struct bvec_merge_data bvm = {
-                                       .bi_bdev = bio->bi_bdev,
-                                       .bi_sector = bio->bi_sector,
-                                       .bi_size = bio->bi_size,
-                                       .bi_rw = bio->bi_rw,
-                               };
-                               int l = q->merge_bvec_fn(q, &bvm,
-                                               &bio->bi_io_vec[bio->bi_vcnt]);
-                               dev_err(DEV, "merge_bvec_fn() = %d\n", l);
-                       }
-
-                       /* dump more of the bio. */
-                       dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
-                       dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
-                       dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
-                       dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
-
-                       goto fail2;
-                       break;
-               }
-               ds -= min_t(int, ds, PAGE_SIZE);
-       }
-
-       D_ASSERT(data_size == bio->bi_size);
-
-       bio->bi_private = e;
-       e->mdev = mdev;
-       e->sector = sector;
-       e->size = bio->bi_size;
+       page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
+       if (!page)
+               goto fail;
 
-       e->private_bio = bio;
-       e->block_id = id;
        INIT_HLIST_NODE(&e->colision);
        e->epoch = NULL;
+       e->mdev = mdev;
+       e->pages = page;
+       atomic_set(&e->pending_bios, 0);
+       e->size = data_size;
        e->flags = 0;
+       e->sector = sector;
+       e->sector = sector;
+       e->block_id = id;
 
        return e;
 
- fail2:
-       drbd_pp_free_bio_pages(mdev, bio);
-       bio_put(bio);
- fail1:
+ fail:
        mempool_free(e, drbd_ee_mempool);
-
        return NULL;
 }
 
 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 {
-       struct bio *bio = e->private_bio;
-       drbd_pp_free_bio_pages(mdev, bio);
-       bio_put(bio);
+       drbd_pp_free(mdev, e->pages);
+       D_ASSERT(atomic_read(&e->pending_bios) == 0);
        D_ASSERT(hlist_unhashed(&e->colision));
        mempool_free(e, drbd_ee_mempool);
 }
                dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
 }
 
+/**
+ * drbd_submit_ee()
+ * @mdev:      DRBD device.
+ * @e:         epoch entry
+ * @rw:                flag field, see bio->bi_rw
+ */
+/* TODO allocate from our own bio_set. */
+int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
+               const unsigned rw, const int fault_type)
+{
+       struct bio *bios = NULL;
+       struct bio *bio;
+       struct page *page = e->pages;
+       sector_t sector = e->sector;
+       unsigned ds = e->size;
+       unsigned n_bios = 0;
+       unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
+
+       /* In most cases, we will only need one bio.  But in case the lower
+        * level restrictions happen to be different at this offset on this
+        * side than those of the sending peer, we may need to submit the
+        * request in more than one bio. */
+next_bio:
+       bio = bio_alloc(GFP_NOIO, nr_pages);
+       if (!bio) {
+               dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
+               goto fail;
+       }
+       /* > e->sector, unless this is the first bio */
+       bio->bi_sector = sector;
+       bio->bi_bdev = mdev->ldev->backing_bdev;
+       /* we special case some flags in the multi-bio case, see below
+        * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
+       bio->bi_rw = rw;
+       bio->bi_private = e;
+       bio->bi_end_io = drbd_endio_sec;
+
+       bio->bi_next = bios;
+       bios = bio;
+       ++n_bios;
+
+       page_chain_for_each(page) {
+               unsigned len = min_t(unsigned, ds, PAGE_SIZE);
+               if (!bio_add_page(bio, page, len, 0)) {
+                       /* a single page must always be possible! */
+                       BUG_ON(bio->bi_vcnt == 0);
+                       goto next_bio;
+               }
+               ds -= len;
+               sector += len >> 9;
+               --nr_pages;
+       }
+       D_ASSERT(page == NULL);
+       D_ASSERT(ds == 0);
+
+       atomic_set(&e->pending_bios, n_bios);
+       do {
+               bio = bios;
+               bios = bios->bi_next;
+               bio->bi_next = NULL;
+
+               /* strip off BIO_RW_UNPLUG unless it is the last bio */
+               if (bios)
+                       bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
+
+               drbd_generic_make_request(mdev, fault_type, bio);
+
+               /* strip off BIO_RW_BARRIER,
+                * unless it is the first or last bio */
+               if (bios && bios->bi_next)
+                       bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
+       } while (bios);
+       maybe_kick_lo(mdev);
+       return 0;
+
+fail:
+       while (bios) {
+               bio = bios;
+               bios = bios->bi_next;
+               bio_put(bio);
+       }
+       return -ENOMEM;
+}
+
 /**
  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
  * @mdev:      DRBD device.
 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
 {
        struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
-       struct bio *bio = e->private_bio;
-
        /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
           (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
           so that we can finish that epoch in drbd_may_finish_epoch().
        if (previous_epoch(mdev, e->epoch))
                dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
 
-       /* prepare bio for re-submit,
-        * re-init volatile members */
        /* we still have a local reference,
         * get_ldev was done in receive_Data. */
-       bio->bi_bdev = mdev->ldev->backing_bdev;
-       bio->bi_sector = e->sector;
-       bio->bi_size = e->size;
-       bio->bi_idx = 0;
-
-       bio->bi_flags &= ~(BIO_POOL_MASK - 1);
-       bio->bi_flags |= 1 << BIO_UPTODATE;
-
-       /* don't know whether this is necessary: */
-       bio->bi_phys_segments = 0;
-       bio->bi_next = NULL;
-
-       /* these should be unchanged: */
-       /* bio->bi_end_io = drbd_endio_write_sec; */
-       /* bio->bi_vcnt = whatever; */
 
        e->w.cb = e_end_block;
-
-       /* This is no longer a barrier request. */
-       bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
-
-       drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
-
+       if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
+               /* drbd_submit_ee fails for one reason only:
+                * if was not able to allocate sufficient bios.
+                * requeue, try again later. */
+               e->w.cb = w_e_reissue;
+               drbd_queue_work(&mdev->data.work, &e->w);
+       }
        return 1;
 }
 
 {
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_epoch_entry *e;
-       struct bio_vec *bvec;
        struct page *page;
-       struct bio *bio;
-       int dgs, ds, i, rr;
+       int dgs, ds, rr;
        void *dig_in = mdev->int_dig_in;
        void *dig_vv = mdev->int_dig_vv;
        unsigned long *data;
        e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
        if (!e)
                return NULL;
-       bio = e->private_bio;
+
        ds = data_size;
-       bio_for_each_segment(bvec, bio, i) {
-               page = bvec->bv_page;
+       page = e->pages;
+       page_chain_for_each(page) {
+               unsigned len = min_t(int, ds, PAGE_SIZE);
                data = kmap(page);
-               rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE));
+               rr = drbd_recv(mdev, data, len);
                if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
                        dev_err(DEV, "Fault injection: Corrupting data on receive\n");
                        data[0] = data[0] ^ (unsigned long)-1;
                }
                kunmap(page);
-               if (rr != min_t(int, ds, PAGE_SIZE)) {
+               if (rr != len) {
                        drbd_free_ee(mdev, e);
                        dev_warn(DEV, "short read receiving data: read %d expected %d\n",
-                            rr, min_t(int, ds, PAGE_SIZE));
+                            rr, len);
                        return NULL;
                }
                ds -= rr;
        }
 
        if (dgs) {
-               drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+               drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
                if (memcmp(dig_in, dig_vv, dgs)) {
                        dev_err(DEV, "Digest integrity check FAILED.\n");
                        drbd_bcast_ee(mdev, "digest failed",
        if (!data_size)
                return TRUE;
 
-       page = drbd_pp_alloc(mdev, 1);
+       page = drbd_pp_alloc(mdev, 1, 1);
 
        data = kmap(page);
        while (data_size) {
        }
 
        if (dgs) {
-               drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+               drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
                if (memcmp(dig_in, dig_vv, dgs)) {
                        dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
                        return 0;
 
        D_ASSERT(hlist_unhashed(&e->colision));
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                drbd_set_in_sync(mdev, sector, e->size);
                ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
        } else {
        struct drbd_epoch_entry *e;
 
        e = read_in_block(mdev, ID_SYNCER, sector, data_size);
-       if (!e) {
-               put_ldev(mdev);
-               return FALSE;
-       }
+       if (!e)
+               goto fail;
 
        dec_rs_pending(mdev);
 
-       e->private_bio->bi_end_io = drbd_endio_write_sec;
-       e->private_bio->bi_rw = WRITE;
-       e->w.cb = e_end_resync_block;
-
        inc_unacked(mdev);
        /* corresponding dec_unacked() in e_end_resync_block()
         * respective _drbd_clear_done_ee */
 
+       e->w.cb = e_end_resync_block;
+
        spin_lock_irq(&mdev->req_lock);
        list_add(&e->w.list, &mdev->sync_ee);
        spin_unlock_irq(&mdev->req_lock);
 
-       drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
-       /* accounting done in endio */
+       if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
+               return TRUE;
 
-       maybe_kick_lo(mdev);
-       return TRUE;
+       drbd_free_ee(mdev, e);
+fail:
+       put_ldev(mdev);
+       return FALSE;
 }
 
 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
        }
 
        if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
-               if (likely(drbd_bio_uptodate(e->private_bio))) {
+               if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                        pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
                                mdev->state.conn <= C_PAUSED_SYNC_T &&
                                e->flags & EE_MAY_SET_IN_SYNC) ?
                return FALSE;
        }
 
-       e->private_bio->bi_end_io = drbd_endio_write_sec;
        e->w.cb = e_end_block;
 
        spin_lock(&mdev->epoch_lock);
                drbd_al_begin_io(mdev, e->sector);
        }
 
-       e->private_bio->bi_rw = rw;
-       drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
-       /* accounting done in endio */
-
-       maybe_kick_lo(mdev);
-       return TRUE;
+       if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
+               return TRUE;
 
 out_interrupted:
        /* yes, the epoch_size now is imbalanced.
                return FALSE;
        }
 
-       e->private_bio->bi_rw = READ;
-       e->private_bio->bi_end_io = drbd_endio_read_sec;
-
        switch (h->command) {
        case P_DATA_REQUEST:
                e->w.cb = w_e_end_data_req;
 
        inc_unacked(mdev);
 
-       drbd_generic_make_request(mdev, fault_type, e->private_bio);
-       maybe_kick_lo(mdev);
-
-       return TRUE;
+       if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
+               return TRUE;
 
 out_free_e:
        kfree(di);
                dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
        i = atomic_read(&mdev->pp_in_use);
        if (i)
-               dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
+               dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
 
        D_ASSERT(list_empty(&mdev->read_ee));
        D_ASSERT(list_empty(&mdev->active_ee));
 
 
 /* defined here:
    drbd_md_io_complete
-   drbd_endio_write_sec
-   drbd_endio_read_sec
+   drbd_endio_sec
    drbd_endio_pri
 
  * more endio handlers:
 /* reads on behalf of the partner,
  * "submitted" by the receiver
  */
-void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
+void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
 {
        unsigned long flags = 0;
-       struct drbd_epoch_entry *e = NULL;
-       struct drbd_conf *mdev;
-       int uptodate = bio_flagged(bio, BIO_UPTODATE);
-
-       e = bio->bi_private;
-       mdev = e->mdev;
-
-       if (error)
-               dev_warn(DEV, "read: error=%d s=%llus\n", error,
-                               (unsigned long long)e->sector);
-       if (!error && !uptodate) {
-               dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
-                               (unsigned long long)e->sector);
-               /* strange behavior of some lower level drivers...
-                * fail the request by clearing the uptodate flag,
-                * but do not return any error?! */
-               error = -EIO;
-       }
+       struct drbd_conf *mdev = e->mdev;
 
        D_ASSERT(e->block_id != ID_VACANT);
 
        list_del(&e->w.list);
        if (list_empty(&mdev->read_ee))
                wake_up(&mdev->ee_wait);
+       if (test_bit(__EE_WAS_ERROR, &e->flags))
+               __drbd_chk_io_error(mdev, FALSE);
        spin_unlock_irqrestore(&mdev->req_lock, flags);
 
-       drbd_chk_io_error(mdev, error, FALSE);
        drbd_queue_work(&mdev->data.work, &e->w);
        put_ldev(mdev);
 }
 
+static int is_failed_barrier(int ee_flags)
+{
+       return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
+                       == (EE_IS_BARRIER|EE_WAS_ERROR);
+}
+
 /* writes on behalf of the partner, or resync writes,
- * "submitted" by the receiver.
- */
-void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
+ * "submitted" by the receiver, final stage.  */
+static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
 {
        unsigned long flags = 0;
-       struct drbd_epoch_entry *e = NULL;
-       struct drbd_conf *mdev;
+       struct drbd_conf *mdev = e->mdev;
        sector_t e_sector;
        int do_wake;
        int is_syncer_req;
        int do_al_complete_io;
-       int uptodate = bio_flagged(bio, BIO_UPTODATE);
-       int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
 
-       e = bio->bi_private;
-       mdev = e->mdev;
-
-       if (error)
-               dev_warn(DEV, "write: error=%d s=%llus\n", error,
-                               (unsigned long long)e->sector);
-       if (!error && !uptodate) {
-               dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
-                               (unsigned long long)e->sector);
-               /* strange behavior of some lower level drivers...
-                * fail the request by clearing the uptodate flag,
-                * but do not return any error?! */
-               error = -EIO;
-       }
-
-       /* error == -ENOTSUPP would be a better test,
-        * alas it is not reliable */
-       if (error && is_barrier && e->flags & EE_IS_BARRIER) {
+       /* if this is a failed barrier request, disable use of barriers,
+        * and schedule for resubmission */
+       if (is_failed_barrier(e->flags)) {
                drbd_bump_write_ordering(mdev, WO_bdev_flush);
                spin_lock_irqsave(&mdev->req_lock, flags);
                list_del(&e->w.list);
+               e->flags |= EE_RESUBMITTED;
                e->w.cb = w_e_reissue;
                /* put_ldev actually happens below, once we come here again. */
                __release(local);
 
        D_ASSERT(e->block_id != ID_VACANT);
 
-       spin_lock_irqsave(&mdev->req_lock, flags);
-       mdev->writ_cnt += e->size >> 9;
-       is_syncer_req = is_syncer_block_id(e->block_id);
-
        /* after we moved e to done_ee,
         * we may no longer access it,
         * it may be freed/reused already!
         * (as soon as we release the req_lock) */
        e_sector = e->sector;
        do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
+       is_syncer_req = is_syncer_block_id(e->block_id);
 
+       spin_lock_irqsave(&mdev->req_lock, flags);
+       mdev->writ_cnt += e->size >> 9;
        list_del(&e->w.list); /* has been on active_ee or sync_ee */
        list_add_tail(&e->w.list, &mdev->done_ee);
 
                ? list_empty(&mdev->sync_ee)
                : list_empty(&mdev->active_ee);
 
-       if (error)
+       if (test_bit(__EE_WAS_ERROR, &e->flags))
                __drbd_chk_io_error(mdev, FALSE);
        spin_unlock_irqrestore(&mdev->req_lock, flags);
 
 
        wake_asender(mdev);
        put_ldev(mdev);
+}
 
+/* writes on behalf of the partner, or resync writes,
+ * "submitted" by the receiver.
+ */
+void drbd_endio_sec(struct bio *bio, int error)
+{
+       struct drbd_epoch_entry *e = bio->bi_private;
+       struct drbd_conf *mdev = e->mdev;
+       int uptodate = bio_flagged(bio, BIO_UPTODATE);
+       int is_write = bio_data_dir(bio) == WRITE;
+
+       if (error)
+               dev_warn(DEV, "%s: error=%d s=%llus\n",
+                               is_write ? "write" : "read", error,
+                               (unsigned long long)e->sector);
+       if (!error && !uptodate) {
+               dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
+                               is_write ? "write" : "read",
+                               (unsigned long long)e->sector);
+               /* strange behavior of some lower level drivers...
+                * fail the request by clearing the uptodate flag,
+                * but do not return any error?! */
+               error = -EIO;
+       }
+
+       if (error)
+               set_bit(__EE_WAS_ERROR, &e->flags);
+
+       bio_put(bio); /* no need for the bio anymore */
+       if (atomic_dec_and_test(&e->pending_bios)) {
+               if (is_write)
+                       drbd_endio_write_sec_final(e);
+               else
+                       drbd_endio_read_sec_final(e);
+       }
 }
 
 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
        return 1; /* Simply ignore this! */
 }
 
-void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
+void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
+{
+       struct hash_desc desc;
+       struct scatterlist sg;
+       struct page *page = e->pages;
+       struct page *tmp;
+       unsigned len;
+
+       desc.tfm = tfm;
+       desc.flags = 0;
+
+       sg_init_table(&sg, 1);
+       crypto_hash_init(&desc);
+
+       while ((tmp = page_chain_next(page))) {
+               /* all but the last page will be fully used */
+               sg_set_page(&sg, page, PAGE_SIZE, 0);
+               crypto_hash_update(&desc, &sg, sg.length);
+               page = tmp;
+       }
+       /* and now the last, possibly only partially used page */
+       len = e->size & (PAGE_SIZE - 1);
+       sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
+       crypto_hash_update(&desc, &sg, sg.length);
+       crypto_hash_final(&desc, digest);
+}
+
+void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
 {
        struct hash_desc desc;
        struct scatterlist sg;
                return 1;
        }
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                digest_size = crypto_hash_digestsize(mdev->csums_tfm);
                digest = kmalloc(digest_size, GFP_NOIO);
                if (digest) {
-                       drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
+                       drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
 
                        inc_rs_pending(mdev);
                        ok = drbd_send_drequest_csum(mdev,
        /* GFP_TRY, because if there is no memory available right now, this may
         * be rescheduled for later. It is "only" background resync, after all. */
        e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
-       if (!e) {
-               put_ldev(mdev);
-               return 2;
-       }
+       if (!e)
+               goto fail;
 
        spin_lock_irq(&mdev->req_lock);
        list_add(&e->w.list, &mdev->read_ee);
        spin_unlock_irq(&mdev->req_lock);
 
-       e->private_bio->bi_end_io = drbd_endio_read_sec;
-       e->private_bio->bi_rw = READ;
        e->w.cb = w_e_send_csum;
+       if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
+               return 1;
 
-       mdev->read_cnt += size >> 9;
-       drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
-
-       return 1;
+       drbd_free_ee(mdev, e);
+fail:
+       put_ldev(mdev);
+       return 2;
 }
 
 void resync_timer_fn(unsigned long data)
 /* helper */
 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 {
-       if (drbd_bio_has_active_page(e->private_bio)) {
+       if (drbd_ee_has_active_page(e)) {
                /* This might happen if sendpage() has not finished */
                spin_lock_irq(&mdev->req_lock);
                list_add_tail(&e->w.list, &mdev->net_ee);
                return 1;
        }
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                ok = drbd_send_block(mdev, P_DATA_REPLY, e);
        } else {
                if (__ratelimit(&drbd_ratelimit_state))
                put_ldev(mdev);
        }
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
                        inc_rs_pending(mdev);
                        ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
 
        di = (struct digest_info *)(unsigned long)e->block_id;
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                /* quick hack to try to avoid a race against reconfiguration.
                 * a real fix would be much more involved,
                 * introducing more locking mechanisms */
                        digest = kmalloc(digest_size, GFP_NOIO);
                }
                if (digest) {
-                       drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
+                       drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
                        eq = !memcmp(digest, di->digest, digest_size);
                        kfree(digest);
                }
        if (unlikely(cancel))
                goto out;
 
-       if (unlikely(!drbd_bio_uptodate(e->private_bio)))
+       if (unlikely((e->flags & EE_WAS_ERROR) != 0))
                goto out;
 
        digest_size = crypto_hash_digestsize(mdev->verify_tfm);
        /* FIXME if this allocation fails, online verify will not terminate! */
        digest = kmalloc(digest_size, GFP_NOIO);
        if (digest) {
-               drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
+               drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
                inc_rs_pending(mdev);
                ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
                                             digest, digest_size, P_OV_REPLY);
 
        di = (struct digest_info *)(unsigned long)e->block_id;
 
-       if (likely(drbd_bio_uptodate(e->private_bio))) {
+       if (likely((e->flags & EE_WAS_ERROR) == 0)) {
                digest_size = crypto_hash_digestsize(mdev->verify_tfm);
                digest = kmalloc(digest_size, GFP_NOIO);
                if (digest) {
-                       drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
+                       drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
 
                        D_ASSERT(digest_size == di->digest_size);
                        eq = !memcmp(digest, di->digest, digest_size);