sparc64: Enable virtual disk protocol out of order execution
author		Bijan Mottahedeh <Bijan.Mottahedeh@oracle.com>
		Sun, 12 Jun 2016 15:39:52 +0000 (08:39 -0700)
committer	Allen Pais <allen.pais@oracle.com>
		Tue, 8 Nov 2016 10:07:46 +0000 (15:37 +0530)
Orabug: 24815498
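
Negotiate VIO protocol version 1.3 (plus 1.2 in the client) so that
virtual disk requests may be executed and completed out of order.  In
the sunvdc client, replace the in-order prod/cons ring accounting with
a txmap bitmap so descriptor slots can be claimed and retired
independently, and advertise REQ_FLUSH so that an empty flush request
becomes a VD_OP_FLUSH completion barrier.  In the vds server, let
non-FLUSH requests bypass the in-order wait and flag requests still
queued across a reset as VDS_IO_DROP.  Raise the LDC mtu from 64 to
256 in both drivers.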

Signed-off-by: Bijan Mottahedeh <bijan.mottahedeh@oracle.com>
Reviewed-by: Alexandre Chartre <alexandre.chartre@oracle.com>
Reviewed-by: Chris Hyser <Chris.Hyser@oracle.com>
Signed-off-by: Allen Pais <allen.pais@oracle.com>
arch/sparc/include/asm/vio.h
drivers/block/sunvdc.c
drivers/block/vds/vds.h
drivers/block/vds/vds_io.c
drivers/block/vds/vds_io.h
drivers/block/vds/vds_main.c

diff --git a/arch/sparc/include/asm/vio.h b/arch/sparc/include/asm/vio.h
index e990d29b64a18d37806d0805f0763df44f65b130..bfd4cc86aef52401d15dc5f4df46d6dec81af16c 100644
@@ -310,6 +310,8 @@ struct vio_dring_state {
        void                    *base;
        u64                     snd_nxt;
        u64                     rcv_nxt;
+       unsigned long           *txmap;
+       unsigned long           nr_txmap;
        u32                     entry_size;
        u32                     num_entries;
        u32                     prod;
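
The two new fields in this hunk replace prod/cons ordering for
transmit bookkeeping: txmap records which descriptor slots are in
flight, so a slot can be claimed and released in any order.  A
stand-alone sketch of that bookkeeping (user-space C;
slot_get()/slot_put() are hypothetical stand-ins for the kernel's
find_first_zero_bit()/bitmap_set()/bitmap_clear() usage, and
__builtin_popcountl() is a GCC builtin):

#include <stdio.h>

#define RING_SIZE	64
#define BITS_PER_LONG	(8 * sizeof(unsigned long))
#define MAP_LONGS	((RING_SIZE + BITS_PER_LONG - 1) / BITS_PER_LONG)

static unsigned long txmap[MAP_LONGS];

/* Claim the first free slot, as find_first_zero_bit() + bitmap_set() do. */
static int slot_get(void)
{
	for (unsigned int i = 0; i < RING_SIZE; i++) {
		if (!(txmap[i / BITS_PER_LONG] & (1UL << (i % BITS_PER_LONG)))) {
			txmap[i / BITS_PER_LONG] |= 1UL << (i % BITS_PER_LONG);
			return (int)i;
		}
	}
	return -1;				/* ring full */
}

/* Release any slot, in any order, as bitmap_clear() does. */
static void slot_put(unsigned int idx)
{
	txmap[idx / BITS_PER_LONG] &= ~(1UL << (idx % BITS_PER_LONG));
}

/* Free slots = ring size minus popcount, cf. the new vdc_tx_dring_avail(). */
static unsigned int slots_avail(void)
{
	unsigned int used = 0;

	for (unsigned int i = 0; i < MAP_LONGS; i++)
		used += (unsigned int)__builtin_popcountl(txmap[i]);
	return RING_SIZE - used;
}

int main(void)
{
	int a = slot_get(), b = slot_get(), c = slot_get();

	slot_put((unsigned int)b);		/* complete out of order */
	printf("a=%d b=%d c=%d avail=%u\n", a, b, c, slots_avail());
	return 0;
}
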
diff --git a/drivers/block/sunvdc.c b/drivers/block/sunvdc.c
index 70b34b1409db48a19fe769848f875128d05d05b8..b3f342165af84eb638efeb2a746a2665103d0eb2 100644
@@ -12,6 +12,7 @@
 #include <linux/cdrom.h>
 #include <linux/slab.h>
 #include <linux/spinlock.h>
+#include <linux/bitmap.h>
 #include <linux/completion.h>
 #include <linux/delay.h>
 #include <linux/init.h>
@@ -23,8 +24,8 @@
 
 #define DRV_MODULE_NAME                "sunvdc"
 #define PFX DRV_MODULE_NAME    ": "
-#define DRV_MODULE_VERSION     "1.2"
-#define DRV_MODULE_RELDATE     "November 24, 2014"
+#define DRV_MODULE_VERSION     "1.3"
+#define DRV_MODULE_RELDATE     "September 24, 2016"
 
 static char version[] =
        DRV_MODULE_NAME ".c:v" DRV_MODULE_VERSION " (" DRV_MODULE_RELDATE ")\n";
@@ -80,6 +81,8 @@ struct vdc_port {
 static void vdc_ldc_reset(struct vdc_port *port);
 static void vdc_ldc_reset_work(struct work_struct *work);
 static void vdc_ldc_reset_timer(unsigned long _arg);
+static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx);
+static inline void vdc_desc_set_state(struct vio_disk_desc *, int);
 
 static inline struct vdc_port *to_vdc_port(struct vio_driver_state *vio)
 {
@@ -88,6 +91,8 @@ static inline struct vdc_port *to_vdc_port(struct vio_driver_state *vio)
 
 /* Ordered from largest major to lowest */
 static struct vio_version vdc_versions[] = {
+       { .major = 1, .minor = 3 },
+       { .major = 1, .minor = 2 },
        { .major = 1, .minor = 1 },
        { .major = 1, .minor = 0 },
 };
@@ -104,7 +109,7 @@ static int vdc_major;
 
 static inline u32 vdc_tx_dring_avail(struct vio_dring_state *dr)
 {
-       return vio_dring_avail(dr, VDC_TX_RING_SIZE);
+       return VDC_TX_RING_SIZE - bitmap_weight(dr->txmap, dr->nr_txmap);
 }
 
 static int vdc_getgeo(struct block_device *bdev, struct hd_geometry *geo)
@@ -279,35 +284,35 @@ static int vdc_handle_attr(struct vio_driver_state *vio, void *arg)
        }
 }
 
-static void vdc_end_special(struct vdc_port *port, struct vio_disk_desc *desc)
+static void vdc_end_special(struct vdc_port *port, int err)
 {
-       int err = desc->status;
-
        vdc_finish(&port->vio, -err, WAITING_FOR_GEN_CMD);
 }
 
 static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr,
-                       unsigned int index)
+                       unsigned int index, int err)
 {
        struct vio_disk_desc *desc = vio_dring_entry(dr, index);
-       struct vdc_req_entry *rqe = &port->rq_arr[index];
+       struct vio_driver_state *vio = &port->vio;
        struct request *req;
 
-       if (unlikely(desc->hdr.state != VIO_DESC_DONE))
-               return;
+       assert_spin_locked(&vio->lock);
 
-       ldc_unmap(port->vio.lp, desc->cookies, desc->ncookies);
-       desc->hdr.state = VIO_DESC_FREE;
-       dr->cons = vio_dring_next(dr, index);
+       if (err)
+               vdc_desc_set_state(desc, VIO_DESC_DONE);
+       else if (unlikely(desc->hdr.state != VIO_DESC_DONE)) {
+               pr_err("%s idx=%u err=%d state=%d\n",
+                       __func__, index, err, desc->hdr.state);
+               return;
+       } else
+               err = desc->status;
 
-       req = rqe->req;
+       req = vdc_desc_put(port, index);
        if (req == NULL) {
-               vdc_end_special(port, desc);
+               vdc_end_special(port, err);
                return;
        }
 
-       rqe->req = NULL;
-
        __blk_end_request(req, (desc->status ? -EIO : 0), desc->size);
 
        vdc_blk_queue_start(port);
@@ -323,7 +328,7 @@ static int vdc_ack(struct vdc_port *port, void *msgbuf)
                     pkt->start_idx >= VDC_TX_RING_SIZE))
                return 0;
 
-       vdc_end_one(port, dr, pkt->start_idx);
+       vdc_end_one(port, dr, pkt->start_idx, 0);
 
        return 0;
 }
@@ -404,49 +409,114 @@ out:
        spin_unlock_irqrestore(&vio->lock, flags);
 }
 
-static int __vdc_tx_trigger(struct vdc_port *port)
+static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx)
 {
-       struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
+       struct vio_driver_state *vio = &port->vio;
+       struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
        struct vio_dring_data hdr = {
                .tag = {
                        .type           = VIO_TYPE_DATA,
                        .stype          = VIO_SUBTYPE_INFO,
                        .stype_env      = VIO_DRING_DATA,
-                       .sid            = vio_send_sid(&port->vio),
+                       .sid            = vio_send_sid(vio),
                },
                .dring_ident            = dr->ident,
-               .start_idx              = dr->prod,
-               .end_idx                = dr->prod,
+               .start_idx              = idx,
+               .end_idx                = idx,
        };
        int err, delay;
 
-       hdr.seq = dr->snd_nxt;
        delay = 1;
        do {
-               err = vio_ldc_send(&port->vio, &hdr, sizeof(hdr));
+               hdr.seq = dr->snd_nxt;
+               err = vio_ldc_send(vio, &hdr, sizeof(hdr));
                if (err > 0) {
                        dr->snd_nxt++;
+                       port->req_id++;
                        break;
                }
+
                udelay(delay);
                if ((delay <<= 1) > 128)
                        delay = 128;
        } while (err == -EAGAIN);
 
-       if (err == -ENOTCONN) {
-               printk(KERN_ERR PFX "vio_ldc_send() failure, err=%d.\n", err);
-               vdc_ldc_reset(port);
-       }
+       if (err < 0)
+               pr_err(PFX "vio_ldc_send() failed, idx=%d err=%d.\n", idx, err);
+
        return err;
 }
 
-static int __send_request(struct request *req)
+static struct vio_disk_desc *vdc_desc_get(struct vdc_port *port,
+                                         struct request *req,
+                                         unsigned int *idxp)
 {
-       struct vdc_port *port = req->rq_disk->private_data;
-       struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
-       struct scatterlist sg[port->ring_cookies];
+       unsigned int idx;
+       struct vio_disk_desc *desc = NULL;
+       struct vio_driver_state *vio = &port->vio;
+       struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+
+       assert_spin_locked(&vio->lock);
+
+       idx = find_first_zero_bit(dr->txmap, dr->nr_txmap);
+       if (idx < VDC_TX_RING_SIZE) {
+               bitmap_set(dr->txmap, idx, 1);
+               desc = dr->base + (dr->entry_size * idx);
+               if (req) {
+                       BUG_ON(port->rq_arr[idx].req);
+                       port->rq_arr[idx].req = req;
+               }
+               *idxp = idx;
+       }
+
+       return desc;
+}
+
+static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx)
+{
+       struct vio_driver_state *vio = &port->vio;
+       struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+       struct vio_disk_desc *desc = vio_dring_entry(dr, idx);
        struct vdc_req_entry *rqe;
-       struct vio_disk_desc *desc;
+       struct request *req;
+
+       assert_spin_locked(&vio->lock);
+
+       ldc_unmap(vio->lp, desc->cookies, desc->ncookies);
+
+       bitmap_clear(dr->txmap, idx, 1);
+       vdc_desc_set_state(desc, VIO_DESC_FREE);
+
+       rqe = &port->rq_arr[idx];
+       req = rqe->req;
+       rqe->req = NULL;
+
+       return req;
+}
+
+static inline void vdc_desc_set_state(struct vio_disk_desc *desc, int state)
+{
+       desc->hdr.state = state;
+       /*
+        * This has to be a non-SMP write barrier because we are writing
+        * to memory which is shared with the peer LDOM.
+        */
+       wmb();
+}
+
+static void __create_flush_desc(struct vdc_port *port,
+       struct vio_disk_desc *desc)
+{
+       memset(desc, 0, sizeof(struct vio_disk_desc));
+       desc->hdr.ack = VIO_ACK_ENABLE;
+       desc->req_id = port->req_id;
+       desc->operation = VD_OP_FLUSH;
+}
+
+static int __create_rw_desc(struct vdc_port *port, struct request *req,
+                           struct vio_disk_desc *desc)
+{
+       struct scatterlist sg[port->ring_cookies];
        unsigned int map_perm;
        int nsg, err, i;
        u64 len;
@@ -464,49 +534,61 @@ static int __send_request(struct request *req)
 
        sg_init_table(sg, port->ring_cookies);
        nsg = blk_rq_map_sg(req->q, req, sg);
+       if (!nsg) {
+               pr_err(PFX "blk_rq_map_sg() failed, nsg=%d.\n", nsg);
+               return -EIO;
+       }
 
-       len = 0;
-       for (i = 0; i < nsg; i++)
-               len += sg[i].length;
+       memset(desc, 0, sizeof(struct vio_disk_desc));
 
-       desc = vio_dring_cur(dr);
+       err = ldc_map_sg(port->vio.lp, sg, nsg, desc->cookies,
+                        port->ring_cookies, map_perm);
 
-       err = ldc_map_sg(port->vio.lp, sg, nsg,
-                        desc->cookies, port->ring_cookies,
-                        map_perm);
-       if (err < 0) {
-               printk(KERN_ERR PFX "ldc_map_sg() failure, err=%d.\n", err);
+       if (err <= 0) {
+               pr_err(PFX "ldc_map_sg() failed, err=%d.\n", err);
                return err;
        }
 
-       rqe = &port->rq_arr[dr->prod];
-       rqe->req = req;
+       len = 0;
+       for (i = 0; i < nsg; i++)
+               len += sg[i].length;
 
        desc->hdr.ack = VIO_ACK_ENABLE;
        desc->req_id = port->req_id;
        desc->operation = op;
-       if (port->vdisk_type == VD_DISK_TYPE_DISK) {
+       if (port->vdisk_type == VD_DISK_TYPE_DISK)
                desc->slice = 0xff;
-       } else {
+       else
                desc->slice = 0;
-       }
        desc->status = ~0;
        desc->offset = (blk_rq_pos(req) << 9) / port->vdisk_block_size;
        desc->size = len;
        desc->ncookies = err;
 
-       /* This has to be a non-SMP write barrier because we are writing
-        * to memory which is shared with the peer LDOM.
-        */
-       wmb();
-       desc->hdr.state = VIO_DESC_READY;
+       return 0;
+}
 
-       err = __vdc_tx_trigger(port);
-       if (err < 0) {
-               printk(KERN_ERR PFX "vdc_tx_trigger() failure, err=%d\n", err);
-       } else {
-               port->req_id++;
-               dr->prod = vio_dring_next(dr, dr->prod);
+static int __send_request(struct vdc_port *port, unsigned int idx)
+{
+       struct vio_driver_state *vio = &port->vio;
+       struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+       struct vio_disk_desc *desc = vio_dring_entry(dr, idx);
+       int err;
+
+       vdc_desc_set_state(desc, VIO_DESC_READY);
+
+       while (1) {
+               err = __vdc_tx_trigger(port, idx);
+
+               if (err == -ECONNRESET || err == -ENOTCONN) {
+                       vdc_ldc_reset(port);
+                       pr_info(PFX "%s retry, idx=%d err=%d\n",
+                               __func__, idx, err);
+               } else if (err < 0) {
+                       pr_err(PFX "%s error, idx=%d err=%d\n",
+                               __func__, idx, err);
+               } else
+                       break;
        }
 
        return err;
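
The publish order that vdc_desc_set_state() enforces can be sketched
stand-alone (user-space C11: atomic_thread_fence() stands in for the
kernel's wmb(), and this struct desc is a hypothetical stand-in for
struct vio_disk_desc): every payload field must reach the shared ring
before the state word flips to READY, because the peer LDOM acts on
the state word alone.

#include <stdatomic.h>
#include <stdio.h>

enum { DESC_FREE, DESC_READY, DESC_DONE };

struct desc {				/* hypothetical shared descriptor */
	unsigned long long offset;
	unsigned long long size;
	int status;
	volatile int state;
};

static void desc_publish(struct desc *d, unsigned long long off,
			 unsigned long long len)
{
	d->offset = off;
	d->size = len;
	d->status = ~0;
	/*
	 * Release fence: the payload stores above may not be reordered
	 * past the state store below -- the property the non-SMP wmb()
	 * provides against the peer seeing a READY but half-filled entry.
	 */
	atomic_thread_fence(memory_order_release);
	d->state = DESC_READY;
}

int main(void)
{
	struct desc d = { 0 };

	desc_publish(&d, 4096, 512);
	printf("state=%d size=%llu\n", d.state, d.size);
	return 0;
}
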
@@ -514,20 +596,42 @@ static int __send_request(struct request *req)
 
 static void do_vdc_request(struct request_queue *rq)
 {
+       struct vdc_port *port = NULL;
+       struct vio_dring_state *dr = NULL;
        struct request *req;
+       unsigned int idx = 0;
+       int err = 0;
 
        while ((req = blk_peek_request(rq)) != NULL) {
-               struct vdc_port *port;
-               struct vio_dring_state *dr;
+               struct vio_disk_desc *desc;
 
                port = req->rq_disk->private_data;
                dr = &port->vio.drings[VIO_DRIVER_TX_RING];
+
                if (unlikely(vdc_tx_dring_avail(dr) < 1))
                        goto wait;
 
+               desc = vdc_desc_get(port, req, &idx);
+
+               /* Note that REQ_FLUSH requests with a payload are
+                * automatically turned into a sequence of an empty
+                * REQ_FLUSH request followed by the actual i/o by
+                * the block layer.
+                */
+               if (req->cmd_flags & REQ_FLUSH) {
+                       if (req->bio != NULL)
+                               pr_err(PFX "non-empty REQ_FLUSH, lost I/O\n");
+                       __create_flush_desc(port, desc);
+               } else
+                       err = __create_rw_desc(port, req, desc);
+
+               if (err)
+                       goto wait;
+
                blk_start_request(req);
 
-               if (__send_request(req) < 0) {
+               err = __send_request(port, idx);
+               if (err < 0) {
                        blk_requeue_request(rq, req);
 wait:
                        /* Avoid pointless unplugs. */
@@ -535,15 +639,19 @@ wait:
                        break;
                }
        }
+
+       if (err < 0 &&
+           err != -ENOMEM && err != -ECONNRESET && err != -ENOTCONN)
+               vdc_end_one(port, dr, idx, err);
 }
 
 static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
 {
-       struct vio_dring_state *dr;
        struct vio_completion comp;
        struct vio_disk_desc *desc;
-       unsigned int map_perm;
        unsigned long flags;
+       unsigned int map_perm;
+       unsigned int idx;
        int op_len, err;
        void *req_buf;
 
@@ -622,21 +730,17 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
 
        spin_lock_irqsave(&port->vio.lock, flags);
 
-       dr = &port->vio.drings[VIO_DRIVER_TX_RING];
-
-       /* XXX If we want to use this code generically we have to
-        * XXX handle TX ring exhaustion etc.
-        */
-       desc = vio_dring_cur(dr);
+       desc = vdc_desc_get(port, NULL, &idx);
+       if (!desc) {
+               err = -ENOMEM;
+               goto done;
+       }
 
        err = ldc_map_single(port->vio.lp, req_buf, op_len,
                             desc->cookies, port->ring_cookies,
                             map_perm);
-       if (err < 0) {
-               spin_unlock_irqrestore(&port->vio.lock, flags);
-               kfree(req_buf);
-               return err;
-       }
+       if (err < 0)
+               goto done;
 
        init_completion(&comp.com);
        comp.waiting_for = WAITING_FOR_GEN_CMD;
@@ -651,44 +755,58 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
        desc->size = op_len;
        desc->ncookies = err;
 
-       /* This has to be a non-SMP write barrier because we are writing
-        * to memory which is shared with the peer LDOM.
-        */
-       wmb();
-       desc->hdr.state = VIO_DESC_READY;
-
-       err = __vdc_tx_trigger(port);
+       err = __send_request(port, idx);
        if (err >= 0) {
-               port->req_id++;
-               dr->prod = vio_dring_next(dr, dr->prod);
                spin_unlock_irqrestore(&port->vio.lock, flags);
-
                wait_for_completion(&comp.com);
                err = comp.err;
+               spin_lock_irqsave(&port->vio.lock, flags);
        } else {
                port->vio.cmp = NULL;
-               spin_unlock_irqrestore(&port->vio.lock, flags);
+               goto done;
        }
 
        if (map_perm & LDC_MAP_W)
                memcpy(buf, req_buf, len);
 
+done:
+       (void) vdc_desc_put(port, idx);
+       spin_unlock_irqrestore(&port->vio.lock, flags);
        kfree(req_buf);
 
        return err;
 }
 
+static int vio_txring_alloc(struct vio_dring_state *dr, unsigned int nr_tx)
+{
+       unsigned int sz;
+
+       sz = BITS_TO_LONGS(nr_tx) * sizeof(unsigned long);
+       dr->txmap = kzalloc(sz, GFP_KERNEL);
+
+       if (!dr->txmap)
+               return -ENOMEM;
+
+       dr->nr_txmap = nr_tx;
+       return 0;
+}
+
 static int vdc_alloc_tx_ring(struct vdc_port *port)
 {
        struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
        unsigned long len, entry_size;
        int ncookies;
        void *dring;
+       int ret;
 
        entry_size = sizeof(struct vio_disk_desc) +
                (sizeof(struct ldc_trans_cookie) * port->ring_cookies);
        len = (VDC_TX_RING_SIZE * entry_size);
 
+       ret = vio_txring_alloc(dr, VDC_TX_RING_SIZE);
+       if (ret)
+               return ret;
+
        ncookies = VIO_MAX_RING_COOKIES;
        dring = ldc_alloc_exp_dring(port->vio.lp, len,
                                    dr->cookies, &ncookies,
@@ -701,7 +819,6 @@ static int vdc_alloc_tx_ring(struct vdc_port *port)
        dr->base = dring;
        dr->entry_size = entry_size;
        dr->num_entries = VDC_TX_RING_SIZE;
-       dr->prod = dr->cons = 0;
        dr->pending = VDC_TX_RING_SIZE;
        dr->ncookies = ncookies;
 
@@ -789,6 +906,14 @@ static int probe_disk(struct vdc_port *port)
        blk_queue_segment_boundary(q, PAGE_SIZE - 1);
        blk_queue_max_segment_size(q, PAGE_SIZE);
 
+       /* The vds backend may use a volatile write cache and, with
+        * protocol 1.3, may complete requests out of order.
+        * REQ_FLUSH/REQ_FUA are used to signal completion barriers;
+        * REQ_FUA is turned into a following REQ_FLUSH by the block
+        * layer if not supported directly.
+        */
+       blk_queue_flush(q, REQ_FLUSH);
+
        blk_queue_max_segments(q, port->ring_cookies);
        blk_queue_max_hw_sectors(q, port->max_xfer_size);
        g->major = vdc_major;
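
The dispatch rule that pairs with this flag in do_vdc_request() can be
shown stand-alone (hypothetical struct req, not the kernel's): an
empty flush becomes a VD_OP_FLUSH barrier descriptor, while a flush
that still carries a payload means the block layer's FLUSH-then-I/O
split did not happen and data would be lost.

#include <stdbool.h>
#include <stdio.h>

enum op { OP_RW, OP_FLUSH };

struct req {				/* hypothetical request */
	enum op op;
	bool has_payload;
};

static const char *classify(const struct req *r)
{
	if (r->op == OP_FLUSH) {
		if (r->has_payload)	/* cf. "non-empty REQ_FLUSH, lost I/O" */
			fprintf(stderr, "flush with payload, lost I/O\n");
		return "VD_OP_FLUSH barrier descriptor";
	}
	return "read/write descriptor";
}

int main(void)
{
	struct req w = { OP_RW, true };
	struct req f = { OP_FLUSH, false };

	printf("%s\n%s\n", classify(&w), classify(&f));
	return 0;
}
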
@@ -836,7 +961,7 @@ static int probe_disk(struct vdc_port *port)
 
 static struct ldc_channel_config vdc_ldc_cfg = {
        .event          = vdc_event,
-       .mtu            = 64,
+       .mtu            = 256,
        .mode           = LDC_MODE_UNRELIABLE,
 };
 
@@ -984,25 +1109,23 @@ static int vdc_port_remove(struct vio_dev *vdev)
 
 static void vdc_requeue_inflight(struct vdc_port *port)
 {
-       struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
-       u32 idx;
-
-       for (idx = dr->cons; idx != dr->prod; idx = vio_dring_next(dr, idx)) {
-               struct vio_disk_desc *desc = vio_dring_entry(dr, idx);
-               struct vdc_req_entry *rqe = &port->rq_arr[idx];
-               struct request *req;
+       struct vio_driver_state *vio = &port->vio;
+       struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+       struct vio_disk_desc *desc;
+       struct request *req;
+       unsigned int idx;
 
-               ldc_unmap(port->vio.lp, desc->cookies, desc->ncookies);
-               desc->hdr.state = VIO_DESC_FREE;
-               dr->cons = vio_dring_next(dr, idx);
+       assert_spin_locked(&vio->lock);
 
-               req = rqe->req;
+       for (idx = find_first_bit(dr->txmap, dr->nr_txmap);
+            idx < dr->nr_txmap;
+            idx = find_next_bit(dr->txmap, dr->nr_txmap, idx + 1)) {
+               req = vdc_desc_put(port, idx);
                if (req == NULL) {
-                       vdc_end_special(port, desc);
+                       desc = vio_dring_entry(dr, idx);
+                       vdc_end_special(port, desc->status);
                        continue;
                }
-
-               rqe->req = NULL;
                blk_requeue_request(port->disk->queue, req);
        }
 }
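
With no cons..prod window to scan, requeueing after a reset visits
every set bit in the map instead.  A stand-alone sketch of that walk
(user-space; the callback is a hypothetical stand-in for
vdc_desc_put() plus blk_requeue_request()):

#include <stdio.h>

#define BITS_PER_LONG	(8 * sizeof(unsigned long))

/* Visit set bits lowest-first, like find_first_bit()/find_next_bit(). */
static void for_each_set_slot(const unsigned long *map, unsigned int nbits,
			      void (*fn)(unsigned int))
{
	for (unsigned int i = 0; i < nbits; i++)
		if (map[i / BITS_PER_LONG] & (1UL << (i % BITS_PER_LONG)))
			fn(i);
}

static void requeue(unsigned int idx)
{
	printf("requeue slot %u\n", idx);
}

int main(void)
{
	unsigned long map[1] = { 0x25 };	/* slots 0, 2 and 5 in flight */

	for_each_set_slot(map, 32, requeue);	/* 32 slots fit in one long */
	return 0;
}
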
diff --git a/drivers/block/vds/vds.h b/drivers/block/vds/vds.h
index 7d8da46c2ce67fa25c4eb22d262ea623b1950b5a..32474291508fe7b75d4f4f514b6639d504c74e8e 100644
@@ -111,25 +111,25 @@ int vds_vtoc_clear(struct vds_port *port);
  */
 #define        vds_be_rlock(p)                                         \
        do {                                                    \
-               vdsdbg(LOCK, "backend rlock\n");                \
+               vdsdbg(BELOCK, "backend rlock\n");              \
                down_read(&(p)->be_lock);                       \
        } while (0)
 
 #define        vds_be_runlock(p)                                       \
        do {                                                    \
-               vdsdbg(LOCK, "backend runlock\n");              \
+               vdsdbg(BELOCK, "backend runlock\n");            \
                up_read(&(p)->be_lock);                         \
        } while (0)
 
 #define        vds_be_wlock(p)                                         \
        do {                                                    \
-               vdsdbg(LOCK, "backend wlock\n");                \
+               vdsdbg(BELOCK, "backend wlock\n");              \
                down_write(&(p)->be_lock);                      \
        } while (0)
 
 #define        vds_be_wunlock(p)                                       \
        do {                                                    \
-               vdsdbg(LOCK, "backend wunlock\n");              \
+               vdsdbg(BELOCK, "backend wunlock\n");            \
                up_write(&(p)->be_lock);                        \
        } while (0)
 
@@ -189,6 +189,7 @@ int vds_vtoc_clear(struct vds_port *port);
 #define VDS_DEBUG_IO           0x100
 #define VDS_DEBUG_BIO          0x200
 #define VDS_DEBUG_FIO          0x400
+#define VDS_DEBUG_BELOCK       0x800
 
 extern int vds_dbg;
 extern int vds_dbg_ldc;
diff --git a/drivers/block/vds/vds_io.c b/drivers/block/vds/vds_io.c
index 5084bdb740a9fc815f67d459b209665aba03a8ff..13d7301fbf2ab5a911a66ccaae9e4ba6fe99b374 100644
@@ -214,6 +214,8 @@ void vds_io_done(struct vds_io *io)
 {
        struct vio_driver_state *vio = io->vio;
        struct vds_port *port = to_vds_port(vio);
+       struct list_head *pos, *tmp;
+       struct vds_io *ent;
        unsigned long flags;
 
        vdsdbg(WQ, "io=%p cpu=%d first=%p\n", io, smp_processor_id(),
@@ -230,11 +232,22 @@ void vds_io_done(struct vds_io *io)
         * The reset can be initiated by an explicit incoming request
         * or while processing an IO request.  Wakeup anyone waiting on
         * the IO list in either case.
+        *
+        * With out of order execution, the reset may result from the
+        * completion of a request that started later but completed
+        * earlier than other requests on the IO queue.  This should be
+        * fine since after the connection is re-established, the client
+        * will resend all requests for which it has received no response.
         */
        vds_vio_lock(vio, flags);
        list_del(&io->list);
-       if (io->flags & VDS_IO_FINI)
+       if (io->flags & VDS_IO_FINI) {
+               list_for_each_safe(pos, tmp, &port->io_list) {
+                       ent = list_entry(pos, struct vds_io, list);
+                       ent->flags |= VDS_IO_DROP;
+               }
                INIT_LIST_HEAD(&port->io_list);
+       }
        wake_up(&port->wait);
        vds_vio_unlock(vio, flags);
        vds_io_free(io);
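
A stand-alone sketch of the new reset behavior (hypothetical struct io
with a plain singly linked list standing in for the kernel list_head):
once a reset request (VDS_IO_FINI) completes, everything still queued
is flagged VDS_IO_DROP so that its eventual completion is discarded
rather than answered over the dead connection.

#include <stdio.h>

#define IO_FINI	0x4
#define IO_DROP	0x8

struct io {				/* hypothetical queued request */
	int flags;
	struct io *next;
};

/* On reset completion, mark everything still queued and empty the list. */
static void reset_drop_all(struct io **head)
{
	for (struct io *p = *head; p; p = p->next)
		p->flags |= IO_DROP;
	*head = NULL;
}

int main(void)
{
	struct io b = { 0, NULL };
	struct io a = { 0, &b };
	struct io *list = &a;

	reset_drop_all(&list);
	printf("a=%#x b=%#x list=%p\n", a.flags, b.flags, (void *)list);
	return 0;
}
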
diff --git a/drivers/block/vds/vds_io.h b/drivers/block/vds/vds_io.h
index cd4efd8f5a66d1589079a357d69a18ecda81cb67..4789297393071e629fd884344fcc21c967017258 100644
@@ -42,6 +42,7 @@ struct vds_io {
 #define        VDS_IO_CACHE            0x1
 #define        VDS_IO_INIT             0x2
 #define        VDS_IO_FINI             0x4
+#define        VDS_IO_DROP             0x8
 
 int vds_io_init(void);
 void vds_io_fini(void);
diff --git a/drivers/block/vds/vds_main.c b/drivers/block/vds/vds_main.c
index a3453181b9516edd66df03ecf6e8d61e9a31f7af..d0655da88b35487e3fe0b5d406a2c500c8e32a60 100644
@@ -56,10 +56,15 @@ MODULE_VERSION(DRV_MOD_VERSION);
  * conceivably block when the backend is closed.  The serialization should
  * ensure that a following handshake initiates only after the reset is done.
  *
+ * Out of order execution bypasses vds_io_wait() except for FLUSH. This means
+ * that a request may start later yet complete, and be answered to the
+ * client, earlier than other requests.
+ *
  * The recommended value for the size of the kernel workqueue is 0;
  * it creates threads which scale with ncpu.
  */
 int vds_wq;
+int vds_ooo;   /* out of order execution, off by default */
 int vds_dbg;
 int vds_dbg_ldc;
 int vds_dbg_vio;
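
A hedged sketch of the policy the comment above describes (the names
here are illustrative, not the driver's): with vds_ooo set, only FLUSH
still takes the in-order path through vds_io_wait().

#include <stdbool.h>
#include <stdio.h>

enum op { OP_RW, OP_FLUSH };

static bool vds_ooo;			/* mirrors the module parameter */

/* Ordinary reads/writes skip the in-order wait once OOO is enabled;
 * FLUSH (like the handshake) always serializes. */
static bool must_serialize(enum op op)
{
	return !vds_ooo || op == OP_FLUSH;
}

int main(void)
{
	vds_ooo = true;
	printf("rw serializes=%d flush serializes=%d\n",
	       must_serialize(OP_RW), must_serialize(OP_FLUSH));
	return 0;
}
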
@@ -68,13 +73,20 @@ module_param(vds_dbg, uint, 0664);
 module_param(vds_dbg_ldc, uint, 0664);
 module_param(vds_dbg_vio, uint, 0664);
 module_param(vds_wq, uint, 0664);
+module_param(vds_ooo, uint, 0664);
 
 /* Ordered from largest major to lowest */
 static struct vio_version vds_versions[] = {
+       { .major = 1, .minor = 3 },
        { .major = 1, .minor = 1 },
        { .major = 1, .minor = 0 },
 };
 
+static inline int vds_version_supp(struct vds_port *port, u16 major, u16 minor)
+{
+       return port->vio.ver.major == major && port->vio.ver.minor >= minor;
+}
+
 static void vds_handshake_complete(struct vio_driver_state *vio)
 {
        struct vio_dring_state *dr;
@@ -148,10 +160,7 @@ static int vds_handle_attr(struct vio_driver_state *vio, void *arg)
                 * Set the maximum expected message length to
                 * accommodate in-band-descriptor messages with all
                 * their cookies.
-                */
-               vio->desc_buf_len = max_inband_msglen;
-
-               /*
+                *
                 * Reallocate before responding to the message since
                 * the next request in the handshake will use this size
                 * and a small msgbuf would make the ldc read fail.
@@ -360,7 +369,7 @@ static void vds_bh_hs(struct work_struct *work)
        if (io->flags & VDS_IO_INIT)
                err = vds_be_init(port);
 
-       vds_io_wait(io);
+       vds_io_wait(io);        /* handshake is always in order */
 
        if (!err)
                err = vio_control_pkt_engine(vio, port->msgbuf);
@@ -368,6 +377,9 @@ static void vds_bh_hs(struct work_struct *work)
        if (err)
                vdsmsg(err, "%s: handshake failed (%d)\n", port->path, err);
 
+       if (vds_version_supp(port, 1, 3))
+               vds_ooo = 1;
+
        vds_io_done(io);
 }
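
A minimal stand-alone rendering of the gate used just above (mirroring
vds_version_supp(); user-space and illustrative):

#include <stdio.h>

struct ver {
	unsigned short major;
	unsigned short minor;
};

/* Same major, at least the given minor -- cf. vds_version_supp(). */
static int version_supp(struct ver v, unsigned short major,
			unsigned short minor)
{
	return v.major == major && v.minor >= minor;
}

int main(void)
{
	struct ver negotiated = { 1, 3 };

	/* Out of order execution is only enabled on protocol 1.3+. */
	printf("ooo=%d\n", version_supp(negotiated, 1, 3));
	return 0;
}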
 
@@ -426,9 +438,12 @@ static void vds_bh_io(struct work_struct *work)
        if (io->ack == VIO_SUBTYPE_ACK && err != 0 && io->error == 0)
                io->error = err > 0 ? err : -err;
 
-       vds_io_wait(io);
+       if (!vds_ooo)
+               vds_io_wait(io);
 
-       if (port->xfer_mode == VIO_DRING_MODE)
+       if (io->flags & VDS_IO_DROP)
+               ;
+       else if (port->xfer_mode == VIO_DRING_MODE)
                (void) vds_dring_done(io);
        else if (port->xfer_mode == VIO_DESC_MODE)
                (void) vds_desc_done(io);
@@ -436,11 +451,10 @@ static void vds_bh_io(struct work_struct *work)
                BUG();
 
        /*
-        * If there was a reset then the IO request has been
-        * converted to a reset request queued to be executed.
+        * Any request, including one that was converted
+        * to a reset ends up here to be completed.
         */
-       if (!(io->flags & VDS_IO_FINI))
-               vds_io_done(io);
+       vds_io_done(io);
 }
 
 static void vds_reset(struct vds_io *io)
@@ -474,7 +488,6 @@ static void vds_reset(struct vds_io *io)
 
        vds_vio_lock(vio, flags);
        vio_link_state_change(vio, LDC_EVENT_RESET);
-       vio->desc_buf_len = 0;
 
        port->flags = 0;
        kfree(port->msgbuf);
@@ -500,10 +513,17 @@ static void vds_bh_reset(struct work_struct *work)
        struct vds_io *io = container_of(work, struct vds_io, vds_work);
        struct vio_driver_state *vio = io->vio;
 
-       vds_io_wait(io);
+       if (!vds_ooo)
+               vds_io_wait(io);
        vds_reset(io);
-       ldc_enable_hv_intr(vio->lp);
        vds_io_done(io);
+
+       /*
+        * Enable LDC interrupt after the request completion
+        * so that no new requests are queued while the IO
+        * queue is discarded during reset processing.
+        */
+       ldc_enable_hv_intr(vio->lp);
 }
 
 static int vds_dring_io(struct vio_driver_state *vio)
@@ -806,7 +826,7 @@ static void vds_event(void *arg, int event)
 
 static struct ldc_channel_config vds_ldc_cfg = {
        .event          = vds_event,
-       .mtu            = 64,
+       .mtu            = 256,
        .mode           = LDC_MODE_UNRELIABLE,
 };