From 68f7c9c17fb80d29cbc1e5110f6c021f8da8d610 Mon Sep 17 00:00:00 2001 From: Bijan Mottahedeh Date: Sun, 12 Jun 2016 08:39:52 -0700 Subject: [PATCH] sparc64: Enable virtual disk protocol out of order execution Orabug: 24815498 Signed-off-by: Bijan Mottahedeh Reviewed-by: Alexandre Chartre Reviewed-by: Chris Hyser Signed-off-by: Allen Pais --- arch/sparc/include/asm/vio.h | 2 + drivers/block/sunvdc.c | 327 ++++++++++++++++++++++++----------- drivers/block/vds/vds.h | 9 +- drivers/block/vds/vds_io.c | 16 +- drivers/block/vds/vds_io.h | 1 + drivers/block/vds/vds_main.c | 50 ++++-- 6 files changed, 283 insertions(+), 122 deletions(-) diff --git a/arch/sparc/include/asm/vio.h b/arch/sparc/include/asm/vio.h index e990d29b64a1..bfd4cc86aef5 100644 --- a/arch/sparc/include/asm/vio.h +++ b/arch/sparc/include/asm/vio.h @@ -310,6 +310,8 @@ struct vio_dring_state { void *base; u64 snd_nxt; u64 rcv_nxt; + unsigned long *txmap; + unsigned long nr_txmap; u32 entry_size; u32 num_entries; u32 prod; diff --git a/drivers/block/sunvdc.c b/drivers/block/sunvdc.c index 70b34b1409db..b3f342165af8 100644 --- a/drivers/block/sunvdc.c +++ b/drivers/block/sunvdc.c @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -23,8 +24,8 @@ #define DRV_MODULE_NAME "sunvdc" #define PFX DRV_MODULE_NAME ": " -#define DRV_MODULE_VERSION "1.2" -#define DRV_MODULE_RELDATE "November 24, 2014" +#define DRV_MODULE_VERSION "1.3" +#define DRV_MODULE_RELDATE "September 24, 2016" static char version[] = DRV_MODULE_NAME ".c:v" DRV_MODULE_VERSION " (" DRV_MODULE_RELDATE ")\n"; @@ -80,6 +81,8 @@ struct vdc_port { static void vdc_ldc_reset(struct vdc_port *port); static void vdc_ldc_reset_work(struct work_struct *work); static void vdc_ldc_reset_timer(unsigned long _arg); +static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx); +static inline void vdc_desc_set_state(struct vio_disk_desc *, int); static inline struct vdc_port *to_vdc_port(struct vio_driver_state *vio) { @@ -88,6 +91,8 @@ static inline struct vdc_port *to_vdc_port(struct vio_driver_state *vio) /* Ordered from largest major to lowest */ static struct vio_version vdc_versions[] = { + { .major = 1, .minor = 3 }, + { .major = 1, .minor = 2 }, { .major = 1, .minor = 1 }, { .major = 1, .minor = 0 }, }; @@ -104,7 +109,7 @@ static int vdc_major; static inline u32 vdc_tx_dring_avail(struct vio_dring_state *dr) { - return vio_dring_avail(dr, VDC_TX_RING_SIZE); + return VDC_TX_RING_SIZE - bitmap_weight(dr->txmap, dr->nr_txmap); } static int vdc_getgeo(struct block_device *bdev, struct hd_geometry *geo) @@ -279,35 +284,35 @@ static int vdc_handle_attr(struct vio_driver_state *vio, void *arg) } } -static void vdc_end_special(struct vdc_port *port, struct vio_disk_desc *desc) +static void vdc_end_special(struct vdc_port *port, int err) { - int err = desc->status; - vdc_finish(&port->vio, -err, WAITING_FOR_GEN_CMD); } static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr, - unsigned int index) + unsigned int index, int err) { struct vio_disk_desc *desc = vio_dring_entry(dr, index); - struct vdc_req_entry *rqe = &port->rq_arr[index]; + struct vio_driver_state *vio = &port->vio; struct request *req; - if (unlikely(desc->hdr.state != VIO_DESC_DONE)) - return; + assert_spin_locked(&vio->lock); - ldc_unmap(port->vio.lp, desc->cookies, desc->ncookies); - desc->hdr.state = VIO_DESC_FREE; - dr->cons = vio_dring_next(dr, index); + if (err) + vdc_desc_set_state(desc, VIO_DESC_DONE); + else if (unlikely(desc->hdr.state != 
VIO_DESC_DONE)) { + pr_err("%s idx=%u err=%d state=%d\n", + __func__, index, err, desc->hdr.state); + return; + } else + err = desc->status; - req = rqe->req; + req = vdc_desc_put(port, index); if (req == NULL) { - vdc_end_special(port, desc); + vdc_end_special(port, err); return; } - rqe->req = NULL; - __blk_end_request(req, (desc->status ? -EIO : 0), desc->size); vdc_blk_queue_start(port); @@ -323,7 +328,7 @@ static int vdc_ack(struct vdc_port *port, void *msgbuf) pkt->start_idx >= VDC_TX_RING_SIZE)) return 0; - vdc_end_one(port, dr, pkt->start_idx); + vdc_end_one(port, dr, pkt->start_idx, 0); return 0; } @@ -404,49 +409,114 @@ out: spin_unlock_irqrestore(&vio->lock, flags); } -static int __vdc_tx_trigger(struct vdc_port *port) +static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx) { - struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING]; + struct vio_driver_state *vio = &port->vio; + struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING]; struct vio_dring_data hdr = { .tag = { .type = VIO_TYPE_DATA, .stype = VIO_SUBTYPE_INFO, .stype_env = VIO_DRING_DATA, - .sid = vio_send_sid(&port->vio), + .sid = vio_send_sid(vio), }, .dring_ident = dr->ident, - .start_idx = dr->prod, - .end_idx = dr->prod, + .start_idx = idx, + .end_idx = idx, }; int err, delay; - hdr.seq = dr->snd_nxt; delay = 1; do { - err = vio_ldc_send(&port->vio, &hdr, sizeof(hdr)); + hdr.seq = dr->snd_nxt; + err = vio_ldc_send(vio, &hdr, sizeof(hdr)); if (err > 0) { dr->snd_nxt++; + port->req_id++; break; } + udelay(delay); if ((delay <<= 1) > 128) delay = 128; } while (err == -EAGAIN); - if (err == -ENOTCONN) { - printk(KERN_ERR PFX "vio_ldc_send() failure, err=%d.\n", err); - vdc_ldc_reset(port); - } + if (err < 0) + pr_err(PFX "vio_ldc_send() failed, idx=%d err=%d.\n", idx, err); + return err; } -static int __send_request(struct request *req) +static struct vio_disk_desc *vdc_desc_get(struct vdc_port *port, + struct request *req, + unsigned int *idxp) { - struct vdc_port *port = req->rq_disk->private_data; - struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING]; - struct scatterlist sg[port->ring_cookies]; + unsigned int idx; + struct vio_disk_desc *desc = NULL; + struct vio_driver_state *vio = &port->vio; + struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING]; + + assert_spin_locked(&vio->lock); + + idx = find_first_zero_bit(dr->txmap, dr->nr_txmap); + if (idx < VDC_TX_RING_SIZE) { + bitmap_set(dr->txmap, idx, 1); + desc = dr->base + (dr->entry_size * idx); + if (req) { + BUG_ON(port->rq_arr[idx].req); + port->rq_arr[idx].req = req; + } + *idxp = idx; + } + + return desc; +} + +static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx) +{ + struct vio_driver_state *vio = &port->vio; + struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING]; + struct vio_disk_desc *desc = vio_dring_entry(dr, idx); struct vdc_req_entry *rqe; - struct vio_disk_desc *desc; + struct request *req; + + assert_spin_locked(&vio->lock); + + ldc_unmap(vio->lp, desc->cookies, desc->ncookies); + + bitmap_clear(dr->txmap, idx, 1); + vdc_desc_set_state(desc, VIO_DESC_FREE); + + rqe = &port->rq_arr[idx]; + req = rqe->req; + rqe->req = NULL; + + return req; +} + +static inline void vdc_desc_set_state(struct vio_disk_desc *desc, int state) +{ + desc->hdr.state = state; + /* + * This has to be a non-SMP write barrier because we are writing + * to memory which is shared with the peer LDOM. 
+ */ + wmb(); +} + +static void __create_flush_desc(struct vdc_port *port, + struct vio_disk_desc *desc) +{ + memset(desc, 0, sizeof(struct vio_disk_desc)); + desc->hdr.ack = VIO_ACK_ENABLE; + desc->req_id = port->req_id; + desc->operation = VD_OP_FLUSH; +} + +static int __create_rw_desc(struct vdc_port *port, struct request *req, + struct vio_disk_desc *desc) +{ + struct scatterlist sg[port->ring_cookies]; unsigned int map_perm; int nsg, err, i; u64 len; @@ -464,49 +534,61 @@ static int __send_request(struct request *req) sg_init_table(sg, port->ring_cookies); nsg = blk_rq_map_sg(req->q, req, sg); + if (!nsg) { + pr_err(PFX "blk_rq_map_sg() failed, nsg=%d.\n", nsg); + return -EIO; + } - len = 0; - for (i = 0; i < nsg; i++) - len += sg[i].length; + memset(desc, 0, sizeof(struct vio_disk_desc)); - desc = vio_dring_cur(dr); + err = ldc_map_sg(port->vio.lp, sg, nsg, desc->cookies, + port->ring_cookies, map_perm); - err = ldc_map_sg(port->vio.lp, sg, nsg, - desc->cookies, port->ring_cookies, - map_perm); - if (err < 0) { - printk(KERN_ERR PFX "ldc_map_sg() failure, err=%d.\n", err); + if (err <= 0) { + pr_err(PFX "ldc_map_sg() failed, err=%d.\n", err); return err; } - rqe = &port->rq_arr[dr->prod]; - rqe->req = req; + len = 0; + for (i = 0; i < nsg; i++) + len += sg[i].length; desc->hdr.ack = VIO_ACK_ENABLE; desc->req_id = port->req_id; desc->operation = op; - if (port->vdisk_type == VD_DISK_TYPE_DISK) { + if (port->vdisk_type == VD_DISK_TYPE_DISK) desc->slice = 0xff; - } else { + else desc->slice = 0; - } desc->status = ~0; desc->offset = (blk_rq_pos(req) << 9) / port->vdisk_block_size; desc->size = len; desc->ncookies = err; - /* This has to be a non-SMP write barrier because we are writing - * to memory which is shared with the peer LDOM. - */ - wmb(); - desc->hdr.state = VIO_DESC_READY; + return 0; +} - err = __vdc_tx_trigger(port); - if (err < 0) { - printk(KERN_ERR PFX "vdc_tx_trigger() failure, err=%d\n", err); - } else { - port->req_id++; - dr->prod = vio_dring_next(dr, dr->prod); +static int __send_request(struct vdc_port *port, unsigned int idx) +{ + struct vio_driver_state *vio = &port->vio; + struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING]; + struct vio_disk_desc *desc = vio_dring_entry(dr, idx); + int err; + + vdc_desc_set_state(desc, VIO_DESC_READY); + + while (1) { + err = __vdc_tx_trigger(port, idx); + + if (err == -ECONNRESET || err == -ENOTCONN) { + vdc_ldc_reset(port); + pr_info(PFX "%s retry, idx=%d err=%d\n", + __func__, idx, err); + } else if (err < 0) { + pr_err(PFX "%s error, idx=%d err=%d\n", + __func__, idx, err); + } else + break; } return err; @@ -514,20 +596,42 @@ static int __send_request(struct request *req) static void do_vdc_request(struct request_queue *rq) { + struct vdc_port *port = NULL; + struct vio_dring_state *dr = NULL; struct request *req; + unsigned int idx = 0; + int err = 0; while ((req = blk_peek_request(rq)) != NULL) { - struct vdc_port *port; - struct vio_dring_state *dr; + struct vio_disk_desc *desc; port = req->rq_disk->private_data; dr = &port->vio.drings[VIO_DRIVER_TX_RING]; + if (unlikely(vdc_tx_dring_avail(dr) < 1)) goto wait; + desc = vdc_desc_get(port, req, &idx); + + /* Note that REQ_FLUSH requests with a payload are + * automatically turned into a sequence of an empty + * REQ_FLUSH request followed by the actual i/o by + * the block layer. 
+ */ + if (req->cmd_flags & REQ_FLUSH) { + if (req->bio != NULL) + pr_err(PFX "non-empty REQ_FLUSH, lost I/O\n"); + __create_flush_desc(port, desc); + } else + err = __create_rw_desc(port, req, desc); + + if (err) + goto wait; + blk_start_request(req); - if (__send_request(req) < 0) { + err = __send_request(port, idx); + if (err < 0) { blk_requeue_request(rq, req); wait: /* Avoid pointless unplugs. */ @@ -535,15 +639,19 @@ wait: break; } } + + if (err < 0 && + err != -ENOMEM && err != -ECONNRESET && err != -ENOTCONN) + vdc_end_one(port, dr, idx, err); } static int generic_request(struct vdc_port *port, u8 op, void *buf, int len) { - struct vio_dring_state *dr; struct vio_completion comp; struct vio_disk_desc *desc; - unsigned int map_perm; unsigned long flags; + unsigned int map_perm; + unsigned int idx; int op_len, err; void *req_buf; @@ -622,21 +730,17 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len) spin_lock_irqsave(&port->vio.lock, flags); - dr = &port->vio.drings[VIO_DRIVER_TX_RING]; - - /* XXX If we want to use this code generically we have to - * XXX handle TX ring exhaustion etc. - */ - desc = vio_dring_cur(dr); + desc = vdc_desc_get(port, NULL, &idx); + if (!desc) { + err = -ENOMEM; + goto done; + } err = ldc_map_single(port->vio.lp, req_buf, op_len, desc->cookies, port->ring_cookies, map_perm); - if (err < 0) { - spin_unlock_irqrestore(&port->vio.lock, flags); - kfree(req_buf); - return err; - } + if (err < 0) + goto done; init_completion(&comp.com); comp.waiting_for = WAITING_FOR_GEN_CMD; @@ -651,44 +755,58 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len) desc->size = op_len; desc->ncookies = err; - /* This has to be a non-SMP write barrier because we are writing - * to memory which is shared with the peer LDOM. 
- */ - wmb(); - desc->hdr.state = VIO_DESC_READY; - - err = __vdc_tx_trigger(port); + err = __send_request(port, idx); if (err >= 0) { - port->req_id++; - dr->prod = vio_dring_next(dr, dr->prod); spin_unlock_irqrestore(&port->vio.lock, flags); - wait_for_completion(&comp.com); err = comp.err; + spin_lock_irqsave(&port->vio.lock, flags); } else { port->vio.cmp = NULL; - spin_unlock_irqrestore(&port->vio.lock, flags); + goto done; } if (map_perm & LDC_MAP_W) memcpy(buf, req_buf, len); +done: + (void) vdc_desc_put(port, idx); + spin_unlock_irqrestore(&port->vio.lock, flags); kfree(req_buf); return err; } +static int vio_txring_alloc(struct vio_dring_state *dr, unsigned int nr_tx) +{ + unsigned int sz; + + sz = BITS_TO_LONGS(nr_tx) * sizeof(unsigned long); + dr->txmap = kzalloc(sz, GFP_KERNEL); + + if (!dr->txmap) + return -ENOMEM; + + dr->nr_txmap = nr_tx; + return 0; +} + static int vdc_alloc_tx_ring(struct vdc_port *port) { struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING]; unsigned long len, entry_size; int ncookies; void *dring; + int ret; entry_size = sizeof(struct vio_disk_desc) + (sizeof(struct ldc_trans_cookie) * port->ring_cookies); len = (VDC_TX_RING_SIZE * entry_size); + ret = vio_txring_alloc(dr, VDC_TX_RING_SIZE); + if (ret) + return ret; + ncookies = VIO_MAX_RING_COOKIES; dring = ldc_alloc_exp_dring(port->vio.lp, len, dr->cookies, &ncookies, @@ -701,7 +819,6 @@ static int vdc_alloc_tx_ring(struct vdc_port *port) dr->base = dring; dr->entry_size = entry_size; dr->num_entries = VDC_TX_RING_SIZE; - dr->prod = dr->cons = 0; dr->pending = VDC_TX_RING_SIZE; dr->ncookies = ncookies; @@ -789,6 +906,14 @@ static int probe_disk(struct vdc_port *port) blk_queue_segment_boundary(q, PAGE_SIZE - 1); blk_queue_max_segment_size(q, PAGE_SIZE); + /* vds may be a device with volatile caching and in protocol 1.3 + * can perform out of order completion. REQ_FLUSH/REQ_FUA are + * used to signal completion barriers. + * REQ_FUA is turned into a following REQ_FLUSH by block layer + * if not supported directly. 
+ */ + blk_queue_flush(q, REQ_FLUSH); + blk_queue_max_segments(q, port->ring_cookies); blk_queue_max_hw_sectors(q, port->max_xfer_size); g->major = vdc_major; @@ -836,7 +961,7 @@ static int probe_disk(struct vdc_port *port) static struct ldc_channel_config vdc_ldc_cfg = { .event = vdc_event, - .mtu = 64, + .mtu = 256, .mode = LDC_MODE_UNRELIABLE, }; @@ -984,25 +1109,23 @@ static int vdc_port_remove(struct vio_dev *vdev) static void vdc_requeue_inflight(struct vdc_port *port) { - struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING]; - u32 idx; - - for (idx = dr->cons; idx != dr->prod; idx = vio_dring_next(dr, idx)) { - struct vio_disk_desc *desc = vio_dring_entry(dr, idx); - struct vdc_req_entry *rqe = &port->rq_arr[idx]; - struct request *req; + struct vio_driver_state *vio = &port->vio; + struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING]; + struct vio_disk_desc *desc; + struct request *req; + unsigned int idx; - ldc_unmap(port->vio.lp, desc->cookies, desc->ncookies); - desc->hdr.state = VIO_DESC_FREE; - dr->cons = vio_dring_next(dr, idx); + assert_spin_locked(&vio->lock); - req = rqe->req; + for (idx = find_first_bit(dr->txmap, dr->nr_txmap); + idx < dr->nr_txmap; + idx = find_next_bit(dr->txmap, dr->nr_txmap, idx + 1)) { + req = vdc_desc_put(port, idx); if (req == NULL) { - vdc_end_special(port, desc); + desc = vio_dring_entry(dr, idx); + vdc_end_special(port, desc->status); continue; } - - rqe->req = NULL; blk_requeue_request(port->disk->queue, req); } } diff --git a/drivers/block/vds/vds.h b/drivers/block/vds/vds.h index 7d8da46c2ce6..32474291508f 100644 --- a/drivers/block/vds/vds.h +++ b/drivers/block/vds/vds.h @@ -111,25 +111,25 @@ int vds_vtoc_clear(struct vds_port *port); */ #define vds_be_rlock(p) \ do { \ - vdsdbg(LOCK, "backend rlock\n"); \ + vdsdbg(BELOCK, "backend rlock\n"); \ down_read(&(p)->be_lock); \ } while (0) #define vds_be_runlock(p) \ do { \ - vdsdbg(LOCK, "backend runlock\n"); \ + vdsdbg(BELOCK, "backend runlock\n"); \ up_read(&(p)->be_lock); \ } while (0) #define vds_be_wlock(p) \ do { \ - vdsdbg(LOCK, "backend wlock\n"); \ + vdsdbg(BELOCK, "backend wlock\n"); \ down_write(&(p)->be_lock); \ } while (0) #define vds_be_wunlock(p) \ do { \ - vdsdbg(LOCK, "backend wunlock\n"); \ + vdsdbg(BELOCK, "backend wunlock\n"); \ up_write(&(p)->be_lock); \ } while (0) @@ -189,6 +189,7 @@ int vds_vtoc_clear(struct vds_port *port); #define VDS_DEBUG_IO 0x100 #define VDS_DEBUG_BIO 0x200 #define VDS_DEBUG_FIO 0x400 +#define VDS_DEBUG_BELOCK 0x800 extern int vds_dbg; extern int vds_dbg_ldc; diff --git a/drivers/block/vds/vds_io.c b/drivers/block/vds/vds_io.c index 5084bdb740a9..13d7301fbf2a 100644 --- a/drivers/block/vds/vds_io.c +++ b/drivers/block/vds/vds_io.c @@ -214,6 +214,8 @@ void vds_io_done(struct vds_io *io) { struct vio_driver_state *vio = io->vio; struct vds_port *port = to_vds_port(vio); + struct list_head *pos, *tmp; + struct vds_io *ent; unsigned long flags; vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(), @@ -230,11 +232,23 @@ void vds_io_done(struct vds_io *io) * The reset can be initiated by an explicit incoming request * or while processing an IO request. Wakeup anyone waiting on * the IO list in either case. + * + * With out of order execution, the reset may result from the + * completion of a request that started later but completed + * earlier than other requests on the IO queue. This should be + * fine since after the connection is re-established, the client + * will resend all requests for which it has received no response. 
*/ vds_vio_lock(vio, flags); list_del(&io->list); - if (io->flags & VDS_IO_FINI) + if (io->flags & VDS_IO_FINI) { + list_for_each_safe(pos, tmp, &port->io_list) { + ent = list_entry(pos, struct vds_io, list); + ent->flags |= VDS_IO_DROP; + } INIT_LIST_HEAD(&port->io_list); + + } wake_up(&port->wait); vds_vio_unlock(vio, flags); vds_io_free(io); diff --git a/drivers/block/vds/vds_io.h b/drivers/block/vds/vds_io.h index cd4efd8f5a66..478929739307 100644 --- a/drivers/block/vds/vds_io.h +++ b/drivers/block/vds/vds_io.h @@ -42,6 +42,7 @@ struct vds_io { #define VDS_IO_CACHE 0x1 #define VDS_IO_INIT 0x2 #define VDS_IO_FINI 0x4 +#define VDS_IO_DROP 0x8 int vds_io_init(void); void vds_io_fini(void); diff --git a/drivers/block/vds/vds_main.c b/drivers/block/vds/vds_main.c index a3453181b951..d0655da88b35 100644 --- a/drivers/block/vds/vds_main.c +++ b/drivers/block/vds/vds_main.c @@ -56,10 +56,15 @@ MODULE_VERSION(DRV_MOD_VERSION); * conceivably block when the backend is closed. The serialization should * ensure that a following handshake initiates only after the reset is done. * + * Out of order execution bypasses vds_io_wait() except for FLUSH. This means + * that a request may start later but complete and respond to the client + * earlier than other requests. + * * The recommended value for the size of the kernel workqueue is 0; * it creates threads which scale with ncpu. */ int vds_wq; +int vds_ooo; /* out of order execution default value */ int vds_dbg; int vds_dbg_ldc; int vds_dbg_vio; @@ -68,13 +73,20 @@ module_param(vds_dbg, uint, 0664); module_param(vds_dbg_ldc, uint, 0664); module_param(vds_dbg_vio, uint, 0664); module_param(vds_wq, uint, 0664); +module_param(vds_ooo, uint, 0664); /* Ordered from largest major to lowest */ static struct vio_version vds_versions[] = { + { .major = 1, .minor = 3 }, { .major = 1, .minor = 1 }, { .major = 1, .minor = 0 }, }; +static inline int vds_version_supp(struct vds_port *port, u16 major, u16 minor) +{ + return port->vio.ver.major == major && port->vio.ver.minor >= minor; +} + static void vds_handshake_complete(struct vio_driver_state *vio) { struct vio_dring_state *dr; @@ -148,10 +160,7 @@ static int vds_handle_attr(struct vio_driver_state *vio, void *arg) * Set the maximum expected message length to * accommodate in-band-descriptor messages with all * their cookies. - */ - vio->desc_buf_len = max_inband_msglen; - - /* + * * Reallocate before responding to the message since * the next request in the handshake will use this size * and a small msgbuf would make the ldc read fail. @@ -360,7 +369,7 @@ static void vds_bh_hs(struct work_struct *work) if (io->flags & VDS_IO_INIT) err = vds_be_init(port); - vds_io_wait(io); + vds_io_wait(io); /* handshake is always in order */ if (!err) err = vio_control_pkt_engine(vio, port->msgbuf); @@ -368,6 +377,9 @@ static void vds_bh_hs(struct work_struct *work) if (err) vdsmsg(err, "%s: handshake failed (%d)\n", port->path, err); + if (vds_version_supp(port, 1, 3)) + vds_ooo = 1; + vds_io_done(io); } @@ -426,9 +438,12 @@ static void vds_bh_io(struct work_struct *work) if (io->ack == VIO_SUBTYPE_ACK && err != 0 && io->error == 0) io->error = err > 0 ? 
err : -err; - vds_io_wait(io); + if (!vds_ooo) + vds_io_wait(io); - if (port->xfer_mode == VIO_DRING_MODE) + if (io->flags & VDS_IO_DROP) + ; + else if (port->xfer_mode == VIO_DRING_MODE) (void) vds_dring_done(io); else if (port->xfer_mode == VIO_DESC_MODE) (void) vds_desc_done(io); @@ -436,11 +451,10 @@ static void vds_bh_io(struct work_struct *work) BUG(); /* - * If there was a reset then the IO request has been - * converted to a reset request queued to be executed. + * Any request, including one that was converted + * to a reset ends up here to be completed. */ - if (!(io->flags & VDS_IO_FINI)) - vds_io_done(io); + vds_io_done(io); } static void vds_reset(struct vds_io *io) @@ -474,7 +488,6 @@ static void vds_reset(struct vds_io *io) vds_vio_lock(vio, flags); vio_link_state_change(vio, LDC_EVENT_RESET); - vio->desc_buf_len = 0; port->flags = 0; kfree(port->msgbuf); @@ -500,10 +513,17 @@ static void vds_bh_reset(struct work_struct *work) struct vds_io *io = container_of(work, struct vds_io, vds_work); struct vio_driver_state *vio = io->vio; - vds_io_wait(io); + if (!vds_ooo) + vds_io_wait(io); vds_reset(io); - ldc_enable_hv_intr(vio->lp); vds_io_done(io); + + /* + * Enable LDC interrupt after the request completion + * so that no new requests are queued while the IO + * queue is discarded during reset processing. + */ + ldc_enable_hv_intr(vio->lp); } static int vds_dring_io(struct vio_driver_state *vio) @@ -806,7 +826,7 @@ static void vds_event(void *arg, int event) static struct ldc_channel_config vds_ldc_cfg = { .event = vds_event, - .mtu = 64, + .mtu = 256, .mode = LDC_MODE_UNRELIABLE, }; -- 2.50.1