From: Bijan Mottahedeh
Date: Mon, 10 Oct 2016 07:10:59 +0000 (-0400)
Subject: sparc64: Use block layer BIO-based interface for VDC IO requests
X-Git-Tag: v4.1.12-92~30^2~9
X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=4b725eb64cc10a4877f2af75ff3a776586f68eb7;p=users%2Fjedix%2Flinux-maple.git

sparc64: Use block layer BIO-based interface for VDC IO requests

Convert sunvdc from the request-based block interface to a BIO-based
make_request interface.  IO requests are handed to the driver directly,
with no queueing in the block layer: the tx dring effectively becomes
the request queue, and a submitter waits on a wait queue until a free
descriptor (and the LDC mapping it needs) becomes available.  On an LDC
reset, inflight descriptors are resent directly once the link comes back
up instead of being requeued to the block layer; concurrent resets are
serialized with a new VDC_PORT_RESET flag, and the vio lock is dropped
around vdc_port_up() in the reset path.  The LDC mtu is also raised
from 256 to 512.

Orabug: 24823012

Signed-off-by: Bijan Mottahedeh
Reviewed-by: Alexandre Chartre
Signed-off-by: Allen Pais
---

diff --git a/drivers/block/sunvdc.c b/drivers/block/sunvdc.c
index b3f342165af84..d61b7ea943719 100644
--- a/drivers/block/sunvdc.c
+++ b/drivers/block/sunvdc.c
@@ -18,6 +18,7 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 
@@ -44,7 +45,9 @@ MODULE_VERSION(DRV_MODULE_VERSION);
 static struct workqueue_struct *sunvdc_wq;
 
 struct vdc_req_entry {
-	struct request *req;
+	struct bio *req;
+	u64 size;
+	int sent;
 };
 
 struct vdc_port {
@@ -52,13 +55,14 @@ struct vdc_port {
 
 	struct gendisk *disk;
 
-	struct vdc_completion *cmp;
+	struct vio_completion *cmp;
 	u64 req_id;
 	u64 seq;
 
 	struct vdc_req_entry rq_arr[VDC_TX_RING_SIZE];
 
 	unsigned long ring_cookies;
+	wait_queue_head_t wait;
 
 	u64 max_xfer_size;
 	u32 vdisk_block_size;
@@ -75,13 +79,17 @@ struct vdc_port {
 	u8 vdisk_type;
 	u8 vdisk_mtype;
 
+	u8 flags;
+
 	char disk_name[32];
 };
 
+#define VDC_PORT_RESET	0x1
+
 static void vdc_ldc_reset(struct vdc_port *port);
 static void vdc_ldc_reset_work(struct work_struct *work);
 static void vdc_ldc_reset_timer(unsigned long _arg);
-static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx);
+static struct bio *vdc_desc_put(struct vdc_port *port, unsigned int idx);
 static inline void vdc_desc_set_state(struct vio_disk_desc *, int);
 
 static inline struct vdc_port *to_vdc_port(struct vio_driver_state *vio)
@@ -107,11 +115,6 @@ static inline int vdc_version_supported(struct vdc_port *port,
 static int vdc_major;
 #define PARTITION_SHIFT 3
 
-static inline u32 vdc_tx_dring_avail(struct vio_dring_state *dr)
-{
-	return VDC_TX_RING_SIZE - bitmap_weight(dr->txmap, dr->nr_txmap);
-}
-
 static int vdc_getgeo(struct block_device *bdev, struct hd_geometry *geo)
 {
 	struct gendisk *disk = bdev->bd_disk;
@@ -165,30 +168,13 @@ static const struct block_device_operations vdc_fops = {
 	.ioctl = vdc_ioctl,
 };
 
-static void vdc_blk_queue_start(struct vdc_port *port)
-{
-	struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
-
-	/* restart blk queue when ring is half emptied. also called after
-	 * handshake completes, so check for initial handshake before we've
-	 * allocated a disk.
-	 */
-	if (port->disk && blk_queue_stopped(port->disk->queue) &&
-	    vdc_tx_dring_avail(dr) * 100 / VDC_TX_RING_SIZE >= 50) {
-		blk_start_queue(port->disk->queue);
-	}
-
-}
-
-static void vdc_finish(struct vio_driver_state *vio, int err, int waiting_for)
+static void vdc_finish(struct vio_completion *cmp, int err, int waiting_for)
 {
-	if (vio->cmp &&
-	    (waiting_for == -1 ||
-	     vio->cmp->waiting_for == waiting_for)) {
-		vio->cmp->err = err;
-		complete(&vio->cmp->com);
-		vio->cmp = NULL;
-	}
+	if (cmp && (waiting_for == -1 || cmp->waiting_for == waiting_for)) {
+		cmp->err = err;
+		complete_all(&cmp->com);
+	} else
+		pr_debug(PFX "skip err=%d wait=%d\n", err, waiting_for);
 }
 
 static void vdc_handshake_complete(struct vio_driver_state *vio)
@@ -196,8 +182,8 @@ static void vdc_handshake_complete(struct vio_driver_state *vio)
 	struct vdc_port *port = to_vdc_port(vio);
 
 	del_timer(&port->ldc_reset_timer);
-	vdc_finish(vio, 0, WAITING_FOR_LINK_UP);
-	vdc_blk_queue_start(port);
+	vdc_finish(vio->cmp, 0, WAITING_FOR_LINK_UP);
+	vio->cmp = NULL;
 }
 
 static int vdc_handle_unknown(struct vdc_port *port, void *arg)
@@ -286,7 +272,8 @@ static int vdc_handle_attr(struct vio_driver_state *vio, void *arg)
 
 static void vdc_end_special(struct vdc_port *port, int err)
 {
-	vdc_finish(&port->vio, -err, WAITING_FOR_GEN_CMD);
+	vdc_finish(port->cmp, -err, WAITING_FOR_GEN_CMD);
+	port->cmp = NULL;
 }
 
 static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr,
@@ -294,7 +281,8 @@ static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr,
 {
 	struct vio_disk_desc *desc = vio_dring_entry(dr, index);
 	struct vio_driver_state *vio = &port->vio;
-	struct request *req;
+	struct vdc_req_entry *rqe = &port->rq_arr[index];
+	struct bio *req;
 
 	assert_spin_locked(&vio->lock);
 
@@ -313,9 +301,15 @@ static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr,
 		return;
 	}
 
-	__blk_end_request(req, (desc->status ? -EIO : 0), desc->size);
+	if (rqe->size != desc->size) {
+		pr_err("%s idx=%u err=%d state=%d size=%lld rsize=%lld\n",
+		       __func__, index, err, desc->hdr.state,
+		       desc->size, rqe->size);
+		BUG();
+	}
 
-	vdc_blk_queue_start(port);
+	bio_endio(req, err ? -EIO : 0);
+	rqe->size = 0;
 }
 
 static int vdc_ack(struct vdc_port *port, void *msgbuf)
@@ -403,14 +397,22 @@ static void vdc_event(void *arg, int event)
 			if (err < 0)
 				break;
 		}
-	if (err < 0)
-		vdc_finish(&port->vio, err, WAITING_FOR_ANY);
+
+	/*
+	 * If there is an error, reset the link. All inflight
+	 * requests will be resent once the port is up again.
+	 */
+	if (err < 0) {
+		vio_link_state_change(vio, LDC_EVENT_RESET);
+		queue_work(sunvdc_wq, &port->ldc_reset_work);
+	}
 out:
 	spin_unlock_irqrestore(&vio->lock, flags);
 }
 
-static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx)
+static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx, int new)
 {
+	struct vdc_req_entry *rqe = &port->rq_arr[idx];
 	struct vio_driver_state *vio = &port->vio;
 	struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
 	struct vio_dring_data hdr = {
@@ -424,17 +426,38 @@ static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx)
 		.start_idx = idx,
 		.end_idx = idx,
 	};
+	unsigned long flags = 0;
 	int err, delay;
 
 	delay = 1;
 	do {
+		/*
+		 * We can get here for a new or an inflight request.
+		 * In the former case, we must explicitly hold the vio lock.
+		 * In the latter case, the lock is already held as part of
+		 * reset processing. Sending the request and setting the
+		 * related fields in case of success must be atomic.
+		 * First, snd_nxt and req_id must be incremented atomically.
+		 * Second, if the thread is preempted after vio_ldc_send()
+		 * but before setting rqe->sent and the service resets,
+		 * the request may not be resent during reset processing.
+		 */
+		if (new)
+			spin_lock_irqsave(&vio->lock, flags);
+		else
+			assert_spin_locked(&vio->lock);
 		hdr.seq = dr->snd_nxt;
 		err = vio_ldc_send(vio, &hdr, sizeof(hdr));
 		if (err > 0) {
+			rqe->sent = 1;
 			dr->snd_nxt++;
 			port->req_id++;
+			if (new)
+				spin_unlock_irqrestore(&vio->lock, flags);
 			break;
 		}
+		if (new)
+			spin_unlock_irqrestore(&vio->lock, flags);
 
 		udelay(delay);
 		if ((delay <<= 1) > 128)
@@ -448,37 +471,51 @@ static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx)
 }
 
 static struct vio_disk_desc *vdc_desc_get(struct vdc_port *port,
-					  struct request *req,
+					  struct bio *req,
 					  unsigned int *idxp)
 {
 	unsigned int idx;
 	struct vio_disk_desc *desc = NULL;
 	struct vio_driver_state *vio = &port->vio;
 	struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+	DEFINE_WAIT(wait);
+	unsigned long flags;
 
-	assert_spin_locked(&vio->lock);
-
-	idx = find_first_zero_bit(dr->txmap, dr->nr_txmap);
-	if (idx < VDC_TX_RING_SIZE) {
-		bitmap_set(dr->txmap, idx, 1);
-		desc = dr->base + (dr->entry_size * idx);
-		if (req) {
-			BUG_ON(port->rq_arr[idx].req);
-			port->rq_arr[idx].req = req;
+	while (1) {
+		prepare_to_wait(&port->wait, &wait, TASK_INTERRUPTIBLE);
+
+		spin_lock_irqsave(&vio->lock, flags);
+		idx = find_first_zero_bit(dr->txmap, dr->nr_txmap);
+		if (idx < VDC_TX_RING_SIZE) {
+			bitmap_set(dr->txmap, idx, 1);
+			desc = dr->base + (dr->entry_size * idx);
+			if (req) {
+				BUG_ON(port->rq_arr[idx].req);
+				port->rq_arr[idx].req = req;
+			}
+			*idxp = idx;
+
+			spin_unlock_irqrestore(&vio->lock, flags);
+			finish_wait(&port->wait, &wait);
+			break;
 		}
-		*idxp = idx;
+		spin_unlock_irqrestore(&vio->lock, flags);
+		schedule();
+		finish_wait(&port->wait, &wait);
 	}
 
+	BUG_ON(!desc);
+
 	return desc;
 }
 
-static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx)
+static struct bio *vdc_desc_put(struct vdc_port *port, unsigned int idx)
 {
 	struct vio_driver_state *vio = &port->vio;
 	struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
 	struct vio_disk_desc *desc = vio_dring_entry(dr, idx);
 	struct vdc_req_entry *rqe;
-	struct request *req;
+	struct bio *req;
 
 	assert_spin_locked(&vio->lock);
 
@@ -490,6 +527,8 @@ static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx)
 	rqe = &port->rq_arr[idx];
 	req = rqe->req;
 	rqe->req = NULL;
+	rqe->sent = 0;
+	wake_up_all(&port->wait);
 
 	return req;
 }
@@ -514,10 +553,14 @@ static void __create_flush_desc(struct vdc_port *port,
 }
 
 static int __create_rw_desc(struct vdc_port *port, struct request *req,
-			    struct vio_disk_desc *desc)
+			    struct vio_disk_desc *desc, unsigned int idx)
 {
+	struct vio_driver_state *vio = &port->vio;
 	struct scatterlist sg[port->ring_cookies];
+	struct vdc_req_entry *rqe;
+	DEFINE_WAIT(wait);
 	unsigned int map_perm;
+	unsigned long flags;
 	int nsg, err, i;
 	u64 len;
 	u8 op;
@@ -541,8 +584,23 @@ static int __create_rw_desc(struct vdc_port *port, struct request *req,
 
 	memset(desc, 0, sizeof(struct vio_disk_desc));
 
-	err = ldc_map_sg(port->vio.lp, sg, nsg, desc->cookies,
-			 port->ring_cookies, map_perm);
+	while (1) {
+		prepare_to_wait(&port->wait, &wait, TASK_INTERRUPTIBLE);
+
+		spin_lock_irqsave(&vio->lock, flags);
+		err = ldc_map_sg(port->vio.lp, sg, nsg, desc->cookies,
+				 port->ring_cookies, map_perm);
+
+		if (err >= 0 || err != -ENOMEM) {
+			spin_unlock_irqrestore(&vio->lock, flags);
+			finish_wait(&port->wait, &wait);
+			break;
+		}
+
+		spin_unlock_irqrestore(&vio->lock, flags);
+		schedule();
+		finish_wait(&port->wait, &wait);
+	}
 
 	if (err <= 0) {
 		pr_err(PFX "ldc_map_sg() failed, err=%d.\n", err);
@@ -565,6 +623,9 @@ static int __create_rw_desc(struct vdc_port *port, struct request *req,
 	desc->size = len;
 	desc->ncookies = err;
 
+	rqe = &port->rq_arr[idx];
+	rqe->size = len;
+
 	return 0;
 }
 
@@ -578,7 +639,7 @@ static int __send_request(struct vdc_port *port, unsigned int idx)
 	vdc_desc_set_state(desc, VIO_DESC_READY);
 
 	while (1) {
-		err = __vdc_tx_trigger(port, idx);
+		err = __vdc_tx_trigger(port, idx, 1);
 
 		if (err == -ECONNRESET || err == -ENOTCONN) {
 			vdc_ldc_reset(port);
@@ -594,62 +655,75 @@ static int __send_request(struct vdc_port *port, unsigned int idx)
 	return err;
 }
 
-static void do_vdc_request(struct request_queue *rq)
+/*
+ * IO requests are handed off directly to vdc without any queueing at
+ * the blk layer.
+ *
+ * A dring entry is first allocated and then mapped for a request,
+ * and the request is then sent to the server. A recoverable error,
+ * e.g. reset, is continuously retried until success.
+ *
+ * A reset error can be sync or async, i.e., it can be a direct result
+ * of a send, or fielded indirectly via an interrupt. In either case
+ * the connection is reset and all inflight requests are resent immediately
+ * after the connection is re-established at the end of the reset processing.
+ *
+ * The tx dring acts effectively as a request queue. An incoming request
+ * will continuously retry until it gets a tx ring descriptor and proceeds.
+ * The descriptor is freed in interrupt context and the vio spinlock is used
+ * for synchronization.
+ */
+static void vdc_make_request(struct request_queue *q, struct bio *bio)
 {
-	struct vdc_port *port = NULL;
-	struct vio_dring_state *dr = NULL;
-	struct request *req;
-	unsigned int idx = 0;
+	struct vdc_port *port = q->queuedata;
+	struct vio_driver_state *vio = &port->vio;
+	struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+	struct vio_disk_desc *desc;
+	struct request req;
+	unsigned int idx;
+	unsigned long flags;
 	int err = 0;
 
-	while ((req = blk_peek_request(rq)) != NULL) {
-		struct vio_disk_desc *desc;
-
-		port = req->rq_disk->private_data;
-		dr = &port->vio.drings[VIO_DRIVER_TX_RING];
+	BUG_ON(!port);
 
-		if (unlikely(vdc_tx_dring_avail(dr) < 1))
-			goto wait;
+	desc = vdc_desc_get(port, bio, &idx);
 
-		desc = vdc_desc_get(port, req, &idx);
-
-		/* Note that REQ_FLUSH requests with a payload are
-		 * automatically turned into a sequence of an empty
-		 * REQ_FLUSH request followed by the actual i/o by
-		 * the block layer.
+	if (bio->bi_rw & REQ_FLUSH)
+		__create_flush_desc(port, desc);
+	else {
+		memset(&req, 0, sizeof(req));
+		/*
+		 * XXX We should be able to use init_request_from_bio()
+		 * but it is not an exported symbol.
 		 */
-		if (req->cmd_flags & REQ_FLUSH) {
-			if (req->bio != NULL)
-				pr_err(PFX "non-empty REQ_FLUSH, lost I/O\n");
-			__create_flush_desc(port, desc);
-		} else
-			err = __create_rw_desc(port, req, desc);
-
+		req.bio = bio;
+		req.cmd_flags = bio->bi_rw & REQ_COMMON_MASK;
+		req.errors = 0;
+		req.__sector = bio->bi_iter.bi_sector;
+		req.q = q;
+		err = __create_rw_desc(port, &req, desc, idx);
 		if (err)
-			goto wait;
-
-		blk_start_request(req);
+			pr_err(PFX "__create_rw_desc() failed, err=%d\n", err);
+	}
 
+	if (!err)
 		err = __send_request(port, idx);
-		if (err < 0) {
-			blk_requeue_request(rq, req);
-wait:
-			/* Avoid pointless unplugs. */
-			blk_stop_queue(rq);
-			break;
-		}
-	}
 
-	if (err < 0 &&
-	    err != -ENOMEM && err != -ECONNRESET && err != -ENOTCONN)
+	/*
+	 * Terminate the request unless a descriptor was
+	 * successfully allocated and sent to the server.
+	 */
+	if (err < 0) {
+		spin_lock_irqsave(&vio->lock, flags);
 		vdc_end_one(port, dr, idx, err);
+		spin_unlock_irqrestore(&vio->lock, flags);
+	}
 }
 
 static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
 {
 	struct vio_completion comp;
 	struct vio_disk_desc *desc;
-	unsigned long flags;
 	unsigned int map_perm;
 	unsigned int idx;
 	int op_len, err;
@@ -712,7 +786,6 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
 	case VD_OP_GET_EFI:
 	case VD_OP_SET_EFI:
 		return -EOPNOTSUPP;
-		break;
 	};
 
 	map_perm |= LDC_MAP_SHADOW | LDC_MAP_DIRECT | LDC_MAP_IO;
@@ -728,8 +801,6 @@
 	if (map_perm & LDC_MAP_R)
 		memcpy(req_buf, buf, len);
 
-	spin_lock_irqsave(&port->vio.lock, flags);
-
 	desc = vdc_desc_get(port, NULL, &idx);
 	if (!desc) {
 		err = -ENOMEM;
@@ -744,7 +815,7 @@
 	init_completion(&comp.com);
 	comp.waiting_for = WAITING_FOR_GEN_CMD;
-	port->vio.cmp = &comp;
+	port->cmp = &comp;
 
 	desc->hdr.ack = VIO_ACK_ENABLE;
 	desc->req_id = port->req_id;
@@ -757,12 +828,10 @@
 	err = __send_request(port, idx);
 	if (err >= 0) {
-		spin_unlock_irqrestore(&port->vio.lock, flags);
 		wait_for_completion(&comp.com);
 		err = comp.err;
-		spin_lock_irqsave(&port->vio.lock, flags);
 	} else {
-		port->vio.cmp = NULL;
+		port->cmp = NULL;
 		goto done;
 	}
 
@@ -771,9 +840,7 @@
 done:
 	(void) vdc_desc_put(port, idx);
-	spin_unlock_irqrestore(&port->vio.lock, flags);
 	kfree(req_buf);
-
 	return err;
 }
 
@@ -886,12 +953,15 @@ static int probe_disk(struct vdc_port *port)
 			    (u64)geom.num_sec);
 	}
 
-	q = blk_init_queue(do_vdc_request, &port->vio.lock);
+	q = blk_alloc_queue(GFP_KERNEL);
 	if (!q) {
 		printk(KERN_ERR PFX "%s: Could not allocate queue.\n",
 		       port->vio.name);
 		return -ENOMEM;
 	}
+	blk_queue_make_request(q, vdc_make_request);
+	q->queuedata = port;
+
 	g = alloc_disk(1 << PARTITION_SHIFT);
 	if (!g) {
 		printk(KERN_ERR PFX "%s: Could not allocate gendisk.\n",
@@ -906,14 +976,6 @@
 	blk_queue_segment_boundary(q, PAGE_SIZE - 1);
 	blk_queue_max_segment_size(q, PAGE_SIZE);
 
-	/* vds may be a device with volatile caching and in protocol 1.3
-	 * can perform out of order completion. REQ_FLUSH/REQ_FUA are
-	 * used to signal completion barriers.
-	 * REQ_FUA is turned into a following REQ_FLUSH by block layer
-	 * if not supported directly.
- */ - blk_queue_flush(q, REQ_FLUSH); - blk_queue_max_segments(q, port->ring_cookies); blk_queue_max_hw_sectors(q, port->max_xfer_size); g->major = vdc_major; @@ -961,7 +1023,7 @@ static int probe_disk(struct vdc_port *port) static struct ldc_channel_config vdc_ldc_cfg = { .event = vdc_event, - .mtu = 256, + .mtu = 512, .mode = LDC_MODE_UNRELIABLE, }; @@ -1031,6 +1093,7 @@ static int vdc_port_probe(struct vio_dev *vdev, const struct vio_device_id *id) setup_timer(&port->ldc_reset_timer, vdc_ldc_reset_timer, (unsigned long)port); INIT_WORK(&port->ldc_reset_work, vdc_ldc_reset_work); + init_waitqueue_head(&port->wait); err = vio_driver_init(&port->vio, vdev, VDEV_DISK, vdc_versions, ARRAY_SIZE(vdc_versions), @@ -1085,7 +1148,6 @@ static int vdc_port_remove(struct vio_dev *vdev) unsigned long flags; spin_lock_irqsave(&port->vio.lock, flags); - blk_stop_queue(port->disk->queue); spin_unlock_irqrestore(&port->vio.lock, flags); flush_work(&port->ldc_reset_work); @@ -1093,7 +1155,6 @@ static int vdc_port_remove(struct vio_dev *vdev) del_timer_sync(&port->vio.timer); del_gendisk(port->disk); - blk_cleanup_queue(port->disk->queue); put_disk(port->disk); port->disk = NULL; @@ -1107,12 +1168,12 @@ static int vdc_port_remove(struct vio_dev *vdev) return 0; } -static void vdc_requeue_inflight(struct vdc_port *port) +static void vdc_resend_inflight(struct vdc_port *port) { struct vio_driver_state *vio = &port->vio; struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING]; struct vio_disk_desc *desc; - struct request *req; + struct vdc_req_entry *rqe; unsigned int idx; assert_spin_locked(&vio->lock); @@ -1120,22 +1181,26 @@ static void vdc_requeue_inflight(struct vdc_port *port) for (idx = find_first_bit(dr->txmap, dr->nr_txmap); idx < dr->nr_txmap; idx = find_next_bit(dr->txmap, dr->nr_txmap, idx + 1)) { - req = vdc_desc_put(port, idx); - if (req == NULL) { + rqe = &port->rq_arr[idx]; + if (rqe->sent) { desc = vio_dring_entry(dr, idx); - vdc_end_special(port, desc->status); - continue; + vdc_desc_set_state(desc, VIO_DESC_READY); + if (__vdc_tx_trigger(port, idx, 0) < 0) + break; } - blk_requeue_request(port->disk->queue, req); } } static void vdc_queue_drain(struct vdc_port *port) { - struct request *req; + struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING]; + unsigned int idx; - while ((req = blk_fetch_request(port->disk->queue)) != NULL) - __blk_end_request_all(req, -EIO); + for (idx = find_first_bit(dr->txmap, dr->nr_txmap); + idx < dr->nr_txmap; + idx = find_next_bit(dr->txmap, dr->nr_txmap, idx + 1)) { + vdc_end_one(port, dr, idx, -EIO); + } } static void vdc_ldc_reset_timer(unsigned long _arg) @@ -1152,7 +1217,6 @@ static void vdc_ldc_reset_timer(unsigned long _arg) pr_warn(PFX "%s ldc down %llu seconds, draining queue\n", port->disk_name, port->ldc_timeout); vdc_queue_drain(port); - vdc_blk_queue_start(port); } spin_unlock_irqrestore(&vio->lock, flags); } @@ -1160,15 +1224,10 @@ static void vdc_ldc_reset_timer(unsigned long _arg) static void vdc_ldc_reset_work(struct work_struct *work) { struct vdc_port *port; - struct vio_driver_state *vio; - unsigned long flags; port = container_of(work, struct vdc_port, ldc_reset_work); - vio = &port->vio; - spin_lock_irqsave(&vio->lock, flags); vdc_ldc_reset(port); - spin_unlock_irqrestore(&vio->lock, flags); } /* @@ -1179,29 +1238,42 @@ static void vdc_ldc_reset_work(struct work_struct *work) */ static void vdc_ldc_reset(struct vdc_port *port) { - int err; struct vio_driver_state *vio = &port->vio; - - assert_spin_locked(&vio->lock); + 
+	int err;
 
 	pr_warn(PFX "%s ldc link reset\n", port->disk_name);
 
-	if (!port->disk)
+	spin_lock_irqsave(&vio->lock, flags);
+
+	if (port->flags & VDC_PORT_RESET) {
+		spin_unlock_irqrestore(&vio->lock, flags);
+		wait_for_completion(&port->vio.cmp->com);
 		return;
+	}
 
-	blk_stop_queue(port->disk->queue);
-	vdc_requeue_inflight(port);
-	vio_link_state_change(vio, LDC_EVENT_RESET);
+	if (!port->disk)
+		goto done;
 
-	err = ldc_connect(vio->lp);
+	vio_link_state_change(vio, LDC_EVENT_RESET);
+	port->flags |= VDC_PORT_RESET;
+	spin_unlock_irqrestore(&vio->lock, flags);
+
+	err = vdc_port_up(port);
+
+	spin_lock_irqsave(&vio->lock, flags);
 	if (err)
-		pr_err(PFX "%s connect failed, err=%d\n",
-		       port->disk_name, err);
+		pr_err(PFX "%s vdc_port_up() failed, err=%d\n",
+		       port->disk_name, err);
+	else
+		vdc_resend_inflight(port);
 
 	if (port->ldc_timeout)
		mod_timer(&port->ldc_reset_timer,
			  round_jiffies(jiffies + HZ * port->ldc_timeout));
	mod_timer(&vio->timer, round_jiffies(jiffies + HZ));
+
+	port->flags &= ~VDC_PORT_RESET;
+done:
+	spin_unlock_irqrestore(&vio->lock, flags);
 }
 
 static const struct vio_device_id vdc_port_match[] = {
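
For readers adapting a similar driver: the heart of the change is that the
tx dring now doubles as the request queue, with vdc_desc_get()/vdc_desc_put()
providing the flow control that blk_stop_queue()/blk_start_queue() used to.
The pattern reduces to the self-contained sketch below. It mirrors the logic
of the two functions above; the slot_pool/slot_get/slot_put names and the
POOL_SIZE constant are illustrative only and are not part of the driver.

#include <linux/bitmap.h>
#include <linux/sched.h>
#include <linux/spinlock.h>
#include <linux/wait.h>

#define POOL_SIZE 512			/* stands in for VDC_TX_RING_SIZE */

struct slot_pool {
	DECLARE_BITMAP(map, POOL_SIZE);
	spinlock_t lock;		/* stands in for vio->lock */
	wait_queue_head_t wait;		/* stands in for port->wait */
};

/* Sleep until a free slot is found; mirrors vdc_desc_get(). */
static unsigned int slot_get(struct slot_pool *p)
{
	DEFINE_WAIT(wait);
	unsigned long flags;
	unsigned int idx;

	while (1) {
		prepare_to_wait(&p->wait, &wait, TASK_INTERRUPTIBLE);

		spin_lock_irqsave(&p->lock, flags);
		idx = find_first_zero_bit(p->map, POOL_SIZE);
		if (idx < POOL_SIZE) {
			bitmap_set(p->map, idx, 1);
			spin_unlock_irqrestore(&p->lock, flags);
			finish_wait(&p->wait, &wait);
			return idx;
		}
		spin_unlock_irqrestore(&p->lock, flags);

		schedule();	/* woken by slot_put() from completion context */
		finish_wait(&p->wait, &wait);
	}
}

/* Free a slot and wake all waiters; mirrors vdc_desc_put(). */
static void slot_put(struct slot_pool *p, unsigned int idx)
{
	unsigned long flags;

	spin_lock_irqsave(&p->lock, flags);
	bitmap_clear(p->map, idx, 1);
	spin_unlock_irqrestore(&p->lock, flags);
	wake_up_all(&p->wait);
}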
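
The other invariant the patch relies on is spelled out in the
__vdc_tx_trigger() comment: sending a descriptor and advancing
snd_nxt/req_id (and marking the entry sent) must happen as one atomic
step, under a lock taken locally for new requests and under the caller's
lock for resends during reset processing. A minimal sketch of that rule,
assuming the illustrative names ring_state, send_desc() and
ldc_send_stub() (the last a placeholder for vio_ldc_send()):

#include <linux/spinlock.h>
#include <linux/types.h>

struct ring_state {
	spinlock_t lock;	/* stands in for vio->lock */
	u64 snd_nxt;		/* next sequence number */
	u64 req_id;		/* next request id */
};

static int ldc_send_stub(u64 seq)
{
	return 1;		/* placeholder: > 0 means the send succeeded */
}

/*
 * Send one descriptor. @is_new selects whether this path owns the
 * locking (new I/O) or runs under the caller's lock (reset resend).
 * snd_nxt/req_id only advance if the send succeeded, and the send plus
 * the update form one critical section, so a concurrent reset sees the
 * request either fully sent or not sent at all.
 */
static int send_desc(struct ring_state *rs, bool is_new)
{
	unsigned long flags = 0;
	int err;

	if (is_new)
		spin_lock_irqsave(&rs->lock, flags);
	else
		assert_spin_locked(&rs->lock);

	err = ldc_send_stub(rs->snd_nxt);
	if (err > 0) {
		rs->snd_nxt++;
		rs->req_id++;
	}

	if (is_new)
		spin_unlock_irqrestore(&rs->lock, flags);

	return err;
}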