]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
ldoms: VDS panics with regular file backend.
authorBijan Mottahedeh <bijan.mottahedeh@oracle.com>
Mon, 14 Dec 2015 13:21:57 +0000 (05:21 -0800)
committerAllen Pais <allen.pais@oracle.com>
Tue, 15 Dec 2015 12:32:24 +0000 (18:02 +0530)
ldoms: VIO FLUSH not implemented correctly.

Orabug 22365411
Orabug 22365429

Signed-off-by: Bijan Mottahedeh <bijan.mottahedeh@oracle.com>
Reviewed-by: Alexandre Chartre <alexandre.chartre@oracle.com>
(cherry picked from commit 7062cbbda41153660c72ef51920ed2714947593e)

drivers/block/vds/vds.h
drivers/block/vds/vds_blk.c
drivers/block/vds/vds_io.c
drivers/block/vds/vds_io.h
drivers/block/vds/vds_main.c
drivers/block/vds/vds_reg.c

index 28be0e2a26dad288f667e0a85af3414594ecaaa7..ef191275940005ba94512409864cedcad866243b 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * vds.h: LDOM Virtual Disk Server.
  *
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
  */
 
 #include <linux/module.h>
@@ -18,6 +18,7 @@
 #include <linux/namei.h>
 #include <linux/device-mapper.h>
 #include <linux/sysfs.h>
+#include <linux/wait.h>
 
 #include <asm/vio.h>
 #include <asm/ldc.h>
@@ -52,7 +53,8 @@ struct vds_port {
        struct vio_disk_geom    *geom;
        struct vio_disk_vtoc    *vtoc;
        struct workqueue_struct *ioq;
-       struct workqueue_struct *rtq;
+       struct list_head        io_list;
+       wait_queue_head_t       wait;
 };
 
 #define        VDS_PORT_SEQ            0x1
index 42bc4a3214660e31f9818fcda50c485c97994fcf..d71f545f449a02c5425069ddda2428f1a70a3d18 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * vds_blk.c: LDOM Virtual Disk Server.
  *
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
  */
 
 #include "vds.h"
@@ -186,7 +186,7 @@ static int vds_blk_flush(struct vds_port *port)
 {
        struct block_device *bdev = port->be_data;
 
-       return blkdev_issue_flush(bdev, GFP_KERNEL, NULL);
+       return bdev ? blkdev_issue_flush(bdev, GFP_KERNEL, NULL) : 0;
 }
 
 struct vds_be_ops vds_blk_ops = {
index d44bee69b9c0144f884b023c65425e8f79e531eb..7409e6b488f3ebd63b0ea6ab21da3119dbcff537 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * vds_io.c: LDOM Virtual Disk Server.
  *
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
  */
 
 #include "vds.h"
@@ -107,6 +107,8 @@ struct vds_io *vds_io_alloc(struct vio_driver_state *vio,
        if (func)
                INIT_WORK(&io->vds_work, func);
 
+       INIT_LIST_HEAD(&io->list);
+
        return io;
 
 err:
@@ -159,14 +161,75 @@ void vds_io_enq(struct vds_io *io)
        struct vio_driver_state *vio = io->vio;
        struct vds_port *port = to_vds_port(vio);
 
-       vdsdbg(WQ, "cpu=%d\n", smp_processor_id());
+       vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(),
+              list_first_entry_or_null(&port->io_list, struct vds_io, list));
 
        BUG_ON(!in_interrupt());
 
+       list_add_tail(&io->list, &port->io_list);
+       queue_work(port->ioq, &io->vds_work);
+}
+
+void vds_io_wait(struct vds_io *io)
+{
+       struct vio_driver_state *vio = io->vio;
+       struct vds_port *port = to_vds_port(vio);
+       DEFINE_WAIT(wait);
+       unsigned long flags;
+       struct vds_io *first;
+
+       BUG_ON(in_interrupt());
+
+       vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(),
+              list_first_entry_or_null(&port->io_list, struct vds_io, list));
+
+       while (1) {
+               prepare_to_wait(&port->wait, &wait, TASK_INTERRUPTIBLE);
+
+               vds_vio_lock(vio, flags);
+               first = list_first_entry_or_null(&port->io_list, struct vds_io,
+                       list);
+               vds_vio_unlock(vio, flags);
+
+               /*
+                * If the queue is empty or this is the first request
+                * on the queue, we can proceed.
+                */
+               if (first == NULL || first == io) {
+                       finish_wait(&port->wait, &wait);
+                       break;
+               }
+
+               schedule();
+               finish_wait(&port->wait, &wait);
+       }
+}
+
+void vds_io_done(struct vds_io *io)
+{
+       struct vio_driver_state *vio = io->vio;
+       struct vds_port *port = to_vds_port(vio);
+       unsigned long flags;
+
+       vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(),
+              list_first_entry_or_null(&port->io_list, struct vds_io, list));
+
+       /*
+        * Dequeue the request.
+        *
+        * If this is a reset, discard the internal IO queue.
+        * This means that any request which was scheduled from the
+        * kernel workqueue AFTER the reset will execute but no response
+        * will be sent to the client.
+        */
+       vds_vio_lock(vio, flags);
+       list_del(&io->list);
        if (io->flags & VDS_IO_FINI)
