#include <linux/init.h>
#include <linux/list.h>
#include <linux/scatterlist.h>
+#include <linux/wait.h>
#include <asm/vio.h>
#include <asm/ldc.h>
static struct workqueue_struct *sunvdc_wq;
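+/* Per-descriptor shadow state for an inflight bio. */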
struct vdc_req_entry {
- struct request *req;
+ struct bio *req; /* bio backing this descriptor */
+ u64 size; /* I/O size; checked against desc->size on completion */
+ int sent; /* non-zero once the descriptor was sent to the server */
};
struct vdc_port {
struct gendisk *disk;
- struct vdc_completion *cmp;
+ struct vio_completion *cmp;
u64 req_id;
u64 seq;
struct vdc_req_entry rq_arr[VDC_TX_RING_SIZE];
unsigned long ring_cookies;
+ wait_queue_head_t wait; /* woken when a descriptor or mapping is freed */
u64 max_xfer_size;
u32 vdisk_block_size;
u8 vdisk_type;
u8 vdisk_mtype;
+ u8 flags; /* VDC_PORT_* */
+
char disk_name[32];
};
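+/*
+ * Set in vdc_port.flags while an LDC reset is being processed; a
+ * concurrent reset waits on the handshake completion instead.
+ */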
+#define VDC_PORT_RESET 0x1
+
static void vdc_ldc_reset(struct vdc_port *port);
static void vdc_ldc_reset_work(struct work_struct *work);
static void vdc_ldc_reset_timer(unsigned long _arg);
-static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx);
+static struct bio *vdc_desc_put(struct vdc_port *port, unsigned int idx);
static inline void vdc_desc_set_state(struct vio_disk_desc *, int);
static inline struct vdc_port *to_vdc_port(struct vio_driver_state *vio)
static int vdc_major;
#define PARTITION_SHIFT 3
-static inline u32 vdc_tx_dring_avail(struct vio_dring_state *dr)
-{
- return VDC_TX_RING_SIZE - bitmap_weight(dr->txmap, dr->nr_txmap);
-}
-
static int vdc_getgeo(struct block_device *bdev, struct hd_geometry *geo)
{
struct gendisk *disk = bdev->bd_disk;
.ioctl = vdc_ioctl,
};
-static void vdc_blk_queue_start(struct vdc_port *port)
-{
- struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
-
- /* restart blk queue when ring is half emptied. also called after
- * handshake completes, so check for initial handshake before we've
- * allocated a disk.
- */
- if (port->disk && blk_queue_stopped(port->disk->queue) &&
- vdc_tx_dring_avail(dr) * 100 / VDC_TX_RING_SIZE >= 50) {
- blk_start_queue(port->disk->queue);
- }
-
-}
-
-static void vdc_finish(struct vio_driver_state *vio, int err, int waiting_for)
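+/*
+ * Complete @cmp if it is waiting for @waiting_for (-1 matches any).
+ * complete_all() is used so that every waiter is released, not just
+ * the first.
+ */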
+static void vdc_finish(struct vio_completion *cmp, int err, int waiting_for)
{
- if (vio->cmp &&
- (waiting_for == -1 ||
- vio->cmp->waiting_for == waiting_for)) {
- vio->cmp->err = err;
- complete(&vio->cmp->com);
- vio->cmp = NULL;
- }
+ if (cmp && (waiting_for == -1 || cmp->waiting_for == waiting_for)) {
+ cmp->err = err;
+ complete_all(&cmp->com);
+ } else {
+ pr_debug(PFX "skip err=%d wait=%d\n", err, waiting_for);
+ }
}
static void vdc_handshake_complete(struct vio_driver_state *vio)
struct vdc_port *port = to_vdc_port(vio);
del_timer(&port->ldc_reset_timer);
- vdc_finish(vio, 0, WAITING_FOR_LINK_UP);
- vdc_blk_queue_start(port);
+ vdc_finish(vio->cmp, 0, WAITING_FOR_LINK_UP);
+ vio->cmp = NULL;
}
static int vdc_handle_unknown(struct vdc_port *port, void *arg)
static void vdc_end_special(struct vdc_port *port, int err)
{
- vdc_finish(&port->vio, -err, WAITING_FOR_GEN_CMD);
+ vdc_finish(port->cmp, -err, WAITING_FOR_GEN_CMD);
+ port->cmp = NULL;
}
static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr,
{
struct vio_disk_desc *desc = vio_dring_entry(dr, index);
struct vio_driver_state *vio = &port->vio;
- struct request *req;
+ struct vdc_req_entry *rqe = &port->rq_arr[index];
+ struct bio *req;
assert_spin_locked(&vio->lock);
return;
}
- __blk_end_request(req, (desc->status ? -EIO : 0), desc->size);
+ if (rqe->size != desc->size) {
+ pr_err(PFX "%s idx=%u err=%d state=%d size=%llu rsize=%llu\n",
+ __func__, index, err, desc->hdr.state,
+ desc->size, rqe->size);
+ BUG();
+ }
- vdc_blk_queue_start(port);
+ bio_endio(req, err ? -EIO : 0);
+ rqe->size = 0;
}
static int vdc_ack(struct vdc_port *port, void *msgbuf)
if (err < 0)
break;
}
- if (err < 0)
- vdc_finish(&port->vio, err, WAITING_FOR_ANY);
+
+ /*
+ * If there is an error, reset the link. All inflight
+ * requests will be resent once the port is up again.
+ */
+ if (err < 0) {
+ vio_link_state_change(vio, LDC_EVENT_RESET);
+ queue_work(sunvdc_wq, &port->ldc_reset_work);
+ }
out:
spin_unlock_irqrestore(&vio->lock, flags);
}
-static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx)
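+/*
+ * Send the dring data message for descriptor @idx, retrying with
+ * bounded exponential backoff. @new selects whether the vio lock must
+ * be taken here (new request) or is already held (resend during reset).
+ */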
+static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx, int new)
{
+ struct vdc_req_entry *rqe = &port->rq_arr[idx];
struct vio_driver_state *vio = &port->vio;
struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
struct vio_dring_data hdr = {
.start_idx = idx,
.end_idx = idx,
};
+ unsigned long flags = 0;
int err, delay;
delay = 1;
do {
+ /*
+ * We can get here for a new or an inflight request.
+ * In the former case, we must explicitly take the vio lock.
+ * In the latter case, the lock is already held as part of
+ * reset processing. Sending the request and updating the
+ * related fields on success must be atomic. First, snd_nxt
+ * and req_id must be incremented atomically. Second, if the
+ * thread were preempted after vio_ldc_send() but before
+ * setting rqe->sent and the service reset, the request would
+ * not be resent during reset processing.
+ */
+ if (new)
+ spin_lock_irqsave(&vio->lock, flags);
+ else
+ assert_spin_locked(&vio->lock);
hdr.seq = dr->snd_nxt;
err = vio_ldc_send(vio, &hdr, sizeof(hdr));
if (err > 0) {
+ rqe->sent = 1;
dr->snd_nxt++;
port->req_id++;
+ if (new)
+ spin_unlock_irqrestore(&vio->lock, flags);
break;
}
+ if (new)
+ spin_unlock_irqrestore(&vio->lock, flags);
udelay(delay);
if ((delay <<= 1) > 128)
}
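+/*
+ * Claim a free tx ring descriptor, sleeping on port->wait until one is
+ * available. Descriptors are freed in interrupt context, which wakes
+ * the waiters via vdc_desc_put().
+ */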
static struct vio_disk_desc *vdc_desc_get(struct vdc_port *port,
- struct request *req,
+ struct bio *req,
unsigned int *idxp)
{
unsigned int idx;
struct vio_disk_desc *desc = NULL;
struct vio_driver_state *vio = &port->vio;
struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+ DEFINE_WAIT(wait);
+ unsigned long flags;
- assert_spin_locked(&vio->lock);
-
- idx = find_first_zero_bit(dr->txmap, dr->nr_txmap);
- if (idx < VDC_TX_RING_SIZE) {
- bitmap_set(dr->txmap, idx, 1);
- desc = dr->base + (dr->entry_size * idx);
- if (req) {
- BUG_ON(port->rq_arr[idx].req);
- port->rq_arr[idx].req = req;
+ while (1) {
+ prepare_to_wait(&port->wait, &wait, TASK_UNINTERRUPTIBLE);
+
+ spin_lock_irqsave(&vio->lock, flags);
+ idx = find_first_zero_bit(dr->txmap, dr->nr_txmap);
+ if (idx < VDC_TX_RING_SIZE) {
+ bitmap_set(dr->txmap, idx, 1);
+ desc = dr->base + (dr->entry_size * idx);
+ if (req) {
+ BUG_ON(port->rq_arr[idx].req);
+ port->rq_arr[idx].req = req;
+ }
+ *idxp = idx;
+
+ spin_unlock_irqrestore(&vio->lock, flags);
+ finish_wait(&port->wait, &wait);
+ break;
}
- *idxp = idx;
+ spin_unlock_irqrestore(&vio->lock, flags);
+ schedule();
+ finish_wait(&port->wait, &wait);
}
+ BUG_ON(!desc);
+
return desc;
}
-static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx)
+static struct bio *vdc_desc_put(struct vdc_port *port, unsigned int idx)
{
struct vio_driver_state *vio = &port->vio;
struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
struct vio_disk_desc *desc = vio_dring_entry(dr, idx);
struct vdc_req_entry *rqe;
- struct request *req;
+ struct bio *req;
assert_spin_locked(&vio->lock);
rqe = &port->rq_arr[idx];
req = rqe->req;
rqe->req = NULL;
+ rqe->sent = 0;
+ wake_up_all(&port->wait);
return req;
}
}
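+/*
+ * Build a read or write descriptor for @req. ldc_map_sg() is retried
+ * on -ENOMEM, sleeping on port->wait between attempts.
+ */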
static int __create_rw_desc(struct vdc_port *port, struct request *req,
- struct vio_disk_desc *desc)
+ struct vio_disk_desc *desc, unsigned int idx)
{
+ struct vio_driver_state *vio = &port->vio;
struct scatterlist sg[port->ring_cookies];
+ struct vdc_req_entry *rqe;
+ DEFINE_WAIT(wait);
unsigned int map_perm;
+ unsigned long flags;
int nsg, err, i;
u64 len;
u8 op;
memset(desc, 0, sizeof(struct vio_disk_desc));
- err = ldc_map_sg(port->vio.lp, sg, nsg, desc->cookies,
- port->ring_cookies, map_perm);
+ while (1) {
+ prepare_to_wait(&port->wait, &wait, TASK_UNINTERRUPTIBLE);
+
+ spin_lock_irqsave(&vio->lock, flags);
+ err = ldc_map_sg(port->vio.lp, sg, nsg, desc->cookies,
+ port->ring_cookies, map_perm);
+
+ /* Retry only on -ENOMEM; any other result is final. */
+ if (err != -ENOMEM) {
+ spin_unlock_irqrestore(&vio->lock, flags);
+ finish_wait(&port->wait, &wait);
+ break;
+ }
+
+ spin_unlock_irqrestore(&vio->lock, flags);
+ schedule();
+ finish_wait(&port->wait, &wait);
+ }
if (err <= 0) {
pr_err(PFX "ldc_map_sg() failed, err=%d.\n", err);
desc->size = len;
desc->ncookies = err;
+ rqe = &port->rq_arr[idx];
+ rqe->size = len;
+
return 0;
}
vdc_desc_set_state(desc, VIO_DESC_READY);
while (1) {
- err = __vdc_tx_trigger(port, idx);
+ err = __vdc_tx_trigger(port, idx, 1);
if (err == -ECONNRESET || err == -ENOTCONN) {
vdc_ldc_reset(port);
return err;
}
-static void do_vdc_request(struct request_queue *rq)
+/*
+ * IO requests are handed off directly to vdc without any queueing at
+ * the blk layer.
+ *
+ * A dring entry is first allocated and then mapped for a request,
+ * and the request is then sent to the server. A recoverable error,
+ * e.g. reset, is continuously retried until success.
+ *
+ * A reset error can be sync or async, i.e., it can be a direct result
+ * of a send, or fielded indirectly via an interrupt. In either case
+ * the connection is reset and all inflight requests are resent immediately
+ * after the connection is re-established at the end of the reset processing.
+ *
+ * The tx dring acts effectively as a request queue. An incoming request
+ * will continuously retry until it gets a tx ring descriptor and proceeds.
+ * The descriptor is freed in interrupt context and the vio spinlock is used
+ * for synchronization.
+ */
+static void vdc_make_request(struct request_queue *q, struct bio *bio)
{
- struct vdc_port *port = NULL;
- struct vio_dring_state *dr = NULL;
- struct request *req;
- unsigned int idx = 0;
+ struct vdc_port *port = q->queuedata;
+ struct vio_driver_state *vio = &port->vio;
+ struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+ struct vio_disk_desc *desc;
+ struct request req;
+ unsigned int idx;
+ unsigned long flags;
int err = 0;
- while ((req = blk_peek_request(rq)) != NULL) {
- struct vio_disk_desc *desc;
-
- port = req->rq_disk->private_data;
- dr = &port->vio.drings[VIO_DRIVER_TX_RING];
+ BUG_ON(!port);
- if (unlikely(vdc_tx_dring_avail(dr) < 1))
- goto wait;
+ desc = vdc_desc_get(port, bio, &idx);
- desc = vdc_desc_get(port, req, &idx);
-
- /* Note that REQ_FLUSH requests with a payload are
- * automatically turned into a sequence of an empty
- * REQ_FLUSH request followed by the actual i/o by
- * the block layer.
+ if (bio->bi_rw & REQ_FLUSH) {
+ __create_flush_desc(port, desc);
+ } else {
+ memset(&req, 0, sizeof(req));
+ /*
+ * XXX We should be able to use init_request_from_bio()
+ * but it is not an exported symbol.
*/
- if (req->cmd_flags & REQ_FLUSH) {
- if (req->bio != NULL)
- pr_err(PFX "non-empty REQ_FLUSH, lost I/O\n");
- __create_flush_desc(port, desc);
- } else
- err = __create_rw_desc(port, req, desc);
-
+ req.bio = bio;
+ req.cmd_flags = bio->bi_rw & REQ_COMMON_MASK;
+ req.errors = 0;
+ req.__sector = bio->bi_iter.bi_sector;
+ req.q = q;
+ err = __create_rw_desc(port, &req, desc, idx);
if (err)
- goto wait;
-
- blk_start_request(req);
+ pr_err(PFX "__create_rw_desc() failed, err=%d\n", err);
+ }
+ if (!err)
err = __send_request(port, idx);
- if (err < 0) {
- blk_requeue_request(rq, req);
-wait:
- /* Avoid pointless unplugs. */
- blk_stop_queue(rq);
- break;
- }
- }
- if (err < 0 &&
- err != -ENOMEM && err != -ECONNRESET && err != -ENOTCONN)
+ /*
+ * Terminate the request unless a descriptor was
+ * successfully allocated and sent to the server.
+ */
+ if (err < 0) {
+ spin_lock_irqsave(&vio->lock, flags);
vdc_end_one(port, dr, idx, err);
+ spin_unlock_irqrestore(&vio->lock, flags);
+ }
}
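+/*
+ * Issue a special (non-I/O) operation to the server and wait for its
+ * completion via port->cmp.
+ */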
static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
{
struct vio_completion comp;
struct vio_disk_desc *desc;
unsigned long flags;
unsigned int map_perm;
unsigned int idx;
int op_len, err;
case VD_OP_GET_EFI:
case VD_OP_SET_EFI:
return -EOPNOTSUPP;
- break;
};
map_perm |= LDC_MAP_SHADOW | LDC_MAP_DIRECT | LDC_MAP_IO;
if (map_perm & LDC_MAP_R)
memcpy(req_buf, buf, len);
- spin_lock_irqsave(&port->vio.lock, flags);
-
desc = vdc_desc_get(port, NULL, &idx);
if (!desc) {
err = -ENOMEM;
init_completion(&comp.com);
comp.waiting_for = WAITING_FOR_GEN_CMD;
- port->vio.cmp = &comp;
+ port->cmp = &comp;
desc->hdr.ack = VIO_ACK_ENABLE;
desc->req_id = port->req_id;
err = __send_request(port, idx);
if (err >= 0) {
- spin_unlock_irqrestore(&port->vio.lock, flags);
wait_for_completion(&comp.com);
err = comp.err;
- spin_lock_irqsave(&port->vio.lock, flags);
} else {
- port->vio.cmp = NULL;
+ port->cmp = NULL;
goto done;
}
done:
+ /* vdc_desc_put() expects the vio lock to be held. */
+ spin_lock_irqsave(&port->vio.lock, flags);
(void) vdc_desc_put(port, idx);
spin_unlock_irqrestore(&port->vio.lock, flags);
kfree(req_buf);
-
return err;
}
(u64)geom.num_sec);
}
- q = blk_init_queue(do_vdc_request, &port->vio.lock);
+ q = blk_alloc_queue(GFP_KERNEL);
if (!q) {
printk(KERN_ERR PFX "%s: Could not allocate queue.\n",
port->vio.name);
return -ENOMEM;
}
+ blk_queue_make_request(q, vdc_make_request);
+ q->queuedata = port;
+
g = alloc_disk(1 << PARTITION_SHIFT);
if (!g) {
printk(KERN_ERR PFX "%s: Could not allocate gendisk.\n",
blk_queue_segment_boundary(q, PAGE_SIZE - 1);
blk_queue_max_segment_size(q, PAGE_SIZE);
- /* vds may be a device with volatile caching and in protocol 1.3
- * can perform out of order completion. REQ_FLUSH/REQ_FUA are
- * used to signal completion barriers.
- * REQ_FUA is turned into a following REQ_FLUSH by block layer
- * if not supported directly.
- */
- blk_queue_flush(q, REQ_FLUSH);
-
blk_queue_max_segments(q, port->ring_cookies);
blk_queue_max_hw_sectors(q, port->max_xfer_size);
g->major = vdc_major;
static struct ldc_channel_config vdc_ldc_cfg = {
.event = vdc_event,
- .mtu = 256,
+ .mtu = 512,
.mode = LDC_MODE_UNRELIABLE,
};
setup_timer(&port->ldc_reset_timer, vdc_ldc_reset_timer,
(unsigned long)port);
INIT_WORK(&port->ldc_reset_work, vdc_ldc_reset_work);
+ init_waitqueue_head(&port->wait);
err = vio_driver_init(&port->vio, vdev, VDEV_DISK,
vdc_versions, ARRAY_SIZE(vdc_versions),
unsigned long flags;
spin_lock_irqsave(&port->vio.lock, flags);
- blk_stop_queue(port->disk->queue);
spin_unlock_irqrestore(&port->vio.lock, flags);
flush_work(&port->ldc_reset_work);
del_timer_sync(&port->vio.timer);
del_gendisk(port->disk);
- blk_cleanup_queue(port->disk->queue);
put_disk(port->disk);
port->disk = NULL;
return 0;
}
-static void vdc_requeue_inflight(struct vdc_port *port)
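+/*
+ * Resend every descriptor that had already been sent to the server
+ * when the link went down. Called with the vio lock held once the
+ * connection is re-established.
+ */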
+static void vdc_resend_inflight(struct vdc_port *port)
{
struct vio_driver_state *vio = &port->vio;
struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
struct vio_disk_desc *desc;
- struct request *req;
+ struct vdc_req_entry *rqe;
unsigned int idx;
assert_spin_locked(&vio->lock);
for (idx = find_first_bit(dr->txmap, dr->nr_txmap);
idx < dr->nr_txmap;
idx = find_next_bit(dr->txmap, dr->nr_txmap, idx + 1)) {
- req = vdc_desc_put(port, idx);
- if (req == NULL) {
+ rqe = &port->rq_arr[idx];
+ if (rqe->sent) {
desc = vio_dring_entry(dr, idx);
- vdc_end_special(port, desc->status);
- continue;
+ vdc_desc_set_state(desc, VIO_DESC_READY);
+ if (__vdc_tx_trigger(port, idx, 0) < 0)
+ break;
}
- blk_requeue_request(port->disk->queue, req);
}
}
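+/*
+ * Fail all outstanding descriptors with -EIO. Invoked from the reset
+ * timer once the link has been down for ldc_timeout seconds.
+ */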
static void vdc_queue_drain(struct vdc_port *port)
{
- struct request *req;
+ struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
+ unsigned int idx;
- while ((req = blk_fetch_request(port->disk->queue)) != NULL)
- __blk_end_request_all(req, -EIO);
+ for (idx = find_first_bit(dr->txmap, dr->nr_txmap);
+ idx < dr->nr_txmap;
+ idx = find_next_bit(dr->txmap, dr->nr_txmap, idx + 1)) {
+ vdc_end_one(port, dr, idx, -EIO);
+ }
}
static void vdc_ldc_reset_timer(unsigned long _arg)
pr_warn(PFX "%s ldc down %llu seconds, draining queue\n",
port->disk_name, port->ldc_timeout);
vdc_queue_drain(port);
- vdc_blk_queue_start(port);
}
spin_unlock_irqrestore(&vio->lock, flags);
}
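+/*
+ * Worker for asynchronous link resets. The vio lock is no longer taken
+ * here; vdc_ldc_reset() manages its own locking.
+ */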
static void vdc_ldc_reset_work(struct work_struct *work)
{
struct vdc_port *port;
- struct vio_driver_state *vio;
- unsigned long flags;
port = container_of(work, struct vdc_port, ldc_reset_work);
- vio = &port->vio;
- spin_lock_irqsave(&vio->lock, flags);
vdc_ldc_reset(port);
- spin_unlock_irqrestore(&vio->lock, flags);
}
/*
*/
static void vdc_ldc_reset(struct vdc_port *port)
{
- int err;
struct vio_driver_state *vio = &port->vio;
-
- assert_spin_locked(&vio->lock);
+ unsigned long flags;
+ int err;
pr_warn(PFX "%s ldc link reset\n", port->disk_name);
- if (!port->disk)
+ spin_lock_irqsave(&vio->lock, flags);
+
+ if (port->flags & VDC_PORT_RESET) {
+ spin_unlock_irqrestore(&vio->lock, flags);
+ wait_for_completion(&port->vio.cmp->com);
return;
+ }
- blk_stop_queue(port->disk->queue);
- vdc_requeue_inflight(port);
- vio_link_state_change(vio, LDC_EVENT_RESET);
+ if (!port->disk)
+ goto done;
- err = ldc_connect(vio->lp);
+ vio_link_state_change(vio, LDC_EVENT_RESET);
+ port->flags |= VDC_PORT_RESET;
+ spin_unlock_irqrestore(&vio->lock, flags);
+ err = vdc_port_up(port);
+ spin_lock_irqsave(&vio->lock, flags);
if (err)
- pr_err(PFX "%s connect failed, err=%d\n",
- port->disk_name, err);
+ pr_err(PFX "%s vdc_port_up() failed, err=%d\n",
+ port->disk_name, err);
+ else
+ vdc_resend_inflight(port);
if (port->ldc_timeout)
mod_timer(&port->ldc_reset_timer,
round_jiffies(jiffies + HZ * port->ldc_timeout));
mod_timer(&vio->timer, round_jiffies(jiffies + HZ));
+
+ port->flags &= ~VDC_PORT_RESET;
+done:
+ spin_unlock_irqrestore(&vio->lock, flags);
}
static const struct vio_device_id vdc_port_match[] = {