/*
* vds.h: LDOM Virtual Disk Server.
*
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
*/
#include <linux/module.h>
#include <linux/namei.h>
#include <linux/device-mapper.h>
#include <linux/sysfs.h>
+#include <linux/wait.h>
#include <asm/vio.h>
#include <asm/ldc.h>
struct vio_disk_geom *geom;
struct vio_disk_vtoc *vtoc;
struct workqueue_struct *ioq;
- struct workqueue_struct *rtq;
+ struct list_head io_list;
+ wait_queue_head_t wait;
};
#define VDS_PORT_SEQ 0x1
/*
* vds_blk.c: LDOM Virtual Disk Server.
*
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
*/
#include "vds.h"
{
struct block_device *bdev = port->be_data;
- return blkdev_issue_flush(bdev, GFP_KERNEL, NULL);
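+	/* No backend device attached: nothing to flush, report success. */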
+ return bdev ? blkdev_issue_flush(bdev, GFP_KERNEL, NULL) : 0;
}
struct vds_be_ops vds_blk_ops = {
/*
* vds_io.c: LDOM Virtual Disk Server.
*
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
*/
#include "vds.h"
if (func)
INIT_WORK(&io->vds_work, func);
+ INIT_LIST_HEAD(&io->list);
+
return io;
err:
struct vio_driver_state *vio = io->vio;
struct vds_port *port = to_vds_port(vio);
- vdsdbg(WQ, "cpu=%d\n", smp_processor_id());
+ vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(),
+ list_first_entry_or_null(&port->io_list, struct vds_io, list));
BUG_ON(!in_interrupt());
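+	/*
+	 * Put the request on io_list for response ordering and hand it
+	 * to the ioq workqueue for execution in process context.
+	 */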
+ list_add_tail(&io->list, &port->io_list);
+ queue_work(port->ioq, &io->vds_work);
+}
+
+void vds_io_wait(struct vds_io *io)
+{
+ struct vio_driver_state *vio = io->vio;
+ struct vds_port *port = to_vds_port(vio);
+ DEFINE_WAIT(wait);
+ unsigned long flags;
+ struct vds_io *first;
+
+ BUG_ON(in_interrupt());
+
+ vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(),
+ list_first_entry_or_null(&port->io_list, struct vds_io, list));
+
+ while (1) {
+ prepare_to_wait(&port->wait, &wait, TASK_INTERRUPTIBLE);
+
+ vds_vio_lock(vio, flags);
+ first = list_first_entry_or_null(&port->io_list, struct vds_io,
+ list);
+ vds_vio_unlock(vio, flags);
+
+ /*
+ * If the queue is empty or this is the first request
+ * on the queue, we can proceed.
+ */
+ if (first == NULL || first == io) {
+ finish_wait(&port->wait, &wait);
+ break;
+ }
+
+ schedule();
+ finish_wait(&port->wait, &wait);
+ }
+}
+
+void vds_io_done(struct vds_io *io)
+{
+ struct vio_driver_state *vio = io->vio;
+ struct vds_port *port = to_vds_port(vio);
+ unsigned long flags;
+
+ vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(),
+ list_first_entry_or_null(&port->io_list, struct vds_io, list));
+
+ /*
+ * Dequeue the request.
+ *
+ * If this is a reset, discard the internal IO queue.
+	 * This means that any request whose work item runs AFTER the
+	 * reset will still execute, but no response will be sent to
+	 * the client.
+ */
+ vds_vio_lock(vio, flags);
+ list_del(&io->list);
if (io->flags & VDS_IO_FINI)
- queue_work(port->rtq, &io->vds_work);
+ INIT_LIST_HEAD(&port->io_list);
else
- queue_work(port->ioq, &io->vds_work);
+ wake_up(&port->wait);
+ vds_vio_unlock(vio, flags);
+ vds_io_free(io);
}
static int vds_io_rw(struct vds_io *io)
return rv;
}
-int vd_op_flush(struct vio_driver_state *vio)
+int vd_op_flush(struct vds_io *io)
{
int rv;
+ struct vio_driver_state *vio = io->vio;
struct vds_port *port = to_vds_port(vio);
- if (port->be_ops) {
- flush_workqueue(port->ioq);
+ /*
+	 * Wait until this is the first request on the internal IO queue.
+ * This will act as a barrier to ensure all preceding IO requests
+ * have completed and responses have been sent to the client.
+ * This does not affect any request queued after the flush.
+ */
+ vdsdbg(FLUSH, "VD_OP_FLUSH\n");
+ vds_io_wait(io);
+ if (port->be_ops)
rv = port->be_ops->flush(port);
- } else
+ else
rv = -EIO;
vdsdbg(FLUSH, "VD_OP_FLUSH rv=%d\n", rv);
void vds_be_fini(struct vds_port *port)
{
- flush_workqueue(port->ioq);
vds_label_fini(port);
if (port->be_ops) {
port->be_ops->fini(port);
/*
* vds_io.h: LDOM Virtual Disk Server.
*
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
*/
struct vds_port;
struct page *pages;
struct completion event;
struct work_struct vds_work;
+ struct list_head list;
char buf[0];
};
void (*func)(struct work_struct *));
void vds_io_free(struct vds_io *io);
void vds_io_enq(struct vds_io *io);
+void vds_io_wait(struct vds_io *io);
+void vds_io_done(struct vds_io *io);
void *vds_get(struct vds_port *port, sector_t offset, u64 size);
int vds_clear(struct vds_port *port, sector_t offset, u64 size);
int vd_op_set_geom(struct vds_io *io);
int vd_op_get_efi(struct vds_io *io);
int vd_op_set_efi(struct vds_io *io);
-int vd_op_flush(struct vio_driver_state *vio);
+int vd_op_flush(struct vds_io *io);
int vd_op_rw(struct vds_io *io);
/*
* vds_main.c: LDOM Virtual Disk Server.
*
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
*/
#include "vds.h"
1 << VD_OP_GET_EFI | \
1 << VD_OP_SET_EFI | \
1 << VD_OP_FLUSH)
+
/*
- * XXX The recommended value is 0 but that creates threads
- * which scale with ncpu and because of some apparent
- * flow control issues cause scsi timeouts so limit to
- * 1 thread for now.
+ * Request ordering is managed through the following:
+ *
+ * - ioq kernel workqueue
+ * - io_list internal driver queue
+ * - wait driver waitqueue
+ * - vds_io_wait() blocks until the caller is at the head of the io_list
+ * - vds_io_done() deletes the caller from io_list and wakes up blocked threads
+ *
+ * A client request is added in interrupt context to the ioq kernel workqueue
+ * and the io_list internal driver queue. A request is executed in process
+ * context in a kernel worker thread. The workqueue ensures that requests
+ * are scheduled for execution in order of arrival.
+ *
+ * Once executing, an IO request does its work immediately, e.g. reading or
+ * writing a disk block, and then calls vds_io_wait() before responding to
+ * the client, so that client responses are serialized regardless of the
+ * order in which requests actually complete.
+ *
+ * A FLUSH request however calls vds_io_wait() first in order to ensure that
+ * all preceding requests have completed and then calls the backend's flush
+ * method in order to flush any outstanding IO buffers to disk. Note that a
+ * FLUSH request does not enforce any ordering on requests that arrive after
+ * the FLUSH itself.
+ *
+ * Control events (reset or handshake) also call vds_io_wait() first in order
+ * to serialize them with respect to each other as well as with IO operations.
+ * For example, reset processing calls the backend's fini method, which could
+ * conceivably block while the backend is closed. The serialization should
+ * ensure that a subsequent handshake starts only after the reset is done.
+ *
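+ * A typical IO request therefore flows as follows:
+ *
+ *   interrupt context:  vds_io_enq()   add to io_list, queue work on ioq
+ *   worker thread:      execute the IO, e.g. read/write the disk blocks
+ *                       vds_io_wait()  block until head of io_list
+ *                       send the client response (ack/nack)
+ *                       vds_io_done()  dequeue, wake waiters, free the request
+ *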
+ * XXX The recommended max_active value for the kernel workqueue is 0, but
+ * that creates threads which scale with ncpu and, because of some apparent
+ * flow control issues, causes intermittent scsi timeouts and LDC aborts, so
+ * limit it to 1 thread for now.
*/
int vds_wq = 1;
int vds_dbg;
.handshake_complete = vds_handshake_complete,
};
-static void vds_reset(struct vio_driver_state *vio);
+static void vds_reset(struct vds_io *io);
static void vds_evt_reset(struct vio_driver_state *vio);
static int vds_dring_done(struct vds_io *io)
struct vio_disk_desc *desc;
int rv;
int idx;
+ int reset = 1;
desc = io->desc_buf;
desc->status = io->error;
rv = ldc_put_dring_entry(vio->lp, io->desc_buf, dr->entry_size,
(idx * dr->entry_size), dr->cookies,
dr->ncookies);
- if (rv != dr->entry_size)
- goto reset;
- /*
- * If we successfully responded to the request (ack or nack),
- * then return the actual IO operation return value, otherwise
- * reset the connection.
- */
- pkt->tag.stype = io->ack;
- rv = vio_ldc_send(vio, pkt, sizeof(*pkt));
- if (rv > 0) {
- rv = io->error;
- vds_io_free(io);
- vdsdbg(DATA, "DRING RET %d\n", rv);
- return rv;
+ if (rv == dr->entry_size) {
+ /*
+ * If we successfully responded to the request (ack or nack),
+ * then return the actual IO operation return value, otherwise
+ * reset the connection.
+ */
+ pkt->tag.stype = io->ack;
+ rv = vio_ldc_send(vio, pkt, sizeof(*pkt));
+ if (rv > 0) {
+ rv = io->error;
+ reset = 0;
+ }
+ }
+
+ if (reset) {
+ vdsmsg(err, "Reset VDS LDC rv[%d]\n", rv);
+ vds_reset(io);
+ rv = -ECONNRESET;
}
-reset:
- vdsmsg(err, "Reset VDS LDC rv[%d]\n", rv);
- vds_reset(vio);
- vds_io_free(io);
+ vdsdbg(DATA, "DRING %s\n", reset ? "RESET" : "OK");
- vdsdbg(DATA, "DRING RESET\n");
- return -ECONNRESET;
+ return rv;
}
static int vds_desc_done(struct vds_io *io)
rv = vio_ldc_send(vio, pkt, io->msglen);
if (rv <= 0) {
vdsmsg(err, "Reset VDS LDC rv[%d]\n", rv);
- vds_reset(vio);
+ vds_reset(io);
rv = -ECONNRESET;
} else {
rv = io->error;
}
- vds_io_free(io);
+ vdsdbg(DATA, "DESC %s\n", rv == -ECONNRESET ? "RESET" : "OK");
return rv;
}
if (io->flags & VDS_IO_INIT)
err = vds_be_init(port);
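+	/* Serialize control events with each other and with in-flight IO. */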
+ vds_io_wait(io);
+
if (!err)
err = vio_control_pkt_engine(vio, port->msgbuf);
if (err)
vdsmsg(err, "%s: handshake failed (%d)\n", port->path, err);
- vds_io_free(io);
+ vds_io_done(io);
}
/*
err = vd_op_set_efi(io);
break;
case VD_OP_FLUSH:
- err = vd_op_flush(vio);
+ err = vd_op_flush(io);
break;
default:
err = -ENOTSUPP;
if (io->ack == VIO_SUBTYPE_ACK && err != 0 && io->error == 0)
io->error = err > 0 ? err : -err;
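+	/* Wait our turn so client responses go out in order of request arrival. */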
+ vds_io_wait(io);
+
if (port->xfer_mode == VIO_DRING_MODE)
(void) vds_dring_done(io);
else if (port->xfer_mode == VIO_DESC_MODE)
(void) vds_desc_done(io);
else
BUG();
+
+ vds_io_done(io);
}
-static void vds_reset(struct vio_driver_state *vio)
+static void vds_reset(struct vds_io *io)
{
+ struct vio_driver_state *vio = io->vio;
struct vds_port *port = to_vds_port(vio);
unsigned long flags;
int err;
BUG_ON(in_interrupt());
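+	/* Mark this as a reset so vds_io_done() discards the internal IO queue. */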
+ io->flags |= VDS_IO_FINI;
+
vds_vio_lock(vio, flags);
vds_be_fini(port);
done:
vds_vio_unlock(vio, flags);
+ vdsdbg(HS, "%s done\n", port->path);
}
static void vds_bh_reset(struct work_struct *work)
struct vds_io *io = container_of(work, struct vds_io, vds_work);
struct vio_driver_state *vio = io->vio;
- vds_io_free(io);
- vds_reset(vio);
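+	/* Wait until preceding requests have completed before resetting. */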
+ vds_io_wait(io);
+ vds_reset(io);
ldc_enable_hv_intr(vio->lp);
+ vds_io_done(io);
}
static int vds_dring_io(struct vio_driver_state *vio)
return;
ldc_disable_hv_intr(vio->lp);
- io->flags |= VDS_IO_FINI;
vds_io_enq(io);
}
port->geom = kzalloc(roundup(sizeof(*port->geom), 8), GFP_KERNEL);
port->part = kzalloc(sizeof(*port->part) * VDS_MAXPART, GFP_KERNEL);
- /*
- * The io and reset work queues are separate because the
- * io work queue is flushed during reset which would hang
- * if reset itself was scheduled on the io queue.
- */
port->ioq = alloc_workqueue("vds_io", WQ_UNBOUND, vds_wq);
- port->rtq = alloc_ordered_workqueue("vds_reset", 0);
- if (!port->ioq || !port->rtq) {
+ if (!port->ioq) {
err = -ENXIO;
goto free_path;
}
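+	/* Request ordering state; see the block comment above vds_wq. */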
+ INIT_LIST_HEAD(&port->io_list);
+ init_waitqueue_head(&port->wait);
+
mutex_init(&port->label_lock);
dev_set_drvdata(&vdev->dev, port);
/*
* vds_reg.c: LDOM Virtual Disk Server.
*
- * Copyright (C) 2014 Oracle. All rights reserved.
+ * Copyright (C) 2014, 2015 Oracle. All rights reserved.
*/
#include "vds.h"
off = to_bytes(io->offset);
if (io->rw & WRITE)
- iosz = file->f_op->write(file, addr, io->size, &off);
+ iosz = __vfs_write(file, addr, io->size, &off);
else
- iosz = file->f_op->read(file, addr, io->size, &off);
+ iosz = __vfs_read(file, addr, io->size, &off);
if (iosz != io->size) {
vdsmsg(err, "file IO failed: iosz=%ld\n", iosz);
{
struct file *file = port->be_data;
- return vfs_fsync(file, 0);
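+	/* No backing file attached: nothing to sync, report success. */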
+ return file ? vfs_fsync(file, 0) : 0;
}
struct vds_be_ops vds_reg_ops = {