#include <linux/cdrom.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
+#include <linux/bitmap.h>
#include <linux/completion.h>
#include <linux/delay.h>
#include <linux/init.h>
#define DRV_MODULE_NAME "sunvdc"
#define PFX DRV_MODULE_NAME ": "
-#define DRV_MODULE_VERSION "1.2"
-#define DRV_MODULE_RELDATE "November 24, 2014"
+#define DRV_MODULE_VERSION "1.3"
+#define DRV_MODULE_RELDATE "September 24, 2016"
static char version[] =
DRV_MODULE_NAME ".c:v" DRV_MODULE_VERSION " (" DRV_MODULE_RELDATE ")\n";
static void vdc_ldc_reset(struct vdc_port *port);
static void vdc_ldc_reset_work(struct work_struct *work);
static void vdc_ldc_reset_timer(unsigned long _arg);
+static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx);
+static inline void vdc_desc_set_state(struct vio_disk_desc *desc, int state);
static inline struct vdc_port *to_vdc_port(struct vio_driver_state *vio)
{
/* Ordered from largest major to lowest */
static struct vio_version vdc_versions[] = {
+ { .major = 1, .minor = 3 },
+ { .major = 1, .minor = 2 },
{ .major = 1, .minor = 1 },
{ .major = 1, .minor = 0 },
};
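+/*
+ * With out-of-order completion the ring keeps no prod/cons cursors;
+ * a descriptor is busy iff its bit is set in dr->txmap, so the free
+ * count is simply the number of clear bits.
+ */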
static inline u32 vdc_tx_dring_avail(struct vio_dring_state *dr)
{
- return vio_dring_avail(dr, VDC_TX_RING_SIZE);
+ return VDC_TX_RING_SIZE - bitmap_weight(dr->txmap, dr->nr_txmap);
}
static int vdc_getgeo(struct block_device *bdev, struct hd_geometry *geo)
}
}
-static void vdc_end_special(struct vdc_port *port, struct vio_disk_desc *desc)
+static void vdc_end_special(struct vdc_port *port, int err)
{
- int err = desc->status;
-
vdc_finish(&port->vio, -err, WAITING_FOR_GEN_CMD);
}
static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr,
- unsigned int index)
+ unsigned int index, int err)
{
struct vio_disk_desc *desc = vio_dring_entry(dr, index);
- struct vdc_req_entry *rqe = &port->rq_arr[index];
+ struct vio_driver_state *vio = &port->vio;
struct request *req;
- if (unlikely(desc->hdr.state != VIO_DESC_DONE))
- return;
+ assert_spin_locked(&vio->lock);
- ldc_unmap(port->vio.lp, desc->cookies, desc->ncookies);
- desc->hdr.state = VIO_DESC_FREE;
- dr->cons = vio_dring_next(dr, index);
+ if (err)
+ vdc_desc_set_state(desc, VIO_DESC_DONE);
+ else if (unlikely(desc->hdr.state != VIO_DESC_DONE)) {
+ pr_err("%s idx=%u err=%d state=%d\n",
+ __func__, index, err, desc->hdr.state);
+ return;
+ } else
+ err = desc->status;
- req = rqe->req;
+ req = vdc_desc_put(port, index);
if (req == NULL) {
- vdc_end_special(port, desc);
+ vdc_end_special(port, err);
return;
}
- rqe->req = NULL;
-
- __blk_end_request(req, (desc->status ? -EIO : 0), desc->size);
+ __blk_end_request(req, (err ? -EIO : 0), desc->size);
vdc_blk_queue_start(port);
pkt->start_idx >= VDC_TX_RING_SIZE))
return 0;
- vdc_end_one(port, dr, pkt->start_idx);
+ vdc_end_one(port, dr, pkt->start_idx, 0);
return 0;
}
spin_unlock_irqrestore(&vio->lock, flags);
}
-static int __vdc_tx_trigger(struct vdc_port *port)
+static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx)
{
- struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
+ struct vio_driver_state *vio = &port->vio;
+ struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
struct vio_dring_data hdr = {
.tag = {
.type = VIO_TYPE_DATA,
.stype = VIO_SUBTYPE_INFO,
.stype_env = VIO_DRING_DATA,
- .sid = vio_send_sid(&port->vio),
+ .sid = vio_send_sid(vio),
},
.dring_ident = dr->ident,
- .start_idx = dr->prod,
- .end_idx = dr->prod,
+ .start_idx = idx,
+ .end_idx = idx,
};
int err, delay;
- hdr.seq = dr->snd_nxt;
delay = 1;
do {
- err = vio_ldc_send(&port->vio, &hdr, sizeof(hdr));
+ hdr.seq = dr->snd_nxt;
+ err = vio_ldc_send(vio, &hdr, sizeof(hdr));
if (err > 0) {
dr->snd_nxt++;
+ port->req_id++;
break;
}
+
udelay(delay);
if ((delay <<= 1) > 128)
delay = 128;
} while (err == -EAGAIN);
- if (err == -ENOTCONN) {
- printk(KERN_ERR PFX "vio_ldc_send() failure, err=%d.\n", err);
- vdc_ldc_reset(port);
- }
+ if (err < 0)
+ pr_err(PFX "vio_ldc_send() failed, idx=%d err=%d.\n", idx, err);
+
return err;
}
-static int __send_request(struct request *req)
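+/*
+ * Claim a free TX descriptor: find a clear bit in the ring bitmap,
+ * mark it busy and, for block I/O, attach the owning request so the
+ * completion path can find it. Returns NULL when the ring is full.
+ * Caller must hold vio->lock.
+ */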
+static struct vio_disk_desc *vdc_desc_get(struct vdc_port *port,
+ struct request *req,
+ unsigned int *idxp)
{
- struct vdc_port *port = req->rq_disk->private_data;
- struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
- struct scatterlist sg[port->ring_cookies];
+ unsigned int idx;
+ struct vio_disk_desc *desc = NULL;
+ struct vio_driver_state *vio = &port->vio;
+ struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+
+ assert_spin_locked(&vio->lock);
+
+ idx = find_first_zero_bit(dr->txmap, dr->nr_txmap);
+ if (idx < dr->nr_txmap) {
+ bitmap_set(dr->txmap, idx, 1);
+ desc = dr->base + (dr->entry_size * idx);
+ if (req) {
+ BUG_ON(port->rq_arr[idx].req);
+ port->rq_arr[idx].req = req;
+ }
+ *idxp = idx;
+ }
+
+ return desc;
+}
+
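+/*
+ * Release a TX descriptor: unmap its LDC cookies, clear its busy bit
+ * and detach and return the request (if any) that vdc_desc_get()
+ * attached. Caller must hold vio->lock.
+ */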
+static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx)
+{
+ struct vio_driver_state *vio = &port->vio;
+ struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+ struct vio_disk_desc *desc = vio_dring_entry(dr, idx);
struct vdc_req_entry *rqe;
- struct vio_disk_desc *desc;
+ struct request *req;
+
+ assert_spin_locked(&vio->lock);
+
+ ldc_unmap(vio->lp, desc->cookies, desc->ncookies);
+
+ bitmap_clear(dr->txmap, idx, 1);
+ vdc_desc_set_state(desc, VIO_DESC_FREE);
+
+ rqe = &port->rq_arr[idx];
+ req = rqe->req;
+ rqe->req = NULL;
+
+ return req;
+}
+
+static inline void vdc_desc_set_state(struct vio_disk_desc *desc, int state)
+{
+ desc->hdr.state = state;
+ /*
+ * This has to be a non-SMP write barrier because we are writing
+ * to memory which is shared with the peer LDOM.
+ */
+ wmb();
+}
+
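+/* Build a VD_OP_FLUSH descriptor; flush requests carry no payload. */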
+static void __create_flush_desc(struct vdc_port *port,
+ struct vio_disk_desc *desc)
+{
+ memset(desc, 0, sizeof(struct vio_disk_desc));
+ desc->hdr.ack = VIO_ACK_ENABLE;
+ desc->req_id = port->req_id;
+ desc->operation = VD_OP_FLUSH;
+}
+
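+/*
+ * Build a read/write descriptor: map the request's scatterlist into
+ * LDC cookies and fill in the operation, slice, offset and length.
+ */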
+static int __create_rw_desc(struct vdc_port *port, struct request *req,
+ struct vio_disk_desc *desc)
+{
+ struct scatterlist sg[port->ring_cookies];
unsigned int map_perm;
int nsg, err, i;
u64 len;
sg_init_table(sg, port->ring_cookies);
nsg = blk_rq_map_sg(req->q, req, sg);
+ if (!nsg) {
+ pr_err(PFX "blk_rq_map_sg() failed, nsg=%d.\n", nsg);
+ return -EIO;
+ }
- len = 0;
- for (i = 0; i < nsg; i++)
- len += sg[i].length;
+ memset(desc, 0, sizeof(struct vio_disk_desc));
- desc = vio_dring_cur(dr);
+ err = ldc_map_sg(port->vio.lp, sg, nsg, desc->cookies,
+ port->ring_cookies, map_perm);
- err = ldc_map_sg(port->vio.lp, sg, nsg,
- desc->cookies, port->ring_cookies,
- map_perm);
- if (err < 0) {
- printk(KERN_ERR PFX "ldc_map_sg() failure, err=%d.\n", err);
+ if (err <= 0) {
+ pr_err(PFX "ldc_map_sg() failed, err=%d.\n", err);
return err;
}
- rqe = &port->rq_arr[dr->prod];
- rqe->req = req;
+ len = 0;
+ for (i = 0; i < nsg; i++)
+ len += sg[i].length;
desc->hdr.ack = VIO_ACK_ENABLE;
desc->req_id = port->req_id;
desc->operation = op;
- if (port->vdisk_type == VD_DISK_TYPE_DISK) {
+ if (port->vdisk_type == VD_DISK_TYPE_DISK)
desc->slice = 0xff;
- } else {
+ else
desc->slice = 0;
- }
desc->status = ~0;
desc->offset = (blk_rq_pos(req) << 9) / port->vdisk_block_size;
desc->size = len;
desc->ncookies = err;
- /* This has to be a non-SMP write barrier because we are writing
- * to memory which is shared with the peer LDOM.
- */
- wmb();
- desc->hdr.state = VIO_DESC_READY;
+ return 0;
+}
- err = __vdc_tx_trigger(port);
- if (err < 0) {
- printk(KERN_ERR PFX "vdc_tx_trigger() failure, err=%d\n", err);
- } else {
- port->req_id++;
- dr->prod = vio_dring_next(dr, dr->prod);
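+/*
+ * Mark the descriptor ready and kick the transmit trigger. A channel
+ * reset (-ECONNRESET/-ENOTCONN) is retried after resetting the LDC,
+ * since the peer may reconnect; other errors are returned to the
+ * caller.
+ */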
+static int __send_request(struct vdc_port *port, unsigned int idx)
+{
+ struct vio_driver_state *vio = &port->vio;
+ struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+ struct vio_disk_desc *desc = vio_dring_entry(dr, idx);
+ int err;
+
+ vdc_desc_set_state(desc, VIO_DESC_READY);
+
+ while (1) {
+ err = __vdc_tx_trigger(port, idx);
+
+ if (err == -ECONNRESET || err == -ENOTCONN) {
+ /* Channel reset: retry once the link recovers. */
+ vdc_ldc_reset(port);
+ pr_info(PFX "%s retry, idx=%d err=%d\n",
+ __func__, idx, err);
+ } else if (err < 0) {
+ /* Fatal error: bail out rather than spin forever. */
+ pr_err(PFX "%s error, idx=%d err=%d\n",
+ __func__, idx, err);
+ break;
+ } else
+ break;
}
return err;
static void do_vdc_request(struct request_queue *rq)
{
+ struct vdc_port *port = NULL;
+ struct vio_dring_state *dr = NULL;
struct request *req;
+ unsigned int idx = 0;
+ int err = 0;
while ((req = blk_peek_request(rq)) != NULL) {
- struct vdc_port *port;
- struct vio_dring_state *dr;
+ struct vio_disk_desc *desc;
port = req->rq_disk->private_data;
dr = &port->vio.drings[VIO_DRIVER_TX_RING];
+
if (unlikely(vdc_tx_dring_avail(dr) < 1))
goto wait;
+ desc = vdc_desc_get(port, req, &idx);
+
+ /* Start (dequeue) the request now so that it can be ended
+ * or requeued on any failure below.
+ */
+ blk_start_request(req);
+
+ /* Note that the block layer automatically turns a REQ_FLUSH
+ * request with a payload into a sequence of an empty REQ_FLUSH
+ * request followed by the actual I/O.
+ */
+ if (req->cmd_flags & REQ_FLUSH) {
+ if (req->bio != NULL)
+ pr_err(PFX "non-empty REQ_FLUSH, lost I/O\n");
+ __create_flush_desc(port, desc);
+ } else
+ err = __create_rw_desc(port, req, desc);
+
+ if (err) {
+ /* Keep transient failures queued for a later retry. */
+ if (err == -ENOMEM)
+ blk_requeue_request(rq, req);
+ goto wait;
+ }
+
- blk_start_request(req);
- if (__send_request(req) < 0) {
- blk_requeue_request(rq, req);
+ err = __send_request(port, idx);
+ if (err < 0) {
+ /* Requeue only errors that are not completed below by
+ * vdc_end_one(); anything else would be ended twice.
+ */
+ if (err == -ENOMEM || err == -ECONNRESET ||
+ err == -ENOTCONN)
+ blk_requeue_request(rq, req);
wait:
/* Avoid pointless unplugs. */
break;
}
}
+
+ if (err < 0 &&
+ err != -ENOMEM && err != -ECONNRESET && err != -ENOTCONN)
+ vdc_end_one(port, dr, idx, err);
}
static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
{
- struct vio_dring_state *dr;
struct vio_completion comp;
struct vio_disk_desc *desc;
- unsigned int map_perm;
unsigned long flags;
+ unsigned int map_perm;
+ unsigned int idx;
int op_len, err;
void *req_buf;
spin_lock_irqsave(&port->vio.lock, flags);
- dr = &port->vio.drings[VIO_DRIVER_TX_RING];
-
- /* XXX If we want to use this code generically we have to
- * XXX handle TX ring exhaustion etc.
- */
- desc = vio_dring_cur(dr);
+ desc = vdc_desc_get(port, NULL, &idx);
+ if (!desc) {
+ err = -ENOMEM;
+ goto done;
+ }
err = ldc_map_single(port->vio.lp, req_buf, op_len,
desc->cookies, port->ring_cookies,
map_perm);
- if (err < 0) {
- spin_unlock_irqrestore(&port->vio.lock, flags);
- kfree(req_buf);
- return err;
- }
+ if (err < 0)
+ goto done;
init_completion(&comp.com);
comp.waiting_for = WAITING_FOR_GEN_CMD;
desc->size = op_len;
desc->ncookies = err;
- /* This has to be a non-SMP write barrier because we are writing
- * to memory which is shared with the peer LDOM.
- */
- wmb();
- desc->hdr.state = VIO_DESC_READY;
-
- err = __vdc_tx_trigger(port);
+ err = __send_request(port, idx);
if (err >= 0) {
- port->req_id++;
- dr->prod = vio_dring_next(dr, dr->prod);
spin_unlock_irqrestore(&port->vio.lock, flags);
-
wait_for_completion(&comp.com);
err = comp.err;
+ spin_lock_irqsave(&port->vio.lock, flags);
} else {
port->vio.cmp = NULL;
- spin_unlock_irqrestore(&port->vio.lock, flags);
+ goto done;
}
if (map_perm & LDC_MAP_W)
memcpy(buf, req_buf, len);
+done:
+ if (desc)
+ (void) vdc_desc_put(port, idx);
+ spin_unlock_irqrestore(&port->vio.lock, flags);
kfree(req_buf);
return err;
}
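+/* Allocate the TX busy bitmap, one bit per ring descriptor. */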
+static int vio_txring_alloc(struct vio_dring_state *dr, unsigned int nr_tx)
+{
+ unsigned int sz;
+
+ sz = BITS_TO_LONGS(nr_tx) * sizeof(unsigned long);
+ dr->txmap = kzalloc(sz, GFP_KERNEL);
+
+ if (!dr->txmap)
+ return -ENOMEM;
+
+ dr->nr_txmap = nr_tx;
+ return 0;
+}
+
static int vdc_alloc_tx_ring(struct vdc_port *port)
{
struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
unsigned long len, entry_size;
int ncookies;
void *dring;
+ int ret;
entry_size = sizeof(struct vio_disk_desc) +
(sizeof(struct ldc_trans_cookie) * port->ring_cookies);
len = (VDC_TX_RING_SIZE * entry_size);
+ ret = vio_txring_alloc(dr, VDC_TX_RING_SIZE);
+ if (ret)
+ return ret;
+
ncookies = VIO_MAX_RING_COOKIES;
dring = ldc_alloc_exp_dring(port->vio.lp, len,
dr->cookies, &ncookies,
dr->base = dring;
dr->entry_size = entry_size;
dr->num_entries = VDC_TX_RING_SIZE;
- dr->prod = dr->cons = 0;
dr->pending = VDC_TX_RING_SIZE;
dr->ncookies = ncookies;
blk_queue_segment_boundary(q, PAGE_SIZE - 1);
blk_queue_max_segment_size(q, PAGE_SIZE);
+ /* The vds backend may cache writes in volatile memory, and
+ * with protocol 1.3 it may complete requests out of order, so
+ * REQ_FLUSH/REQ_FUA are used to signal completion barriers.
+ * The block layer turns REQ_FUA into a trailing REQ_FLUSH when
+ * FUA is not supported directly.
+ */
+ blk_queue_flush(q, REQ_FLUSH);
+
blk_queue_max_segments(q, port->ring_cookies);
blk_queue_max_hw_sectors(q, port->max_xfer_size);
g->major = vdc_major;
static struct ldc_channel_config vdc_ldc_cfg = {
.event = vdc_event,
- .mtu = 64,
+ .mtu = 256,
.mode = LDC_MODE_UNRELIABLE,
};
static void vdc_requeue_inflight(struct vdc_port *port)
{
- struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
- u32 idx;
-
- for (idx = dr->cons; idx != dr->prod; idx = vio_dring_next(dr, idx)) {
- struct vio_disk_desc *desc = vio_dring_entry(dr, idx);
- struct vdc_req_entry *rqe = &port->rq_arr[idx];
- struct request *req;
+ struct vio_driver_state *vio = &port->vio;
+ struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+ struct vio_disk_desc *desc;
+ struct request *req;
+ unsigned int idx;
- ldc_unmap(port->vio.lp, desc->cookies, desc->ncookies);
- desc->hdr.state = VIO_DESC_FREE;
- dr->cons = vio_dring_next(dr, idx);
+ assert_spin_locked(&vio->lock);
- req = rqe->req;
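+ /* Release every in-flight descriptor and requeue its request. */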
+ for_each_set_bit(idx, dr->txmap, dr->nr_txmap) {
+ req = vdc_desc_put(port, idx);
if (req == NULL) {
- vdc_end_special(port, desc);
+ desc = vio_dring_entry(dr, idx);
+ vdc_end_special(port, desc->status);
continue;
}
-
- rqe->req = NULL;
blk_requeue_request(port->disk->queue, req);
}
}
* conceivably block when the backend is closed. The serialization should
* ensure that a following handshake initiates only after the reset is done.
*
+ * Out-of-order execution bypasses vds_io_wait() except for FLUSH, so a
+ * request may start later than another request yet complete and be
+ * acknowledged to the client earlier.
+ *
* The recommended value for the size of the kernel workqueue is 0;
* it creates threads which scale with ncpu.
*/
int vds_wq;
+int vds_ooo; /* out-of-order execution, enabled for protocol 1.3 */
int vds_dbg;
int vds_dbg_ldc;
int vds_dbg_vio;
module_param(vds_dbg_ldc, uint, 0664);
module_param(vds_dbg_vio, uint, 0664);
module_param(vds_wq, uint, 0664);
+module_param(vds_ooo, uint, 0664);
/* Ordered from largest major to lowest */
static struct vio_version vds_versions[] = {
+ { .major = 1, .minor = 3 },
{ .major = 1, .minor = 1 },
{ .major = 1, .minor = 0 },
};
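+/* True when the negotiated major matches and the minor is at least @minor. */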
+static inline int vds_version_supp(struct vds_port *port, u16 major, u16 minor)
+{
+ return port->vio.ver.major == major && port->vio.ver.minor >= minor;
+}
+
static void vds_handshake_complete(struct vio_driver_state *vio)
{
struct vio_dring_state *dr;
* Set the maximum expected message length to
* accommodate in-band-descriptor messages with all
* their cookies.
- */
- vio->desc_buf_len = max_inband_msglen;
-
- /*
+ *
* Reallocate before responding to the message since
* the next request in the handshake will use this size
* and a small msgbuf would make the ldc read fail.
if (io->flags & VDS_IO_INIT)
err = vds_be_init(port);
- vds_io_wait(io);
+ vds_io_wait(io); /* handshake is always in order */
if (!err)
err = vio_control_pkt_engine(vio, port->msgbuf);
if (err)
vdsmsg(err, "%s: handshake failed (%d)\n", port->path, err);
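+ /* Clients that negotiate protocol 1.3 run out of order. */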
+ if (vds_version_supp(port, 1, 3))
+ vds_ooo = 1;
+
vds_io_done(io);
}
if (io->ack == VIO_SUBTYPE_ACK && err != 0 && io->error == 0)
io->error = err > 0 ? err : -err;
- vds_io_wait(io);
+ if (!vds_ooo)
+ vds_io_wait(io);
- if (port->xfer_mode == VIO_DRING_MODE)
+ if (io->flags & VDS_IO_DROP)
+ ; /* dropped I/O: nothing to complete */
+ else if (port->xfer_mode == VIO_DRING_MODE)
(void) vds_dring_done(io);
else if (port->xfer_mode == VIO_DESC_MODE)
(void) vds_desc_done(io);
BUG();
/*
- * If there was a reset then the IO request has been
- * converted to a reset request queued to be executed.
+ * Any request, including one that was converted
+ * to a reset, ends up here to be completed.
*/
- if (!(io->flags & VDS_IO_FINI))
- vds_io_done(io);
+ vds_io_done(io);
}
static void vds_reset(struct vds_io *io)
vds_vio_lock(vio, flags);
vio_link_state_change(vio, LDC_EVENT_RESET);
- vio->desc_buf_len = 0;
port->flags = 0;
kfree(port->msgbuf);
struct vds_io *io = container_of(work, struct vds_io, vds_work);
struct vio_driver_state *vio = io->vio;
- vds_io_wait(io);
+ if (!vds_ooo)
+ vds_io_wait(io);
vds_reset(io);
- ldc_enable_hv_intr(vio->lp);
vds_io_done(io);
+
+ /*
+ * Re-enable the LDC interrupt only after the request completes,
+ * so that no new requests can be queued while reset processing
+ * discards the IO queue.
+ */
+ ldc_enable_hv_intr(vio->lp);
}
static int vds_dring_io(struct vio_driver_state *vio)
static struct ldc_channel_config vds_ldc_cfg = {
.event = vds_event,
- .mtu = 64,
+ .mtu = 256,
.mode = LDC_MODE_UNRELIABLE,
};