From 29a8d1d61ed329d7e5d876fffe572bc05ce2116a Mon Sep 17 00:00:00 2001 From: Bijan Mottahedeh Date: Mon, 14 Dec 2015 05:21:57 -0800 Subject: [PATCH] ldoms: VDS panics with regular file backend. ldoms: VIO FLUSH not implemented correctly. Orabug 22365411 Orabug 22365429 Signed-off-by: Bijan Mottahedeh Reviewed-by: Alexandre Chartre (cherry picked from commit 7062cbbda41153660c72ef51920ed2714947593e) --- drivers/block/vds/vds.h | 6 +- drivers/block/vds/vds_blk.c | 4 +- drivers/block/vds/vds_io.c | 88 ++++++++++++++++++++++--- drivers/block/vds/vds_io.h | 7 +- drivers/block/vds/vds_main.c | 121 +++++++++++++++++++++++------------ drivers/block/vds/vds_reg.c | 8 +-- 6 files changed, 174 insertions(+), 60 deletions(-) diff --git a/drivers/block/vds/vds.h b/drivers/block/vds/vds.h index 28be0e2a26da..ef1912759400 100644 --- a/drivers/block/vds/vds.h +++ b/drivers/block/vds/vds.h @@ -1,7 +1,7 @@ /* * vds.h: LDOM Virtual Disk Server. * - * Copyright (C) 2014 Oracle. All rights reserved. + * Copyright (C) 2014, 2015 Oracle. All rights reserved. */ #include @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -52,7 +53,8 @@ struct vds_port { struct vio_disk_geom *geom; struct vio_disk_vtoc *vtoc; struct workqueue_struct *ioq; - struct workqueue_struct *rtq; + struct list_head io_list; + wait_queue_head_t wait; }; #define VDS_PORT_SEQ 0x1 diff --git a/drivers/block/vds/vds_blk.c b/drivers/block/vds/vds_blk.c index 42bc4a321466..d71f545f449a 100644 --- a/drivers/block/vds/vds_blk.c +++ b/drivers/block/vds/vds_blk.c @@ -1,7 +1,7 @@ /* * vds_blk.c: LDOM Virtual Disk Server. * - * Copyright (C) 2014 Oracle. All rights reserved. + * Copyright (C) 2014, 2015 Oracle. All rights reserved. */ #include "vds.h" @@ -186,7 +186,7 @@ static int vds_blk_flush(struct vds_port *port) { struct block_device *bdev = port->be_data; - return blkdev_issue_flush(bdev, GFP_KERNEL, NULL); + return bdev ? blkdev_issue_flush(bdev, GFP_KERNEL, NULL) : 0; } struct vds_be_ops vds_blk_ops = { diff --git a/drivers/block/vds/vds_io.c b/drivers/block/vds/vds_io.c index d44bee69b9c0..7409e6b488f3 100644 --- a/drivers/block/vds/vds_io.c +++ b/drivers/block/vds/vds_io.c @@ -1,7 +1,7 @@ /* * vds_io.c: LDOM Virtual Disk Server. * - * Copyright (C) 2014 Oracle. All rights reserved. + * Copyright (C) 2014, 2015 Oracle. All rights reserved. */ #include "vds.h" @@ -107,6 +107,8 @@ struct vds_io *vds_io_alloc(struct vio_driver_state *vio, if (func) INIT_WORK(&io->vds_work, func); + INIT_LIST_HEAD(&io->list); + return io; err: @@ -159,14 +161,75 @@ void vds_io_enq(struct vds_io *io) struct vio_driver_state *vio = io->vio; struct vds_port *port = to_vds_port(vio); - vdsdbg(WQ, "cpu=%d\n", smp_processor_id()); + vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(), + list_first_entry_or_null(&port->io_list, struct vds_io, list)); BUG_ON(!in_interrupt()); + list_add_tail(&io->list, &port->io_list); + queue_work(port->ioq, &io->vds_work); +} + +void vds_io_wait(struct vds_io *io) +{ + struct vio_driver_state *vio = io->vio; + struct vds_port *port = to_vds_port(vio); + DEFINE_WAIT(wait); + unsigned long flags; + struct vds_io *first; + + BUG_ON(in_interrupt()); + + vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(), + list_first_entry_or_null(&port->io_list, struct vds_io, list)); + + while (1) { + prepare_to_wait(&port->wait, &wait, TASK_INTERRUPTIBLE); + + vds_vio_lock(vio, flags); + first = list_first_entry_or_null(&port->io_list, struct vds_io, + list); + vds_vio_unlock(vio, flags); + + /* + * If the queue is empty or this is the first request + * on the queue, we can proceed. + */ + if (first == NULL || first == io) { + finish_wait(&port->wait, &wait); + break; + } + + schedule(); + finish_wait(&port->wait, &wait); + } +} + +void vds_io_done(struct vds_io *io) +{ + struct vio_driver_state *vio = io->vio; + struct vds_port *port = to_vds_port(vio); + unsigned long flags; + + vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(), + list_first_entry_or_null(&port->io_list, struct vds_io, list)); + + /* + * Dequeue the request. + * + * If this is a reset, discard the internal IO queue. + * This means that any request which was scheduled from the + * kernel workqueue AFTER the reset will execute but no response + * will be sent to the client. + */ + vds_vio_lock(vio, flags); + list_del(&io->list); if (io->flags & VDS_IO_FINI) - queue_work(port->rtq, &io->vds_work); + INIT_LIST_HEAD(&port->io_list); else - queue_work(port->ioq, &io->vds_work); + wake_up(&port->wait); + vds_vio_unlock(vio, flags); + vds_io_free(io); } static int vds_io_rw(struct vds_io *io) @@ -440,15 +503,23 @@ done: return rv; } -int vd_op_flush(struct vio_driver_state *vio) +int vd_op_flush(struct vds_io *io) { int rv; + struct vio_driver_state *vio = io->vio; struct vds_port *port = to_vds_port(vio); - if (port->be_ops) { - flush_workqueue(port->ioq); + /* + * Wait until this the first request on the internal IO queue. + * This will act as a barrier to ensure all preceding IO requests + * have completed and responses have been sent to the client. + * This does not affect any request queued after the flush. + */ + vdsdbg(FLUSH, "VD_OP_FLUSH\n"); + vds_io_wait(io); + if (port->be_ops) rv = port->be_ops->flush(port); - } else + else rv = -EIO; vdsdbg(FLUSH, "VD_OP_FLUSH rv=%d\n", rv); @@ -613,7 +684,6 @@ done: void vds_be_fini(struct vds_port *port) { - flush_workqueue(port->ioq); vds_label_fini(port); if (port->be_ops) { port->be_ops->fini(port); diff --git a/drivers/block/vds/vds_io.h b/drivers/block/vds/vds_io.h index 6faf75202274..cd4efd8f5a66 100644 --- a/drivers/block/vds/vds_io.h +++ b/drivers/block/vds/vds_io.h @@ -1,7 +1,7 @@ /* * vds_io.h: LDOM Virtual Disk Server. * - * Copyright (C) 2014 Oracle. All rights reserved. + * Copyright (C) 2014, 2015 Oracle. All rights reserved. */ struct vds_port; @@ -35,6 +35,7 @@ struct vds_io { struct page *pages; struct completion event; struct work_struct vds_work; + struct list_head list; char buf[0]; }; @@ -48,6 +49,8 @@ struct vds_io *vds_io_alloc(struct vio_driver_state *vio, void (*func)(struct work_struct *)); void vds_io_free(struct vds_io *io); void vds_io_enq(struct vds_io *io); +void vds_io_wait(struct vds_io *io); +void vds_io_done(struct vds_io *io); void *vds_get(struct vds_port *port, sector_t offset, u64 size); int vds_clear(struct vds_port *port, sector_t offset, u64 size); @@ -63,5 +66,5 @@ int vd_op_get_geom(struct vds_io *io); int vd_op_set_geom(struct vds_io *io); int vd_op_get_efi(struct vds_io *io); int vd_op_set_efi(struct vds_io *io); -int vd_op_flush(struct vio_driver_state *vio); +int vd_op_flush(struct vds_io *io); int vd_op_rw(struct vds_io *io); diff --git a/drivers/block/vds/vds_main.c b/drivers/block/vds/vds_main.c index e9eae09fba8b..7b49bf931685 100644 --- a/drivers/block/vds/vds_main.c +++ b/drivers/block/vds/vds_main.c @@ -1,7 +1,7 @@ /* * vds_main.c: LDOM Virtual Disk Server. * - * Copyright (C) 2014 Oracle. All rights reserved. + * Copyright (C) 2014, 2015 Oracle. All rights reserved. */ #include "vds.h" @@ -24,11 +24,42 @@ MODULE_VERSION(DRV_MOD_VERSION); 1 << VD_OP_GET_EFI | \ 1 << VD_OP_SET_EFI | \ 1 << VD_OP_FLUSH) + /* - * XXX The recommended value is 0 but that creates threads - * which scale with ncpu and because of some apparent - * flow control issues cause scsi timeouts so limit to - * 1 thread for now. + * Request ordering is managed through the following: + * + * - ioq kernel workqueue + * - io_list internal driver queue + * - wait driver waitqueue + * - vds_io_wait() blocks until the caller is at the head of the io_list + * - vds_io_done() deletes the caller from io_list and wakes up blocked threads + * + * A client request is added in interrupt context to the ioq kernel workqueue + * and the io_list internal driver queue. A request is executed at process + * context in a kernel worker thread. The workqueue ensures that requests + * are scheduled for execution in order of arrival. + * + * Once actually executing, an IO request proceeds immediately, e.g. reading + * or writing a disk block, and then calls vds_io_wait() before responding + * to the client in order to serialize client responses regardless of the + * order in which requests are actually completed. + * + * A FLUSH request however calls vds_io_wait() first in order to ensure that + * all proceeding requests are completed and then calls the backend's flush + * method in order to flush any outstanding IO buffers to disk. Note that + * FLUSH request does not enforce any ordering on a potential request that + * might have arrived after the FLUSH request. + * + * A control event (reset or handshake) also calls vds_io_wait() first in + * order to serialize them with respect to each other as well as IO operations. + * For example, reset processing calls the backend's fini method which could + * conceivably block when the backend is closed. The serialization should + * ensure that a following handshake initiates only after the reset is done. + * + * XXX The recommended value for the size of the kernel workqueue is 0 but + * that creates threads which scale with ncpu and because of some apparent + * flow control issues cause intermittent scsi timeouts and LDC aborts so + * limit to 1 thread for now. */ int vds_wq = 1; int vds_dbg; @@ -166,7 +197,7 @@ static struct vio_driver_ops vds_vio_ops = { .handshake_complete = vds_handshake_complete, }; -static void vds_reset(struct vio_driver_state *vio); +static void vds_reset(struct vds_io *io); static void vds_evt_reset(struct vio_driver_state *vio); static int vds_dring_done(struct vds_io *io) @@ -178,6 +209,7 @@ static int vds_dring_done(struct vds_io *io) struct vio_disk_desc *desc; int rv; int idx; + int reset = 1; desc = io->desc_buf; desc->status = io->error; @@ -208,30 +240,30 @@ static int vds_dring_done(struct vds_io *io) rv = ldc_put_dring_entry(vio->lp, io->desc_buf, dr->entry_size, (idx * dr->entry_size), dr->cookies, dr->ncookies); - if (rv != dr->entry_size) - goto reset; - /* - * If we successfully responded to the request (ack or nack), - * then return the actual IO operation return value, otherwise - * reset the connection. - */ - pkt->tag.stype = io->ack; - rv = vio_ldc_send(vio, pkt, sizeof(*pkt)); - if (rv > 0) { - rv = io->error; - vds_io_free(io); - vdsdbg(DATA, "DRING RET %d\n", rv); - return rv; + if (rv == dr->entry_size) { + /* + * If we successfully responded to the request (ack or nack), + * then return the actual IO operation return value, otherwise + * reset the connection. + */ + pkt->tag.stype = io->ack; + rv = vio_ldc_send(vio, pkt, sizeof(*pkt)); + if (rv > 0) { + rv = io->error; + reset = 0; + } + } + + if (reset) { + vdsmsg(err, "Reset VDS LDC rv[%d]\n", rv); + vds_reset(io); + rv = -ECONNRESET; } -reset: - vdsmsg(err, "Reset VDS LDC rv[%d]\n", rv); - vds_reset(vio); - vds_io_free(io); + vdsdbg(DATA, "DRING %s\n", reset ? "RESET" : "OK"); - vdsdbg(DATA, "DRING RESET\n"); - return -ECONNRESET; + return rv; } static int vds_desc_done(struct vds_io *io) @@ -266,13 +298,13 @@ static int vds_desc_done(struct vds_io *io) rv = vio_ldc_send(vio, pkt, io->msglen); if (rv <= 0) { vdsmsg(err, "Reset VDS LDC rv[%d]\n", rv); - vds_reset(vio); + vds_reset(io); rv = -ECONNRESET; } else { rv = io->error; } - vds_io_free(io); + vdsdbg(DATA, "DESC %s\n", rv == -ECONNRESET ? "RESET" : "OK"); return rv; } @@ -330,13 +362,15 @@ static void vds_bh_hs(struct work_struct *work) if (io->flags & VDS_IO_INIT) err = vds_be_init(port); + vds_io_wait(io); + if (!err) err = vio_control_pkt_engine(vio, port->msgbuf); if (err) vdsmsg(err, "%s: handshake failed (%d)\n", port->path, err); - vds_io_free(io); + vds_io_done(io); } /* @@ -384,7 +418,7 @@ static void vds_bh_io(struct work_struct *work) err = vd_op_set_efi(io); break; case VD_OP_FLUSH: - err = vd_op_flush(vio); + err = vd_op_flush(io); break; default: err = -ENOTSUPP; @@ -394,16 +428,21 @@ static void vds_bh_io(struct work_struct *work) if (io->ack == VIO_SUBTYPE_ACK && err != 0 && io->error == 0) io->error = err > 0 ? err : -err; + vds_io_wait(io); + if (port->xfer_mode == VIO_DRING_MODE) (void) vds_dring_done(io); else if (port->xfer_mode == VIO_DESC_MODE) (void) vds_desc_done(io); else BUG(); + + vds_io_done(io); } -static void vds_reset(struct vio_driver_state *vio) +static void vds_reset(struct vds_io *io) { + struct vio_driver_state *vio = io->vio; struct vds_port *port = to_vds_port(vio); unsigned long flags; int err; @@ -412,6 +451,8 @@ static void vds_reset(struct vio_driver_state *vio) BUG_ON(in_interrupt()); + io->flags |= VDS_IO_FINI; + vds_vio_lock(vio, flags); vds_be_fini(port); @@ -434,6 +475,7 @@ static void vds_reset(struct vio_driver_state *vio) done: vds_vio_unlock(vio, flags); + vdsdbg(HS, "%s done\n", port->path); } static void vds_bh_reset(struct work_struct *work) @@ -441,9 +483,10 @@ static void vds_bh_reset(struct work_struct *work) struct vds_io *io = container_of(work, struct vds_io, vds_work); struct vio_driver_state *vio = io->vio; - vds_io_free(io); - vds_reset(vio); + vds_io_wait(io); + vds_reset(io); ldc_enable_hv_intr(vio->lp); + vds_io_done(io); } static int vds_dring_io(struct vio_driver_state *vio) @@ -616,7 +659,6 @@ static void vds_evt_reset(struct vio_driver_state *vio) return; ldc_disable_hv_intr(vio->lp); - io->flags |= VDS_IO_FINI; vds_io_enq(io); } @@ -844,18 +886,15 @@ static int vds_port_probe(struct vio_dev *vdev, const struct vio_device_id *id) port->geom = kzalloc(roundup(sizeof(*port->geom), 8), GFP_KERNEL); port->part = kzalloc(sizeof(*port->part) * VDS_MAXPART, GFP_KERNEL); - /* - * The io and reset work queues are separate because the - * io work queue is flushed during reset which would hang - * if reset itself was scheduled on the io queue. - */ port->ioq = alloc_workqueue("vds_io", WQ_UNBOUND, vds_wq); - port->rtq = alloc_ordered_workqueue("vds_reset", 0); - if (!port->ioq || !port->rtq) { + if (!port->ioq) { err = -ENXIO; goto free_path; } + INIT_LIST_HEAD(&port->io_list); + init_waitqueue_head(&port->wait); + mutex_init(&port->label_lock); dev_set_drvdata(&vdev->dev, port); diff --git a/drivers/block/vds/vds_reg.c b/drivers/block/vds/vds_reg.c index b790f99e6f8a..cf7c157ff52c 100644 --- a/drivers/block/vds/vds_reg.c +++ b/drivers/block/vds/vds_reg.c @@ -1,7 +1,7 @@ /* * vds_reg.c: LDOM Virtual Disk Server. * - * Copyright (C) 2014 Oracle. All rights reserved. + * Copyright (C) 2014, 2015 Oracle. All rights reserved. */ #include "vds.h" @@ -54,9 +54,9 @@ static int vds_reg_rw(struct vds_io *io) off = to_bytes(io->offset); if (io->rw & WRITE) - iosz = file->f_op->write(file, addr, io->size, &off); + iosz = __vfs_write(file, addr, io->size, &off); else - iosz = file->f_op->read(file, addr, io->size, &off); + iosz = __vfs_read(file, addr, io->size, &off); if (iosz != io->size) { vdsmsg(err, "file IO failed: iosz=%ld\n", iosz); @@ -70,7 +70,7 @@ static int vds_reg_flush(struct vds_port *port) { struct file *file = port->be_data; - return vfs_fsync(file, 0); + return file ? vfs_fsync(file, 0) : 0; } struct vds_be_ops vds_reg_ops = { -- 2.50.1