]> www.infradead.org Git - users/jedix/linux-maple.git/commitdiff
sparc64: Use block layer BIO-based interface for VDC IO requests
authorBijan Mottahedeh <bijan.mottahedeh@oracle.com>
Mon, 10 Oct 2016 07:10:59 +0000 (03:10 -0400)
committerAllen Pais <allen.pais@oracle.com>
Tue, 8 Nov 2016 10:07:50 +0000 (15:37 +0530)
Orabug: 24823012

Signed-off-by: Bijan Mottahedeh <bijan.mottahedeh@oracle.com>
Reviewed-by: Alexandre Chartre <alexandre.chartre@oracle.com>
Signed-off-by: Allen Pais <allen.pais@oracle.com>
drivers/block/sunvdc.c

index b3f342165af84eb638efeb2a746a2665103d0eb2..d61b7ea943719fdfd8416e7306addffa8bd18b82 100644 (file)
@@ -18,6 +18,7 @@
 #include <linux/init.h>
 #include <linux/list.h>
 #include <linux/scatterlist.h>
+#include <linux/wait.h>
 
 #include <asm/vio.h>
 #include <asm/ldc.h>
@@ -44,7 +45,9 @@ MODULE_VERSION(DRV_MODULE_VERSION);
 static struct workqueue_struct *sunvdc_wq;
 
 struct vdc_req_entry {
-       struct request          *req;
+       struct bio              *req;
+       u64                     size;
+       int                     sent;
 };
 
 struct vdc_port {
@@ -52,13 +55,14 @@ struct vdc_port {
 
        struct gendisk          *disk;
 
-       struct vdc_completion   *cmp;
+       struct vio_completion   *cmp;
 
        u64                     req_id;
        u64                     seq;
        struct vdc_req_entry    rq_arr[VDC_TX_RING_SIZE];
 
        unsigned long           ring_cookies;
+       wait_queue_head_t       wait;
 
        u64                     max_xfer_size;
        u32                     vdisk_block_size;
@@ -75,13 +79,17 @@ struct vdc_port {
        u8                      vdisk_type;
        u8                      vdisk_mtype;
 
+       u8                      flags;
+
        char                    disk_name[32];
 };
 
+#define        VDC_PORT_RESET  0x1
+
 static void vdc_ldc_reset(struct vdc_port *port);
 static void vdc_ldc_reset_work(struct work_struct *work);
 static void vdc_ldc_reset_timer(unsigned long _arg);
-static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx);
+static struct bio *vdc_desc_put(struct vdc_port *port, unsigned int idx);
 static inline void vdc_desc_set_state(struct vio_disk_desc *, int);
 
 static inline struct vdc_port *to_vdc_port(struct vio_driver_state *vio)
@@ -107,11 +115,6 @@ static inline int vdc_version_supported(struct vdc_port *port,
 static int vdc_major;
 #define PARTITION_SHIFT        3
 
-static inline u32 vdc_tx_dring_avail(struct vio_dring_state *dr)
-{
-       return VDC_TX_RING_SIZE - bitmap_weight(dr->txmap, dr->nr_txmap);
-}
-
 static int vdc_getgeo(struct block_device *bdev, struct hd_geometry *geo)
 {
        struct gendisk *disk = bdev->bd_disk;
@@ -165,30 +168,13 @@ static const struct block_device_operations vdc_fops = {
        .ioctl          = vdc_ioctl,
 };
 
-static void vdc_blk_queue_start(struct vdc_port *port)
-{
-       struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
-
-       /* restart blk queue when ring is half emptied. also called after
-        * handshake completes, so check for initial handshake before we've
-        * allocated a disk.
-        */
-       if (port->disk && blk_queue_stopped(port->disk->queue) &&
-           vdc_tx_dring_avail(dr) * 100 / VDC_TX_RING_SIZE >= 50) {
-               blk_start_queue(port->disk->queue);
-       }
-
-}
-
-static void vdc_finish(struct vio_driver_state *vio, int err, int waiting_for)
+static void vdc_finish(struct vio_completion *cmp, int err, int waiting_for)
 {
-       if (vio->cmp &&
-           (waiting_for == -1 ||
-            vio->cmp->waiting_for == waiting_for)) {
-               vio->cmp->err = err;
-               complete(&vio->cmp->com);
-               vio->cmp = NULL;
-       }
+       if (cmp && (waiting_for == -1 || cmp->waiting_for == waiting_for)) {
+               cmp->err = err;
+               complete_all(&cmp->com);
+       } else
+               pr_debug(PFX "skip err=%d wait=%d\n", err, waiting_for);
 }
 
 static void vdc_handshake_complete(struct vio_driver_state *vio)
@@ -196,8 +182,8 @@ static void vdc_handshake_complete(struct vio_driver_state *vio)
        struct vdc_port *port = to_vdc_port(vio);
 
        del_timer(&port->ldc_reset_timer);
-       vdc_finish(vio, 0, WAITING_FOR_LINK_UP);
-       vdc_blk_queue_start(port);
+       vdc_finish(vio->cmp, 0, WAITING_FOR_LINK_UP);
+       vio->cmp = NULL;
 }
 
 static int vdc_handle_unknown(struct vdc_port *port, void *arg)
@@ -286,7 +272,8 @@ static int vdc_handle_attr(struct vio_driver_state *vio, void *arg)
 
 static void vdc_end_special(struct vdc_port *port, int err)
 {
-       vdc_finish(&port->vio, -err, WAITING_FOR_GEN_CMD);
+       vdc_finish(port->cmp, -err, WAITING_FOR_GEN_CMD);
+       port->cmp = NULL;
 }
 
 static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr,
@@ -294,7 +281,8 @@ static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr,
 {
        struct vio_disk_desc *desc = vio_dring_entry(dr, index);
        struct vio_driver_state *vio = &port->vio;
-       struct request *req;
+       struct vdc_req_entry *rqe = &port->rq_arr[index];
+       struct bio *req;
 
        assert_spin_locked(&vio->lock);
 
@@ -313,9 +301,15 @@ static void vdc_end_one(struct vdc_port *port, struct vio_dring_state *dr,
                return;
        }
 
-       __blk_end_request(req, (desc->status ? -EIO : 0), desc->size);
+       if (rqe->size != desc->size) {
+               pr_err("%s idx=%u err=%d state=%d size=%lld rsize=%lld\n",
+                       __func__, index, err, desc->hdr.state,
+                       desc->size, rqe->size);
+               BUG();
+       }
 
-       vdc_blk_queue_start(port);
+       bio_endio(req, err ? -EIO : 0);
+       rqe->size = 0;
 }
 
 static int vdc_ack(struct vdc_port *port, void *msgbuf)
@@ -403,14 +397,22 @@ static void vdc_event(void *arg, int event)
                if (err < 0)
                        break;
        }
-       if (err < 0)
-               vdc_finish(&port->vio, err, WAITING_FOR_ANY);
+
+       /*
+        * If there is an error, reset the link.  All inflight
+        * requests will be resent once the port is up again.
+        */
+       if (err < 0) {
+               vio_link_state_change(vio, LDC_EVENT_RESET);
+               queue_work(sunvdc_wq, &port->ldc_reset_work);
+       }
 out:
        spin_unlock_irqrestore(&vio->lock, flags);
 }
 