-               queue_work(port->rtq, &io->vds_work);
+               INIT_LIST_HEAD(&port->io_list);
        else
-               queue_work(port->ioq, &io->vds_work);
+               wake_up(&port->wait);
+       vds_vio_unlock(vio, flags);
+       vds_io_free(io);
 }
 
 static int vds_io_rw(struct vds_io *io)
@@ -440,15 +503,23 @@ done:
        return rv;
 }
 
-int vd_op_flush(struct vio_driver_state *vio)
+int vd_op_flush(struct vds_io *io)
 {
        int rv;
+       struct vio_driver_state *vio = io->vio;
        struct vds_port *port = to_vds_port(vio);
 
-       if (port->be_ops) {
-               flush_workqueue(port->ioq);
+       /*
+        * Wait until this the first request on the internal IO queue.
+        * This will act as a barrier to ensure all preceding IO requests
+        * have completed and responses have been sent to the client.
+        * This does not affect any request queued after the flush.
+        */
+       vdsdbg(FLUSH, "VD_OP_FLUSH\n");
+       vds_io_wait(io);
+       if (port->be_ops)
                rv = port->be_ops->flush(port);
-       else
+       else
                rv = -EIO;
 
        vdsdbg(FLUSH, "VD_OP_FLUSH rv=%d\n", rv);
@@ -613,7 +684,6 @@ done:
 
 void vds_be_fini(struct vds_port *port)
 {
-       flush_workqueue(port->ioq);
        vds_label_fini(port);
        if (port->be_ops) {
                port->be_ops->fini(port);
index 6faf7520227439c5fed634c17fff1e0736fe440a..cd4efd8f5a66d1589079a357d69a18ecda81cb67 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * vds_io.h: LDOM Virtual Disk Server.
  *
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
  */
 
 struct vds_port;
@@ -35,6 +35,7 @@ struct vds_io {
        struct page *pages;
        struct completion event;
        struct work_struct vds_work;
+       struct list_head list;
        char buf[0];
 };
 
@@ -48,6 +49,8 @@ struct vds_io *vds_io_alloc(struct vio_driver_state *vio,
                            void (*func)(struct work_struct *));
 void vds_io_free(struct vds_io *io);
 void vds_io_enq(struct vds_io *io);
+void vds_io_wait(struct vds_io *io);
+void vds_io_done(struct vds_io *io);
 
 void *vds_get(struct vds_port *port, sector_t offset, u64 size);
 int vds_clear(struct vds_port *port, sector_t offset, u64 size);
@@ -63,5 +66,5 @@ int vd_op_get_geom(struct vds_io *io);
 int vd_op_set_geom(struct vds_io *io);
 int vd_op_get_efi(struct vds_io *io);
 int vd_op_set_efi(struct vds_io *io);
-int vd_op_flush(struct vio_driver_state *vio);
+int vd_op_flush(struct vds_io *io);
 int vd_op_rw(struct vds_io *io);
index e9eae09fba8bd73abf9b9e4ecf072dae9e069155..7b49bf931685ba22a886cd509da60da386ed534b 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * vds_main.c: LDOM Virtual Disk Server.
  *
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
  */
 
 #include "vds.h"
@@ -24,11 +24,42 @@ MODULE_VERSION(DRV_MOD_VERSION);
                                 1 << VD_OP_GET_EFI |           \
                                 1 << VD_OP_SET_EFI |           \
                                 1 << VD_OP_FLUSH)
+
 /*
- * XXX The recommended value is 0 but that creates threads
- * which scale with ncpu and because of some apparent
- * flow control issues cause scsi timeouts so limit to
- * 1 thread for now.
+ * Request ordering is managed through the following:
+ *
+ * - ioq kernel workqueue
+ * - io_list internal driver queue
+ * - wait driver waitqueue
+ * - vds_io_wait() blocks until the caller is at the head of the io_list
+ * - vds_io_done() deletes the caller from io_list and wakes up blocked threads
+ *
+ * A client request is added in interrupt context to the ioq kernel workqueue
+ * and the io_list internal driver queue. A request is executed at process
+ * context in a kernel worker thread.  The workqueue ensures that requests
+ * are scheduled for execution in order of arrival.
+ *
+ * Once actually executing, an IO request proceeds immediately, e.g. reading
+ * or writing a disk block, and then calls vds_io_wait() before responding
+ * to the client in order to serialize client responses regardless of the
+ * order in which requests are actually completed.
+ *
+ * A FLUSH request however calls vds_io_wait() first in order to ensure that
+ * all proceeding requests are completed and then calls the backend's flush
+ * method in order to flush any outstanding IO buffers to disk. Note that
+ * FLUSH request does not enforce any ordering on a potential request that
+ * might have arrived after the FLUSH request.
+ *
+ * A control event (reset or handshake) also calls vds_io_wait() first in
+ * order to serialize them with respect to each other as well as IO operations.
+ * For example, reset processing calls the backend's fini method which could
+ * conceivably block when the backend is closed.  The serialization should
+ * ensure that a following handshake initiates only after the reset is done.
+ *
+ * XXX The recommended value for the size of the kernel workqueue is 0 but
+ * that creates threads which scale with ncpu and because of some apparent
+ * flow control issues cause intermittent scsi timeouts and LDC aborts so
+ * limit to 1 thread for now.
  */
 int vds_wq = 1;
 int vds_dbg;
@@ -166,7 +197,7 @@ static struct vio_driver_ops vds_vio_ops = {
        .handshake_complete     = vds_handshake_complete,
 };
 
-static void vds_reset(struct vio_driver_state *vio);
+static void vds_reset(struct vds_io *io);
 static void vds_evt_reset(struct vio_driver_state *vio);
 
 static int vds_dring_done(struct vds_io *io)
@@ -178,6 +209,7 @@ static int vds_dring_done(struct vds_io *io)
        struct vio_disk_desc *desc;
        int rv;
        int idx;
+       int reset = 1;
 
        desc = io->desc_buf;
        desc->status = io->error;
@@ -208,30 +240,30 @@ static int vds_dring_done(struct vds_io *io)
        rv = ldc_put_dring_entry(vio->lp, io->desc_buf, dr->entry_size,
                                  (idx * dr->entry_size), dr->cookies,
                                  dr->ncookies);
-       if (rv != dr->entry_size)
-               goto reset;
 
-       /*
-        * If we successfully responded to the request (ack or nack),
-        * then return the actual IO operation return value, otherwise
-        * reset the connection.
-        */
-       pkt->tag.stype = io->ack;
-       rv = vio_ldc_send(vio, pkt, sizeof(*pkt));
-       if (rv > 0) {
-               rv = io->error;
-               vds_io_free(io);
-               vdsdbg(DATA, "DRING RET %d\n", rv);
-               return rv;
+       if (rv == dr->entry_size) {
+               /*
+                * If we successfully responded to the request (ack or nack),
+                * then return the actual IO operation return value, otherwise
+                * reset the connection.
+                */
+               pkt->tag.stype = io->ack;
+               rv = vio_ldc_send(vio, pkt, sizeof(*pkt));
+               if (rv > 0) {
+                       rv = io->error;
+                       reset = 0;
+               }
+       }
+
+       if (reset) {
+               vdsmsg(err, "Reset VDS LDC rv[%d]\n", rv);
+               vds_reset(io);
+               rv = -ECONNRESET;
        }
 
-reset:
-       vdsmsg(err, "Reset VDS LDC rv[%d]\n", rv);
-       vds_reset(vio);
-       vds_io_free(io);
+       vdsdbg(DATA, "DRING %s\n", reset ? "RESET" : "OK");
 
-       vdsdbg(DATA, "DRING RESET\n");
-       return -ECONNRESET;
+       return rv;
 }
 
 static int vds_desc_done(struct vds_io *io)
@@ -266,13 +298,13 @@ static int vds_desc_done(struct vds_io *io)
        rv = vio_ldc_send(vio, pkt, io->msglen);
        if (rv <= 0) {
                vdsmsg(err, "Reset VDS LDC rv[%d]\n", rv);
-               vds_reset(vio);
+               vds_reset(io);
                rv = -ECONNRESET;
        } else {
                rv = io->error;
        }
 
-       vds_io_free(io);
+       vdsdbg(DATA, "DESC %s\n", rv == -ECONNRESET ? "RESET" : "OK");
        return rv;
 }
 
@@ -330,13 +362,15 @@ static void vds_bh_hs(struct work_struct *work)
        if (io->flags & VDS_IO_INIT)
                err = vds_be_init(port);
 
+       vds_io_wait(io);
+
        if (!err)
                err = vio_control_pkt_engine(vio, port->msgbuf);
 
        if (err)
                vdsmsg(err, "%s: handshake failed (%d)\n", port->path, err);
 
-       vds_io_free(io);
+       vds_io_done(io);
 }
 
 /*
@@ -384,7 +418,7 @@ static void vds_bh_io(struct work_struct *work)
                err = vd_op_set_efi(io);
                break;
        case VD_OP_FLUSH:
-               err = vd_op_flush(vio);
+               err = vd_op_flush(io);
                break;
        default:
                err = -ENOTSUPP;
@@ -394,16 +428,21 @@ static void vds_bh_io(struct work_struct *work)
        if (io->ack == VIO_SUBTYPE_ACK && err != 0 && io->error == 0)
                io->error = err > 0 ? err : -err;
 
+       vds_io_wait(io);
+
        if (port->xfer_mode == VIO_DRING_MODE)
                (void) vds_dring_done(io);
        else if (port->xfer_mode == VIO_DESC_MODE)
                (void) vds_desc_done(io);
        else
                BUG();
+
+       vds_io_done(io);
 }
 
-static void vds_reset(struct vio_driver_state *vio)
+static void vds_reset(struct vds_io *io)
 {
+       struct vio_driver_state *vio = io->vio;
        struct vds_port *port = to_vds_port(vio);
        unsigned long flags;
        int err;
@@ -412,6 +451,8 @@ static void vds_reset(struct vio_driver_state *vio)
 
        BUG_ON(in_interrupt());
 
+       io->flags |= VDS_IO_FINI;
+
        vds_vio_lock(vio, flags);
        vds_be_fini(port);
 
@@ -434,6 +475,7 @@ static void vds_reset(struct vio_driver_state *vio)
 
 done:
        vds_vio_unlock(vio, flags);
+       vdsdbg(HS, "%s done\n", port->path);
 }
 
 static void vds_bh_reset(struct work_struct *work)
@@ -441,9 +483,10 @@ static void vds_bh_reset(struct work_struct *work)
        struct vds_io *io = container_of(work, struct vds_io, vds_work);
        struct vio_driver_state *vio = io->vio;
 
-       vds_io_free(io);
-       vds_reset(vio);
+       vds_io_wait(io);
+       vds_reset(io);
        ldc_enable_hv_intr(vio->lp);
+       vds_io_done(io);
 }
 
 static int vds_dring_io(struct vio_driver_state *vio)
@@ -616,7 +659,6 @@ static void vds_evt_reset(struct vio_driver_state *vio)
                return;
 
        ldc_disable_hv_intr(vio->lp);
-       io->flags |= VDS_IO_FINI;
 
        vds_io_enq(io);
 }
@@ -844,18 +886,15 @@ static int vds_port_probe(struct vio_dev *vdev, const struct vio_device_id *id)
        port->geom = kzalloc(roundup(sizeof(*port->geom), 8), GFP_KERNEL);
        port->part = kzalloc(sizeof(*port->part) * VDS_MAXPART, GFP_KERNEL);
 
-       /*
-        * The io and reset work queues are separate because the
-        * io work queue is flushed during reset which would hang
-        * if reset itself was scheduled on the io queue.
-        */
        port->ioq = alloc_workqueue("vds_io", WQ_UNBOUND, vds_wq);
-       port->rtq = alloc_ordered_workqueue("vds_reset", 0);
-       if (!port->ioq || !port->rtq) {
+       if (!port->ioq) {
                err = -ENXIO;
                goto free_path;
        }
 
+       INIT_LIST_HEAD(&port->io_list);
+       init_waitqueue_head(&port->wait);
+
        mutex_init(&port->label_lock);
 
        dev_set_drvdata(&vdev->dev, port);
index b790f99e6f8a819b2329ab80826624c1f7ba0c1a..cf7c157ff52c0588910744eca1487f8f12da9458 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * vds_reg.c: LDOM Virtual Disk Server.
  *
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
  */
 
 #include "vds.h"
@@ -54,9 +54,9 @@ static int vds_reg_rw(struct vds_io *io)
        off = to_bytes(io->offset);
 
        if (io->rw & WRITE)
-               iosz = file->f_op->write(file, addr, io->size, &off);
+               iosz = __vfs_write(file, addr, io->size, &off);
        else
-               iosz = file->f_op->read(file, addr, io->size, &off);
+               iosz = __vfs_read(file, addr, io->size, &off);
 
        if (iosz != io->size) {
                vdsmsg(err, "file IO failed: iosz=%ld\n", iosz);
@@ -70,7 +70,7 @@ static int vds_reg_flush(struct vds_port *port)
 {
        struct file *file = port->be_data;
 
-       return vfs_fsync(file, 0);
+       return file ? vfs_fsync(file, 0) : 0;
 }
 
 struct vds_be_ops vds_reg_ops = {