return !bl_is_sector_init(be->be_inval, isect);
 }
 
+/* Given the be associated with isect, determine if page data can be
+ * written to disk.
+ */
+static int is_writable(struct pnfs_block_extent *be, sector_t isect)
+{
+       if (be->be_state == PNFS_BLOCK_READWRITE_DATA)
+               return 1;
+       else if (be->be_state != PNFS_BLOCK_INVALID_DATA)
+               return 0;
+       else
+               return bl_is_sector_init(be->be_inval, isect);
+}
+
 /* The data we are handed might be spread across several bios.  We need
  * to track when the last one is finished.
  */
        return PNFS_NOT_ATTEMPTED;
 }
 
+/* This is basically copied from mpage_end_io_read */
+static void bl_end_io_write(struct bio *bio, int err)
+{
+       struct parallel_io *par = bio->bi_private;
+       const int uptodate = test_bit(BIO_UPTODATE, &bio->bi_flags);
+       struct nfs_write_data *wdata = (struct nfs_write_data *)par->data;
+
+       if (!uptodate) {
+               if (!wdata->pnfs_error)
+                       wdata->pnfs_error = -EIO;
+               bl_set_lo_fail(wdata->lseg);
+       }
+       bio_put(bio);
+       put_parallel(par);
+}
+
+/* Function scheduled for call during bl_end_par_io_write,
+ * it marks sectors as written and extends the commitlist.
+ */
+static void bl_write_cleanup(struct work_struct *work)
+{
+       struct rpc_task *task;
+       struct nfs_write_data *wdata;
+       dprintk("%s enter\n", __func__);
+       task = container_of(work, struct rpc_task, u.tk_work);
+       wdata = container_of(task, struct nfs_write_data, task);
+       pnfs_ld_write_done(wdata);
+}
+
+/* Called when last of bios associated with a bl_write_pagelist call finishes */
+static void
+bl_end_par_io_write(void *data)
+{
+       struct nfs_write_data *wdata = data;
+
+       /* STUB - ignoring error handling */
+       wdata->task.tk_status = 0;
+       wdata->verf.committed = NFS_FILE_SYNC;
+       INIT_WORK(&wdata->task.u.tk_work, bl_write_cleanup);
+       schedule_work(&wdata->task.u.tk_work);
+}
+
 static enum pnfs_try_status
-bl_write_pagelist(struct nfs_write_data *wdata,
-                 int sync)
+bl_write_pagelist(struct nfs_write_data *wdata, int sync)
 {
-       return PNFS_NOT_ATTEMPTED;
+       int i;
+       struct bio *bio = NULL;
+       struct pnfs_block_extent *be = NULL;
+       sector_t isect, extent_length = 0;
+       struct parallel_io *par;
+       loff_t offset = wdata->args.offset;
+       size_t count = wdata->args.count;
+       struct page **pages = wdata->args.pages;
+       int pg_index = wdata->args.pgbase >> PAGE_CACHE_SHIFT;
+
+       dprintk("%s enter, %Zu@%lld\n", __func__, count, offset);
+       /* At this point, wdata->pages is a (sequential) list of nfs_pages.
+        * We want to write each, and if there is an error remove it from
+        * list and call
+        * nfs_retry_request(req) to have it redone using nfs.
+        * QUEST? Do as block or per req?  Think have to do per block
+        * as part of end_bio
+        */
+       par = alloc_parallel(wdata);
+       if (!par)
+               return PNFS_NOT_ATTEMPTED;
+       par->call_ops = *wdata->mds_ops;
+       par->call_ops.rpc_call_done = bl_rpc_do_nothing;
+       par->pnfs_callback = bl_end_par_io_write;
+       /* At this point, have to be more careful with error handling */
+
+       isect = (sector_t) ((offset & (long)PAGE_CACHE_MASK) >> SECTOR_SHIFT);
+       for (i = pg_index; i < wdata->npages ; i++) {
+               if (!extent_length) {
+                       /* We've used up the previous extent */
+                       bl_put_extent(be);
+                       bio = bl_submit_bio(WRITE, bio);
+                       /* Get the next one */
+                       be = bl_find_get_extent(BLK_LSEG2EXT(wdata->lseg),
+                                            isect, NULL);
+                       if (!be || !is_writable(be, isect)) {
+                               wdata->pnfs_error = -ENOMEM;
+                               goto out;
+                       }
+                       extent_length = be->be_length -
+                               (isect - be->be_f_offset);
+               }
+               for (;;) {
+                       if (!bio) {
+                               bio = bio_alloc(GFP_NOIO, wdata->npages - i);
+                               if (!bio) {
+                                       wdata->pnfs_error = -ENOMEM;
+                                       goto out;
+                               }
+                               bio->bi_sector = isect - be->be_f_offset +
+                                       be->be_v_offset;
+                               bio->bi_bdev = be->be_mdev;
+                               bio->bi_end_io = bl_end_io_write;
+                               bio->bi_private = par;
+                       }
+                       if (bio_add_page(bio, pages[i], PAGE_SIZE, 0))
+                               break;
+                       bio = bl_submit_bio(WRITE, bio);
+               }
+               isect += PAGE_CACHE_SECTORS;
+               extent_length -= PAGE_CACHE_SECTORS;
+       }
+       wdata->res.count = (isect << SECTOR_SHIFT) - (offset);
+       if (count < wdata->res.count)
+               wdata->res.count = count;
+out:
+       bl_put_extent(be);
+       bl_submit_bio(WRITE, bio);
+       put_parallel(par);
+       return PNFS_ATTEMPTED;
 }
 
 /* FIXME - range ignored */