-static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx)
+static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx, int new)
 {
+       struct vdc_req_entry *rqe = &port->rq_arr[idx];
        struct vio_driver_state *vio = &port->vio;
        struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
        struct vio_dring_data hdr = {
@@ -424,17 +426,38 @@ static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx)
                .start_idx              = idx,
                .end_idx                = idx,
        };
+       unsigned long flags = 0;
        int err, delay;
 
        delay = 1;
        do {
+               /*
+                * We can get here for a new or an inflight request.
+                * In the former case, we must explicity hold the vio lock.
+                * In the latter case, the lock is already held as part
+                * reset processing.  Sending the request and setting the
+                * related fields in case of success must be atomic.
+                * First, snd_nxt and req_id must be incremented atomically.
+                * Second, if the thread is preempted after vio_ldc_send()
+                * but before setting rqe->sent and the service resets,
+                * the request may not be resent during reset processing.
+                */
+               if (new)
+                       spin_lock_irqsave(&vio->lock, flags);
+               else
+                       assert_spin_locked(&vio->lock);
                hdr.seq = dr->snd_nxt;
                err = vio_ldc_send(vio, &hdr, sizeof(hdr));
                if (err > 0) {
+                       rqe->sent = 1;
                        dr->snd_nxt++;
                        port->req_id++;
+                       if (new)
+                               spin_unlock_irqrestore(&vio->lock, flags);
                        break;
                }
+               if (new)
+                       spin_unlock_irqrestore(&vio->lock, flags);
 
                udelay(delay);
                if ((delay <<= 1) > 128)
@@ -448,37 +471,51 @@ static int __vdc_tx_trigger(struct vdc_port *port, unsigned int idx)
 }
 
 static struct vio_disk_desc *vdc_desc_get(struct vdc_port *port,
-                                         struct request *req,
+                                         struct bio *req,
                                          unsigned int *idxp)
 {
        unsigned int idx;
        struct vio_disk_desc *desc = NULL;
        struct vio_driver_state *vio = &port->vio;
        struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+       DEFINE_WAIT(wait);
+       unsigned long flags;
 
-       assert_spin_locked(&vio->lock);
-
-       idx = find_first_zero_bit(dr->txmap, dr->nr_txmap);
-       if (idx < VDC_TX_RING_SIZE) {
-               bitmap_set(dr->txmap, idx, 1);
-               desc = dr->base + (dr->entry_size * idx);
-               if (req) {
-                       BUG_ON(port->rq_arr[idx].req);
-                       port->rq_arr[idx].req = req;
+       while (1) {
+               prepare_to_wait(&port->wait, &wait, TASK_INTERRUPTIBLE);
+
+               spin_lock_irqsave(&vio->lock, flags);
+               idx = find_first_zero_bit(dr->txmap, dr->nr_txmap);
+               if (idx < VDC_TX_RING_SIZE) {
+                       bitmap_set(dr->txmap, idx, 1);
+                       desc = dr->base + (dr->entry_size * idx);
+                       if (req) {
+                               BUG_ON(port->rq_arr[idx].req);
+                               port->rq_arr[idx].req = req;
+                       }
+                       *idxp = idx;
+
+                       spin_unlock_irqrestore(&vio->lock, flags);
+                       finish_wait(&port->wait, &wait);
+                       break;
                }
-               *idxp = idx;
+               spin_unlock_irqrestore(&vio->lock, flags);
+               schedule();
+               finish_wait(&port->wait, &wait);
        }
 
+       BUG_ON(!desc);
+
        return desc;
 }
 
-static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx)
+static struct bio *vdc_desc_put(struct vdc_port *port, unsigned int idx)
 {
        struct vio_driver_state *vio = &port->vio;
        struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
        struct vio_disk_desc *desc = vio_dring_entry(dr, idx);
        struct vdc_req_entry *rqe;
-       struct request *req;
+       struct bio *req;
 
        assert_spin_locked(&vio->lock);
 
@@ -490,6 +527,8 @@ static struct request *vdc_desc_put(struct vdc_port *port, unsigned int idx)
        rqe = &port->rq_arr[idx];
        req = rqe->req;
        rqe->req = NULL;
+       rqe->sent = 0;
+       wake_up_all(&port->wait);
 
        return req;
 }
@@ -514,10 +553,14 @@ static void __create_flush_desc(struct vdc_port *port,
 }
 
 static int __create_rw_desc(struct vdc_port *port, struct request *req,
-                           struct vio_disk_desc *desc)
+                           struct vio_disk_desc *desc, unsigned int idx)
 {
+       struct vio_driver_state *vio = &port->vio;
        struct scatterlist sg[port->ring_cookies];
+       struct vdc_req_entry *rqe;
+       DEFINE_WAIT(wait);
        unsigned int map_perm;
+       unsigned long flags;
        int nsg, err, i;
        u64 len;
        u8 op;
@@ -541,8 +584,23 @@ static int __create_rw_desc(struct vdc_port *port, struct request *req,
 
        memset(desc, 0, sizeof(struct vio_disk_desc));
 
-       err = ldc_map_sg(port->vio.lp, sg, nsg, desc->cookies,
-                        port->ring_cookies, map_perm);
+       while (1) {
+               prepare_to_wait(&port->wait, &wait, TASK_INTERRUPTIBLE);
+
+               spin_lock_irqsave(&vio->lock, flags);
+               err = ldc_map_sg(port->vio.lp, sg, nsg, desc->cookies,
+                                port->ring_cookies, map_perm);
+
+               if (err >= 0 || err != -ENOMEM) {
+                       spin_unlock_irqrestore(&vio->lock, flags);
+                       finish_wait(&port->wait, &wait);
+                       break;
+               }
+
+               spin_unlock_irqrestore(&vio->lock, flags);
+               schedule();
+               finish_wait(&port->wait, &wait);
+       }
 
        if (err <= 0) {
                pr_err(PFX "ldc_map_sg() failed, err=%d.\n", err);
@@ -565,6 +623,9 @@ static int __create_rw_desc(struct vdc_port *port, struct request *req,
        desc->size = len;
        desc->ncookies = err;
 
+       rqe = &port->rq_arr[idx];
+       rqe->size = len;
+
        return 0;
 }
 
@@ -578,7 +639,7 @@ static int __send_request(struct vdc_port *port, unsigned int idx)
        vdc_desc_set_state(desc, VIO_DESC_READY);
 
        while (1) {
-               err = __vdc_tx_trigger(port, idx);
+               err = __vdc_tx_trigger(port, idx, 1);
 
                if (err == -ECONNRESET || err == -ENOTCONN) {
                        vdc_ldc_reset(port);
@@ -594,62 +655,75 @@ static int __send_request(struct vdc_port *port, unsigned int idx)
        return err;
 }
 
-static void do_vdc_request(struct request_queue *rq)
+/*
+ * IO requests are handed off directly to vdc without any queueing at
+ * the blk layer.
+ *
+ * A dring entry is first allocated and then mapped for a request,
+ * and the request is then sent to the server. A recoverable error,
+ * e.g. reset, is continuously retried until success.
+ *
+ * A reset error can be sync or async, i.e., it can be a direct result
+ * of a send, or fielded indirectly via an interrupt.  In either case
+ * the connection is reset and all inflight request are resent immediately
+ * after the connection is re-established at the end of the reset processing.
+ *
+ * The tx dring acts effectively as a request queue. An incoming request
+ * will continuously retry until it gets a tx ring descriptor and proceeds.
+ * The descriptor is freed in interrupt context and the vio spinlock is used
+ * for synchronization.
+ */
+static void vdc_make_request(struct request_queue *q, struct bio *bio)
 {
-       struct vdc_port *port = NULL;
-       struct vio_dring_state *dr = NULL;
-       struct request *req;
-       unsigned int idx = 0;
+       struct vdc_port *port = q->queuedata;
+       struct vio_driver_state *vio = &port->vio;
+       struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
+       struct vio_disk_desc *desc;
+       struct request req;
+       unsigned int idx;
+       unsigned long flags;
        int err = 0;
 
-       while ((req = blk_peek_request(rq)) != NULL) {
-               struct vio_disk_desc *desc;
-
-               port = req->rq_disk->private_data;
-               dr = &port->vio.drings[VIO_DRIVER_TX_RING];
+       BUG_ON(!port);
 
-               if (unlikely(vdc_tx_dring_avail(dr) < 1))
-                       goto wait;
+       desc = vdc_desc_get(port, bio, &idx);
 
-               desc = vdc_desc_get(port, req, &idx);
-
-               /* Note that REQ_FLUSH requests with a payload are
-                * automatically turned into a sequence of an empty
-                * REQ_FLUSH request followed by the actual i/o by
-                * the block layer.
+       if (bio->bi_rw & REQ_FLUSH)
+               __create_flush_desc(port, desc);
+       else {
+               memset(&req, 0, sizeof(req));
+               /*
+                * XXX We should be able to use init_request_from_bio()
+                * but it is not an exported symbol.
                 */
-               if (req->cmd_flags & REQ_FLUSH) {
-                       if (req->bio != NULL)
-                               pr_err(PFX "non-empty REQ_FLUSH, lost I/O\n");
-                       __create_flush_desc(port, desc);
-               } else
-                       err = __create_rw_desc(port, req, desc);
-
+               req.bio = bio;
+               req.cmd_flags = bio->bi_rw & REQ_COMMON_MASK;
+               req.errors = 0;
+               req.__sector = bio->bi_iter.bi_sector;
+               req.q = q;
+               err = __create_rw_desc(port, &req, desc, idx);
                if (err)
-                       goto wait;
-
-               blk_start_request(req);
+                       pr_err(PFX "__create_rw_desc() failed, err=%d\n", err);
+       }
 
+       if (!err)
                err = __send_request(port, idx);
-               if (err < 0) {
-                       blk_requeue_request(rq, req);
-wait:
-                       /* Avoid pointless unplugs. */
-                       blk_stop_queue(rq);
-                       break;
-               }
-       }
 
-       if (err < 0 &&
-           err != -ENOMEM && err != -ECONNRESET && err != -ENOTCONN)
+       /*
+        * Terminate the request unless a descriptor was
+        * successfully allocated and sent to the server.
+        */
+       if (err < 0) {
+               spin_lock_irqsave(&vio->lock, flags);
                vdc_end_one(port, dr, idx, err);
+               spin_unlock_irqrestore(&vio->lock, flags);
+       }
 }
 
 static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
 {
        struct vio_completion comp;
        struct vio_disk_desc *desc;
-       unsigned long flags;
        unsigned int map_perm;
        unsigned int idx;
        int op_len, err;
@@ -712,7 +786,6 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
        case VD_OP_GET_EFI:
        case VD_OP_SET_EFI:
                return -EOPNOTSUPP;
-               break;
        };
 
        map_perm |= LDC_MAP_SHADOW | LDC_MAP_DIRECT | LDC_MAP_IO;
@@ -728,8 +801,6 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
        if (map_perm & LDC_MAP_R)
                memcpy(req_buf, buf, len);
 
-       spin_lock_irqsave(&port->vio.lock, flags);
-
        desc = vdc_desc_get(port, NULL, &idx);
        if (!desc) {
                err = -ENOMEM;
@@ -744,7 +815,7 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
 
        init_completion(&comp.com);
        comp.waiting_for = WAITING_FOR_GEN_CMD;
-       port->vio.cmp = &comp;
+       port->cmp = &comp;
 
        desc->hdr.ack = VIO_ACK_ENABLE;
        desc->req_id = port->req_id;
@@ -757,12 +828,10 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
 
        err = __send_request(port, idx);
        if (err >= 0) {
-               spin_unlock_irqrestore(&port->vio.lock, flags);
                wait_for_completion(&comp.com);
                err = comp.err;
-               spin_lock_irqsave(&port->vio.lock, flags);
        } else {
-               port->vio.cmp = NULL;
+               port->cmp = NULL;
                goto done;
        }
 
@@ -771,9 +840,7 @@ static int generic_request(struct vdc_port *port, u8 op, void *buf, int len)
 
 done:
        (void) vdc_desc_put(port, idx);
-       spin_unlock_irqrestore(&port->vio.lock, flags);
        kfree(req_buf);
-
        return err;
 }
 
@@ -886,12 +953,15 @@ static int probe_disk(struct vdc_port *port)
                                    (u64)geom.num_sec);
        }
 
-       q = blk_init_queue(do_vdc_request, &port->vio.lock);
+       q = blk_alloc_queue(GFP_KERNEL);
        if (!q) {
                printk(KERN_ERR PFX "%s: Could not allocate queue.\n",
                       port->vio.name);
                return -ENOMEM;
        }
+       blk_queue_make_request(q, vdc_make_request);
+       q->queuedata = port;
+
        g = alloc_disk(1 << PARTITION_SHIFT);
        if (!g) {
                printk(KERN_ERR PFX "%s: Could not allocate gendisk.\n",
@@ -906,14 +976,6 @@ static int probe_disk(struct vdc_port *port)
        blk_queue_segment_boundary(q, PAGE_SIZE - 1);
        blk_queue_max_segment_size(q, PAGE_SIZE);
 
-       /* vds may be a device with volatile caching and in protocol 1.3
-        * can perform out of order completion. REQ_FLUSH/REQ_FUA are
-        * used to signal completion barriers.
-        * REQ_FUA is turned into a following REQ_FLUSH by block layer
-        * if not supported directly.
-        */
-       blk_queue_flush(q, REQ_FLUSH);
-
        blk_queue_max_segments(q, port->ring_cookies);
        blk_queue_max_hw_sectors(q, port->max_xfer_size);
        g->major = vdc_major;
@@ -961,7 +1023,7 @@ static int probe_disk(struct vdc_port *port)
 
 static struct ldc_channel_config vdc_ldc_cfg = {
        .event          = vdc_event,
-       .mtu            = 256,
+       .mtu            = 512,
        .mode           = LDC_MODE_UNRELIABLE,
 };
 
@@ -1031,6 +1093,7 @@ static int vdc_port_probe(struct vio_dev *vdev, const struct vio_device_id *id)
        setup_timer(&port->ldc_reset_timer, vdc_ldc_reset_timer,
                    (unsigned long)port);
        INIT_WORK(&port->ldc_reset_work, vdc_ldc_reset_work);
+       init_waitqueue_head(&port->wait);
 
        err = vio_driver_init(&port->vio, vdev, VDEV_DISK,
                              vdc_versions, ARRAY_SIZE(vdc_versions),
@@ -1085,7 +1148,6 @@ static int vdc_port_remove(struct vio_dev *vdev)
                unsigned long flags;
 
                spin_lock_irqsave(&port->vio.lock, flags);
-               blk_stop_queue(port->disk->queue);
                spin_unlock_irqrestore(&port->vio.lock, flags);
 
                flush_work(&port->ldc_reset_work);
@@ -1093,7 +1155,6 @@ static int vdc_port_remove(struct vio_dev *vdev)
                del_timer_sync(&port->vio.timer);
 
                del_gendisk(port->disk);
-               blk_cleanup_queue(port->disk->queue);
                put_disk(port->disk);
                port->disk = NULL;
 
@@ -1107,12 +1168,12 @@ static int vdc_port_remove(struct vio_dev *vdev)
        return 0;
 }
 
-static void vdc_requeue_inflight(struct vdc_port *port)
+static void vdc_resend_inflight(struct vdc_port *port)
 {
        struct vio_driver_state *vio = &port->vio;
        struct vio_dring_state *dr = &vio->drings[VIO_DRIVER_TX_RING];
        struct vio_disk_desc *desc;
-       struct request *req;
+       struct vdc_req_entry *rqe;
        unsigned int idx;
 
        assert_spin_locked(&vio->lock);
@@ -1120,22 +1181,26 @@ static void vdc_requeue_inflight(struct vdc_port *port)
        for (idx = find_first_bit(dr->txmap, dr->nr_txmap);
             idx < dr->nr_txmap;
             idx = find_next_bit(dr->txmap, dr->nr_txmap, idx + 1)) {
-               req = vdc_desc_put(port, idx);
-               if (req == NULL) {
+               rqe = &port->rq_arr[idx];
+               if (rqe->sent) {
                        desc = vio_dring_entry(dr, idx);
-                       vdc_end_special(port, desc->status);
-                       continue;
+                       vdc_desc_set_state(desc, VIO_DESC_READY);
+                       if (__vdc_tx_trigger(port, idx, 0) < 0)
+                               break;
                }
-               blk_requeue_request(port->disk->queue, req);
        }
 }
 
 static void vdc_queue_drain(struct vdc_port *port)
 {
-       struct request *req;
+       struct vio_dring_state *dr = &port->vio.drings[VIO_DRIVER_TX_RING];
+       unsigned int idx;
 
-       while ((req = blk_fetch_request(port->disk->queue)) != NULL)
-               __blk_end_request_all(req, -EIO);
+       for (idx = find_first_bit(dr->txmap, dr->nr_txmap);
+            idx < dr->nr_txmap;
+            idx = find_next_bit(dr->txmap, dr->nr_txmap, idx + 1)) {
+                       vdc_end_one(port, dr, idx, -EIO);
+       }
 }
 
 static void vdc_ldc_reset_timer(unsigned long _arg)
@@ -1152,7 +1217,6 @@ static void vdc_ldc_reset_timer(unsigned long _arg)
                pr_warn(PFX "%s ldc down %llu seconds, draining queue\n",
                        port->disk_name, port->ldc_timeout);
                vdc_queue_drain(port);
-               vdc_blk_queue_start(port);
        }
        spin_unlock_irqrestore(&vio->lock, flags);
 }
@@ -1160,15 +1224,10 @@ static void vdc_ldc_reset_timer(unsigned long _arg)
 static void vdc_ldc_reset_work(struct work_struct *work)
 {
        struct vdc_port *port;
-       struct vio_driver_state *vio;
-       unsigned long flags;
 
        port = container_of(work, struct vdc_port, ldc_reset_work);
-       vio = &port->vio;
 
-       spin_lock_irqsave(&vio->lock, flags);
        vdc_ldc_reset(port);
-       spin_unlock_irqrestore(&vio->lock, flags);
 }
 
 /*
@@ -1179,29 +1238,42 @@ static void vdc_ldc_reset_work(struct work_struct *work)
  */
 static void vdc_ldc_reset(struct vdc_port *port)
 {
-       int err;
        struct vio_driver_state *vio = &port->vio;
-
-       assert_spin_locked(&vio->lock);
+       unsigned long flags;
+       int err;
 
        pr_warn(PFX "%s ldc link reset\n", port->disk_name);
 
-       if (!port->disk)
+       spin_lock_irqsave(&vio->lock, flags);
+
+       if (port->flags & VDC_PORT_RESET) {
+               spin_unlock_irqrestore(&vio->lock, flags);
+               wait_for_completion(&port->vio.cmp->com);
                return;
+       }
 
-       blk_stop_queue(port->disk->queue);
-       vdc_requeue_inflight(port);
-       vio_link_state_change(vio, LDC_EVENT_RESET);
+       if (!port->disk)
+               goto done;
 
-       err = ldc_connect(vio->lp);
+       vio_link_state_change(vio, LDC_EVENT_RESET);
+       port->flags |= VDC_PORT_RESET;
+       spin_unlock_irqrestore(&vio->lock, flags);
+       err = vdc_port_up(port);
+       spin_lock_irqsave(&vio->lock, flags);
        if (err)
-               pr_err(PFX "%s connect failed, err=%d\n",
-                       port->disk_name, err);
+               pr_err(PFX "%s vdc_port_up() failed, err=%d\n",
+                      port->disk_name, err);
+       else
+               vdc_resend_inflight(port);
 
        if (port->ldc_timeout)
                mod_timer(&port->ldc_reset_timer,
                          round_jiffies(jiffies + HZ * port->ldc_timeout));
        mod_timer(&vio->timer, round_jiffies(jiffies + HZ));
+
+       port->flags &= ~VDC_PORT_RESET;
+done:
+       spin_unlock_irqrestore(&vio->lock, flags);
 }
 
 static const struct vio_device_id vdc_port_match[] = {