*/
 retry:
        ret = pblk_rb_may_write_user(&pblk->rwb, bio, nr_entries, &bpos);
-       if (ret == NVM_IO_REQUEUE) {
+       switch (ret) {
+       case NVM_IO_REQUEUE:
                io_schedule();
                goto retry;
+       case NVM_IO_ERR:
+               pblk_pipeline_stop(pblk);
+               goto out;
        }
 
        if (unlikely(!bio_has_data(bio)))
        atomic_long_add(nr_entries, &pblk->req_writes);
 #endif
 
+       pblk_rl_inserted(&pblk->rl, nr_entries);
+
 out:
        pblk_write_should_kick(pblk);
        return ret;
 
                *ppa = rqd->ppa_addr;
                pblk_mark_bb(pblk, line, ppa);
        }
+
+       atomic_dec(&pblk->inflight_io);
 }
 
 /* Erase completion assumes that only one block is erased at the time */
        complete(waiting);
 }
 
-void pblk_flush_writer(struct pblk *pblk)
+void pblk_wait_for_meta(struct pblk *pblk)
 {
-       struct bio *bio;
-       int ret;
-       DECLARE_COMPLETION_ONSTACK(wait);
-
-       bio = bio_alloc(GFP_KERNEL, 1);
-       if (!bio)
-               return;
-
-       bio->bi_iter.bi_sector = 0; /* internal bio */
-       bio_set_op_attrs(bio, REQ_OP_WRITE, REQ_OP_FLUSH);
-       bio->bi_private = &wait;
-       bio->bi_end_io = pblk_end_bio_sync;
+       do {
+               if (!atomic_read(&pblk->inflight_io))
+                       break;
 
-       ret = pblk_write_to_cache(pblk, bio, 0);
-       if (ret == NVM_IO_OK) {
-               if (!wait_for_completion_io_timeout(&wait,
-                               msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
-                       pr_err("pblk: flush cache timed out\n");
-               }
-       } else if (ret != NVM_IO_DONE) {
-               pr_err("pblk: tear down bio failed\n");
-       }
+               schedule();
+       } while (1);
+}
 
-       if (bio->bi_status)
-               pr_err("pblk: flush sync write failed (%u)\n", bio->bi_status);
+static void pblk_flush_writer(struct pblk *pblk)
+{
+       pblk_rb_flush(&pblk->rwb);
+       do {
+               if (!pblk_rb_read_count(&pblk->rwb))
+                       break;
 
-       bio_put(bio);
+               schedule();
+       } while (1);
 }
 
 struct list_head *pblk_line_gc_list(struct pblk *pblk, struct pblk_line *line)
                }
        }
 #endif
+
+       atomic_inc(&pblk->inflight_io);
+
        return nvm_submit_io(dev, rqd);
 }
 
                                msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
                pr_err("pblk: emeta I/O timed out\n");
        }
+       atomic_dec(&pblk->inflight_io);
        reinit_completion(&wait);
 
        if (likely(pblk->l_mg.emeta_alloc_type == PBLK_VMALLOC_META))
                                msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
                pr_err("pblk: smeta I/O timed out\n");
        }
+       atomic_dec(&pblk->inflight_io);
 
        if (rqd.error) {
                if (dir == WRITE)
 static int pblk_blk_erase_sync(struct pblk *pblk, struct ppa_addr ppa)
 {
        struct nvm_rq rqd;
-       int ret;
+       int ret = 0;
        DECLARE_COMPLETION_ONSTACK(wait);
 
        memset(&rqd, 0, sizeof(struct nvm_rq));
        rqd.private = pblk;
        __pblk_end_io_erase(pblk, &rqd);
 
-       return 0;
+       return ret;
 }
 
 int pblk_line_erase(struct pblk *pblk, struct pblk_line *line)
 {
        struct pblk_line_meta *lm = &pblk->lm;
        struct ppa_addr ppa;
-       int bit = -1;
+       int ret, bit = -1;
 
        /* Erase only good blocks, one at a time */
        do {
                WARN_ON(test_and_set_bit(bit, line->erase_bitmap));
                spin_unlock(&line->lock);
 
-               if (pblk_blk_erase_sync(pblk, ppa)) {
+               ret = pblk_blk_erase_sync(pblk, ppa);
+               if (ret) {
                        pr_err("pblk: failed to erase line %d\n", line->id);
-                       return -ENOMEM;
+                       return ret;
                }
        } while (1);
 
 {
        int meta_line;
 
+       lockdep_assert_held(&l_mg->free_lock);
+
 retry_meta:
        meta_line = find_first_zero_bit(&l_mg->meta_bitmap, PBLK_DATA_LINES);
        if (meta_line == PBLK_DATA_LINES) {
        /* Mark smeta metadata sectors as bad sectors */
        bit = find_first_zero_bit(line->blk_bitmap, lm->blk_per_line);
        off = bit * geo->sec_per_pl;
-retry_smeta:
        bitmap_set(line->map_bitmap, off, lm->smeta_sec);
        line->sec_in_line -= lm->smeta_sec;
        line->smeta_ssec = off;
 
        if (init && pblk_line_submit_smeta_io(pblk, line, off, WRITE)) {
                pr_debug("pblk: line smeta I/O failed. Retry\n");
-               off += geo->sec_per_pl;
-               goto retry_smeta;
+               return 1;
        }
 
        bitmap_copy(line->invalid_bitmap, line->map_bitmap, lm->sec_per_line);
 
        spin_lock(&line->lock);
        if (line->state != PBLK_LINESTATE_FREE) {
+               mempool_free(line->invalid_bitmap, pblk->line_meta_pool);
+               mempool_free(line->map_bitmap, pblk->line_meta_pool);
                spin_unlock(&line->lock);
-               WARN(1, "pblk: corrupted line state\n");
-               return -EINTR;
+               WARN(1, "pblk: corrupted line %d, state %d\n",
+                                                       line->id, line->state);
+               return -EAGAIN;
        }
+
        line->state = PBLK_LINESTATE_OPEN;
 
        atomic_set(&line->left_eblks, blk_in_line);
 {
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        struct pblk_line_meta *lm = &pblk->lm;
-       struct pblk_line *line = NULL;
-       int bit;
+       struct pblk_line *line;
+       int ret, bit;
 
        lockdep_assert_held(&l_mg->free_lock);
 
-retry_get:
+retry:
        if (list_empty(&l_mg->free_list)) {
                pr_err("pblk: no free lines\n");
-               goto out;
+               return NULL;
        }
 
        line = list_first_entry(&l_mg->free_list, struct pblk_line, list);
                list_add_tail(&line->list, &l_mg->bad_list);
 
                pr_debug("pblk: line %d is bad\n", line->id);
-               goto retry_get;
+               goto retry;
        }
 
-       if (pblk_line_prepare(pblk, line)) {
-               pr_err("pblk: failed to prepare line %d\n", line->id);
-               list_add(&line->list, &l_mg->free_list);
-               l_mg->nr_free_lines++;
-               return NULL;
+       ret = pblk_line_prepare(pblk, line);
+       if (ret) {
+               if (ret == -EAGAIN) {
+                       list_add(&line->list, &l_mg->corrupt_list);
+                       goto retry;
+               } else {
+                       pr_err("pblk: failed to prepare line %d\n", line->id);
+                       list_add(&line->list, &l_mg->free_list);
+                       l_mg->nr_free_lines++;
+                       return NULL;
+               }
        }
 
-out:
        return line;
 }
 
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        struct pblk_line *retry_line;
 
+retry:
        spin_lock(&l_mg->free_lock);
        retry_line = pblk_line_get(pblk);
        if (!retry_line) {
        l_mg->data_line = retry_line;
        spin_unlock(&l_mg->free_lock);
 
-       if (pblk_line_erase(pblk, retry_line)) {
-               spin_lock(&l_mg->free_lock);
-               l_mg->data_line = NULL;
-               spin_unlock(&l_mg->free_lock);
-               return NULL;
-       }
-
        pblk_rl_free_lines_dec(&pblk->rl, retry_line);
 
+       if (pblk_line_erase(pblk, retry_line))
+               goto retry;
+
        return retry_line;
 }
 
+static void pblk_set_space_limit(struct pblk *pblk)
+{
+       struct pblk_rl *rl = &pblk->rl;
+
+       atomic_set(&rl->rb_space, 0);
+}
+
 struct pblk_line *pblk_line_get_first_data(struct pblk *pblk)
 {
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
 
        /* Allocate next line for preparation */
        l_mg->data_next = pblk_line_get(pblk);
-       if (l_mg->data_next) {
+       if (!l_mg->data_next) {
+               /* If we cannot get a new line, we need to stop the pipeline.
+                * Only allow as many writes in as we can store safely and then
+                * fail gracefully
+                */
+               pblk_set_space_limit(pblk);
+
+               l_mg->data_next = NULL;
+       } else {
                l_mg->data_next->seq_nr = l_mg->d_seq_nr++;
                l_mg->data_next->type = PBLK_LINETYPE_DATA;
                is_next = 1;
        }
        spin_unlock(&l_mg->free_lock);
 
+       if (pblk_line_erase(pblk, line)) {
+               line = pblk_line_retry(pblk, line);
+               if (!line)
+                       return NULL;
+       }
+
        pblk_rl_free_lines_dec(&pblk->rl, line);
        if (is_next)
                pblk_rl_free_lines_dec(&pblk->rl, l_mg->data_next);
 
-       if (pblk_line_erase(pblk, line))
-               return NULL;
-
 retry_setup:
        if (!pblk_line_init_metadata(pblk, line, NULL)) {
                line = pblk_line_retry(pblk, line);
        return line;
 }
 
-struct pblk_line *pblk_line_replace_data(struct pblk *pblk)
+static void pblk_stop_writes(struct pblk *pblk, struct pblk_line *line)
+{
+       lockdep_assert_held(&pblk->l_mg.free_lock);
+
+       pblk_set_space_limit(pblk);
+       pblk->state = PBLK_STATE_STOPPING;
+}
+
+void pblk_pipeline_stop(struct pblk *pblk)
+{
+       struct pblk_line_mgmt *l_mg = &pblk->l_mg;
+       int ret;
+
+       spin_lock(&l_mg->free_lock);
+       if (pblk->state == PBLK_STATE_RECOVERING ||
+                                       pblk->state == PBLK_STATE_STOPPED) {
+               spin_unlock(&l_mg->free_lock);
+               return;
+       }
+       pblk->state = PBLK_STATE_RECOVERING;
+       spin_unlock(&l_mg->free_lock);
+
+       pblk_flush_writer(pblk);
+       pblk_wait_for_meta(pblk);
+
+       ret = pblk_recov_pad(pblk);
+       if (ret) {
+               pr_err("pblk: could not close data on teardown(%d)\n", ret);
+               return;
+       }
+
+       pblk_line_close_meta_sync(pblk);
+
+       spin_lock(&l_mg->free_lock);
+       pblk->state = PBLK_STATE_STOPPED;
+       l_mg->data_line = NULL;
+       l_mg->data_next = NULL;
+       spin_unlock(&l_mg->free_lock);
+}
+
+void pblk_line_replace_data(struct pblk *pblk)
 {
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        struct pblk_line *cur, *new;
        cur = l_mg->data_line;
        new = l_mg->data_next;
        if (!new)
-               return NULL;
+               return;
        l_mg->data_line = new;
 
-retry_line:
+       spin_lock(&l_mg->free_lock);
+       if (pblk->state != PBLK_STATE_RUNNING) {
+               l_mg->data_line = NULL;
+               l_mg->data_next = NULL;
+               spin_unlock(&l_mg->free_lock);
+               return;
+       }
+
+       pblk_line_setup_metadata(new, l_mg, &pblk->lm);
+       spin_unlock(&l_mg->free_lock);
+
+retry_erase:
        left_seblks = atomic_read(&new->left_seblks);
        if (left_seblks) {
                /* If line is not fully erased, erase it */
                if (atomic_read(&new->left_eblks)) {
                        if (pblk_line_erase(pblk, new))
-                               return NULL;
+                               return;
                } else {
                        io_schedule();
                }
-               goto retry_line;
+               goto retry_erase;
        }
 
-       spin_lock(&l_mg->free_lock);
-       /* Allocate next line for preparation */
-       l_mg->data_next = pblk_line_get(pblk);
-       if (l_mg->data_next) {
-               l_mg->data_next->seq_nr = l_mg->d_seq_nr++;
-               l_mg->data_next->type = PBLK_LINETYPE_DATA;
-               is_next = 1;
-       }
-
-       pblk_line_setup_metadata(new, l_mg, &pblk->lm);
-       spin_unlock(&l_mg->free_lock);
-
-       if (is_next)
-               pblk_rl_free_lines_dec(&pblk->rl, l_mg->data_next);
-
 retry_setup:
        if (!pblk_line_init_metadata(pblk, new, cur)) {
                new = pblk_line_retry(pblk, new);
                if (!new)
-                       return NULL;
+                       return;
 
                goto retry_setup;
        }
        if (!pblk_line_init_bb(pblk, new, 1)) {
                new = pblk_line_retry(pblk, new);
                if (!new)
-                       return NULL;
+                       return;
 
                goto retry_setup;
        }
 
-       return new;
+       /* Allocate next line for preparation */
+       spin_lock(&l_mg->free_lock);
+       l_mg->data_next = pblk_line_get(pblk);
+       if (!l_mg->data_next) {
+               /* If we cannot get a new line, we need to stop the pipeline.
+                * Only allow as many writes in as we can store safely and then
+                * fail gracefully
+                */
+               pblk_stop_writes(pblk, new);
+               l_mg->data_next = NULL;
+       } else {
+               l_mg->data_next->seq_nr = l_mg->d_seq_nr++;
+               l_mg->data_next->type = PBLK_LINETYPE_DATA;
+               is_next = 1;
+       }
+       spin_unlock(&l_mg->free_lock);
+
+       if (is_next)
+               pblk_rl_free_lines_dec(&pblk->rl, l_mg->data_next);
 }
 
 void pblk_line_free(struct pblk *pblk, struct pblk_line *line)
        return (line->left_msecs == 0);
 }
 
+void pblk_line_close_meta_sync(struct pblk *pblk)
+{
+       struct pblk_line_mgmt *l_mg = &pblk->l_mg;
+       struct pblk_line_meta *lm = &pblk->lm;
+       struct pblk_line *line, *tline;
+       LIST_HEAD(list);
+
+       spin_lock(&l_mg->close_lock);
+       if (list_empty(&l_mg->emeta_list)) {
+               spin_unlock(&l_mg->close_lock);
+               return;
+       }
+
+       list_cut_position(&list, &l_mg->emeta_list, l_mg->emeta_list.prev);
+       spin_unlock(&l_mg->close_lock);
+
+       list_for_each_entry_safe(line, tline, &list, list) {
+               struct pblk_emeta *emeta = line->emeta;
+
+               while (emeta->mem < lm->emeta_len[0]) {
+                       int ret;
+
+                       ret = pblk_submit_meta_io(pblk, line);
+                       if (ret) {
+                               pr_err("pblk: sync meta line %d failed (%d)\n",
+                                                       line->id, ret);
+                               return;
+                       }
+               }
+       }
+
+       pblk_wait_for_meta(pblk);
+}
+
+static void pblk_line_should_sync_meta(struct pblk *pblk)
+{
+       if (pblk_rl_is_limit(&pblk->rl))
+               pblk_line_close_meta_sync(pblk);
+}
+
 void pblk_line_close(struct pblk *pblk, struct pblk_line *line)
 {
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        struct pblk_emeta *emeta = line->emeta;
        struct line_emeta *emeta_buf = emeta->buf;
 
-       /* No need for exact vsc value; avoid a big line lock and tak aprox. */
+       /* No need for exact vsc value; avoid a big line lock and take aprox. */
        memcpy(emeta_to_vsc(pblk, emeta_buf), l_mg->vsc_list, lm->vsc_list_len);
        memcpy(emeta_to_bb(emeta_buf), line->blk_bitmap, lm->blk_bitmap_len);
 
        list_add_tail(&line->list, &l_mg->emeta_list);
        spin_unlock(&line->lock);
        spin_unlock(&l_mg->close_lock);
+
+       pblk_line_should_sync_meta(pblk);
 }
 
 void pblk_line_close_ws(struct work_struct *work)
 
        kfree(l_mg->bb_aux);
        kfree(l_mg->vsc_list);
 
+       spin_lock(&l_mg->free_lock);
        for (i = 0; i < PBLK_DATA_LINES; i++) {
                kfree(l_mg->sline_meta[i]);
                pblk_mfree(l_mg->eline_meta[i]->buf, l_mg->emeta_alloc_type);
                kfree(l_mg->eline_meta[i]);
        }
+       spin_unlock(&l_mg->free_lock);
 
        kfree(pblk->lines);
 }
 
 static void pblk_tear_down(struct pblk *pblk)
 {
-       pblk_flush_writer(pblk);
+       pblk_pipeline_stop(pblk);
        pblk_writer_stop(pblk);
        pblk_rb_sync_l2p(&pblk->rwb);
-       pblk_recov_pad(pblk);
        pblk_rwb_free(pblk);
        pblk_rl_free(&pblk->rl);
 
 
        pblk->dev = dev;
        pblk->disk = tdisk;
+       pblk->state = PBLK_STATE_RUNNING;
 
        spin_lock_init(&pblk->trans_lock);
        spin_lock_init(&pblk->lock);
 
 
        if (pblk_line_is_full(line)) {
                struct pblk_line *prev_line = line;
-               line = pblk_line_replace_data(pblk);
-               if (!line)
-                       return;
+
+               pblk_line_replace_data(pblk);
                pblk_line_close_meta(pblk, prev_line);
        }
 
                pblk_map_page_data(pblk, sentry + i, &rqd->ppa_list[i],
                                        lun_bitmap, &meta_list[i], map_secs);
 
-               /* line can change after page map */
-               e_line = pblk_line_get_erase(pblk);
                erase_lun = pblk_ppa_to_pos(geo, rqd->ppa_list[i]);
 
+               /* line can change after page map. We might also be writing the
+                * last line.
+                */
+               e_line = pblk_line_get_erase(pblk);
+               if (!e_line)
+                       return pblk_map_rq(pblk, rqd, sentry, lun_bitmap,
+                                                       valid_secs, i + min);
+
                spin_lock(&e_line->lock);
                if (!test_bit(erase_lun, e_line->erase_bitmap)) {
                        set_bit(erase_lun, e_line->erase_bitmap);
                spin_unlock(&e_line->lock);
        }
 
-       e_line = pblk_line_get_erase(pblk);
        d_line = pblk_line_get_data(pblk);
 
+       /* line can change after page map. We might also be writing the
+        * last line.
+        */
+       e_line = pblk_line_get_erase(pblk);
+       if (!e_line)
+               return;
+
        /* Erase blocks that are bad in this line but might not be in next */
        if (unlikely(ppa_empty(*erase_ppa)) &&
                        bitmap_weight(d_line->blk_bitmap, lm->blk_per_line)) {
 
        /* Protect syncs */
        smp_store_release(&rb->sync_point, sync_point);
 
+       if (!bio)
+               return 0;
+
        spin_lock_irq(&rb->s_lock);
        bio_list_add(&entry->w_ctx.bios, bio);
        spin_unlock_irq(&rb->s_lock);
        return 1;
 }
 
+void pblk_rb_flush(struct pblk_rb *rb)
+{
+       struct pblk *pblk = container_of(rb, struct pblk, rwb);
+       unsigned int mem = READ_ONCE(rb->mem);
+
+       if (pblk_rb_sync_point_set(rb, NULL, mem))
+               return;
+
+       pblk_write_should_kick(pblk);
+}
+
 static int pblk_rb_may_write_flush(struct pblk_rb *rb, unsigned int nr_entries,
                                   unsigned int *pos, struct bio *bio,
                                   int *io_ret)
                           unsigned int nr_entries, unsigned int *pos)
 {
        struct pblk *pblk = container_of(rb, struct pblk, rwb);
-       int flush_done;
+       int io_ret;
 
        spin_lock(&rb->w_lock);
-       if (!pblk_rl_user_may_insert(&pblk->rl, nr_entries)) {
+       io_ret = pblk_rl_user_may_insert(&pblk->rl, nr_entries);
+       if (io_ret) {
                spin_unlock(&rb->w_lock);
-               return NVM_IO_REQUEUE;
+               return io_ret;
        }
 
-       if (!pblk_rb_may_write_flush(rb, nr_entries, pos, bio, &flush_done)) {
+       if (!pblk_rb_may_write_flush(rb, nr_entries, pos, bio, &io_ret)) {
                spin_unlock(&rb->w_lock);
                return NVM_IO_REQUEUE;
        }
        pblk_rl_user_in(&pblk->rl, nr_entries);
        spin_unlock(&rb->w_lock);
 
-       return flush_done;
+       return io_ret;
 }
 
 /*
 
 #endif
 
        pblk_free_rqd(pblk, rqd, READ);
+       atomic_dec(&pblk->inflight_io);
 }
 
 static int pblk_fill_partial_read_bio(struct pblk *pblk, struct nvm_rq *rqd,
        bio_get(bio);
        if (bitmap_full(&read_bitmap, nr_secs)) {
                bio_endio(bio);
+               atomic_inc(&pblk->inflight_io);
                pblk_end_io_read(rqd);
                return NVM_IO_OK;
        }
                                msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
                pr_err("pblk: GC read I/O timed out\n");
        }
+       atomic_dec(&pblk->inflight_io);
 
        if (rqd.error) {
                atomic_long_inc(&pblk->read_failed_gc);
 
                pr_err("pblk: L2P recovery read timed out\n");
                return -EINTR;
        }
-
+       atomic_dec(&pblk->inflight_io);
        reinit_completion(&wait);
 
        /* At this point, the read should not fail. If it does, it is a problem
                                msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
                pr_err("pblk: L2P recovery write timed out\n");
        }
+       atomic_dec(&pblk->inflight_io);
        reinit_completion(&wait);
 
        left_line_ppas -= rq_ppas;
                                msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
                pr_err("pblk: L2P recovery read timed out\n");
        }
+       atomic_dec(&pblk->inflight_io);
        reinit_completion(&wait);
 
        /* This should not happen since the read failed during normal recovery,
                                msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
                pr_err("pblk: L2P recovery read timed out\n");
        }
+       atomic_dec(&pblk->inflight_io);
        reinit_completion(&wait);
 
        /* Reached the end of the written line */
 }
 
 /*
- * Pad until smeta can be read on current data line
+ * Pad current line
  */
-void pblk_recov_pad(struct pblk *pblk)
+int pblk_recov_pad(struct pblk *pblk)
 {
        struct nvm_tgt_dev *dev = pblk->dev;
        struct nvm_geo *geo = &dev->geo;
        struct ppa_addr *ppa_list;
        struct pblk_sec_meta *meta_list;
        void *data;
+       int left_msecs;
+       int ret = 0;
        dma_addr_t dma_ppa_list, dma_meta_list;
 
        spin_lock(&l_mg->free_lock);
        line = l_mg->data_line;
+       left_msecs = line->left_msecs;
        spin_unlock(&l_mg->free_lock);
 
        rqd = pblk_alloc_rqd(pblk, READ);
        if (IS_ERR(rqd))
-               return;
+               return PTR_ERR(rqd);
 
        meta_list = nvm_dev_dma_alloc(dev->parent, GFP_KERNEL, &dma_meta_list);
-       if (!meta_list)
+       if (!meta_list) {
+               ret = -ENOMEM;
                goto free_rqd;
+       }
 
        ppa_list = (void *)(meta_list) + pblk_dma_meta_size;
        dma_ppa_list = dma_meta_list + pblk_dma_meta_size;
 
        data = kcalloc(pblk->max_write_pgs, geo->sec_size, GFP_KERNEL);
-       if (!data)
+       if (!data) {
+               ret = -ENOMEM;
                goto free_meta_list;
+       }
 
        p.ppa_list = ppa_list;
        p.meta_list = meta_list;
        p.dma_ppa_list = dma_ppa_list;
        p.dma_meta_list = dma_meta_list;
 
-       if (pblk_recov_pad_oob(pblk, line, p, line->left_msecs)) {
-               pr_err("pblk: Tear down padding failed\n");
+       ret = pblk_recov_pad_oob(pblk, line, p, left_msecs);
+       if (ret) {
+               pr_err("pblk: Tear down padding failed (%d)\n", ret);
                goto free_data;
        }
 
-       pblk_line_close(pblk, line);
+       pblk_line_close_meta(pblk, line);
 
 free_data:
        kfree(data);
        nvm_dev_dma_free(dev->parent, meta_list, dma_meta_list);
 free_rqd:
        pblk_free_rqd(pblk, rqd, READ);
+
+       return ret;
 }
 
        mod_timer(&rl->u_timer, jiffies + msecs_to_jiffies(5000));
 }
 
+int pblk_rl_is_limit(struct pblk_rl *rl)
+{
+       int rb_space;
+
+       rb_space = atomic_read(&rl->rb_space);
+
+       return (rb_space == 0);
+}
+
 int pblk_rl_user_may_insert(struct pblk_rl *rl, int nr_entries)
 {
        int rb_user_cnt = atomic_read(&rl->rb_user_cnt);
+       int rb_space = atomic_read(&rl->rb_space);
+
+       if (unlikely(rb_space >= 0) && (rb_space - nr_entries < 0))
+               return NVM_IO_ERR;
+
+       if (rb_user_cnt >= rl->rb_user_max)
+               return NVM_IO_REQUEUE;
 
-       return (!(rb_user_cnt >= rl->rb_user_max));
+       return NVM_IO_OK;
+}
+
+void pblk_rl_inserted(struct pblk_rl *rl, int nr_entries)
+{
+       int rb_space = atomic_read(&rl->rb_space);
+
+       if (unlikely(rb_space >= 0))
+               atomic_sub(nr_entries, &rl->rb_space);
 }
 
 int pblk_rl_gc_may_insert(struct pblk_rl *rl, int nr_entries)
        /* To start with, all buffer is available to user I/O writers */
        rl->rb_budget = budget;
        rl->rb_user_max = budget;
-       atomic_set(&rl->rb_user_cnt, 0);
        rl->rb_gc_max = 0;
        rl->rb_state = PBLK_RL_HIGH;
+
+       atomic_set(&rl->rb_user_cnt, 0);
        atomic_set(&rl->rb_gc_cnt, 0);
+       atomic_set(&rl->rb_space, -1);
 
        setup_timer(&rl->u_timer, pblk_rl_u_timer, (unsigned long)rl);
 
 
                geo->nr_luns, lm->blk_per_line, lm->sec_per_line);
 
        sz += snprintf(page + sz, PAGE_SIZE - sz,
-               "lines:d:%d,l:%d-f:%d,m:%d,c:%d,b:%d,co:%d(d:%d,l:%d)t:%d\n",
+               "lines:d:%d,l:%d-f:%d,m:%d/%d,c:%d,b:%d,co:%d(d:%d,l:%d)t:%d\n",
                                        cur_data, cur_log,
-                                       nr_free_lines, emeta_line_cnt,
+                                       nr_free_lines,
+                                       emeta_line_cnt, meta_weight,
                                        closed_line_cnt,
                                        bad, cor,
                                        d_line_cnt, l_line_cnt,
        sz += snprintf(page + sz, PAGE_SIZE - sz,
                "data (%d) cur:%d, left:%d, vsc:%d, s:%d, map:%d/%d (%d)\n",
                        cur_data, cur_sec, msecs, vsc, sec_in_line,
-                       map_weight, lm->sec_per_line, meta_weight);
+                       map_weight, lm->sec_per_line,
+                       atomic_read(&pblk->inflight_io));
 
        return sz;
 }
 
 #endif
 
        pblk_complete_write(pblk, rqd, c_ctx);
+       atomic_dec(&pblk->inflight_io);
 }
 
 static void pblk_end_io_write_meta(struct nvm_rq *rqd)
 
        bio_put(rqd->bio);
        pblk_free_rqd(pblk, rqd, READ);
+
+       atomic_dec(&pblk->inflight_io);
 }
 
 static int pblk_alloc_w_rq(struct pblk *pblk, struct nvm_rq *rqd,
                return ret;
        }
 
-       if (likely(!atomic_read(&e_line->left_eblks) || !e_line))
+       if (likely(!e_line || !atomic_read(&e_line->left_eblks)))
                pblk_map_rq(pblk, rqd, c_ctx->sentry, lun_bitmap, valid, 0);
        else
                pblk_map_erase_rq(pblk, rqd, c_ctx->sentry, lun_bitmap,
 
                                 */
        int rb_budget;          /* Total number of entries available for I/O */
        int rb_user_max;        /* Max buffer entries available for user I/O */
-       atomic_t rb_user_cnt;   /* User I/O buffer counter */
        int rb_gc_max;          /* Max buffer entries available for GC I/O */
        int rb_gc_rsv;          /* Reserved buffer entries for GC I/O */
        int rb_state;           /* Rate-limiter current state */
+
+       atomic_t rb_user_cnt;   /* User I/O buffer counter */
        atomic_t rb_gc_cnt;     /* GC I/O buffer counter */
+       atomic_t rb_space;      /* Space limit in case of reaching capacity */
 
        int rsv_blocks;         /* Reserved blocks for GC */
 
        u8      sec_offset;
 };
 
+enum {
+       PBLK_STATE_RUNNING = 0,
+       PBLK_STATE_STOPPING = 1,
+       PBLK_STATE_RECOVERING = 2,
+       PBLK_STATE_STOPPED = 3,
+};
+
 struct pblk {
        struct nvm_tgt_dev *dev;
        struct gendisk *disk;
 
        struct pblk_rb rwb;
 
+       int state;                      /* pblk line state */
+
        int min_write_pgs; /* Minimum amount of pages required by controller */
        int max_write_pgs; /* Maximum amount of pages supported by controller */
        int pgs_in_buffer; /* Number of pages that need to be held in buffer to
        atomic_long_t write_failed;
        atomic_long_t erase_failed;
 
+       atomic_t inflight_io;           /* General inflight I/O counter */
+
        struct task_struct *writer_ts;
 
        /* Simple translation map of logical addresses to physical addresses.
                            struct pblk_w_ctx w_ctx, struct pblk_line *gc_line,
                            unsigned int pos);
 struct pblk_w_ctx *pblk_rb_w_ctx(struct pblk_rb *rb, unsigned int pos);
+void pblk_rb_flush(struct pblk_rb *rb);
 
 void pblk_rb_sync_l2p(struct pblk_rb *rb);
 unsigned int pblk_rb_read_to_bio(struct pblk_rb *rb, struct nvm_rq *rqd,
 int pblk_setup_w_rec_rq(struct pblk *pblk, struct nvm_rq *rqd,
                        struct pblk_c_ctx *c_ctx);
 void pblk_free_rqd(struct pblk *pblk, struct nvm_rq *rqd, int rw);
-void pblk_flush_writer(struct pblk *pblk);
+void pblk_wait_for_meta(struct pblk *pblk);
 struct ppa_addr pblk_get_lba_map(struct pblk *pblk, sector_t lba);
 void pblk_discard(struct pblk *pblk, struct bio *bio);
 void pblk_log_write_err(struct pblk *pblk, struct nvm_rq *rqd);
                              gfp_t gfp_mask);
 struct pblk_line *pblk_line_get(struct pblk *pblk);
 struct pblk_line *pblk_line_get_first_data(struct pblk *pblk);
-struct pblk_line *pblk_line_replace_data(struct pblk *pblk);
+void pblk_line_replace_data(struct pblk *pblk);
 int pblk_line_recov_alloc(struct pblk *pblk, struct pblk_line *line);
 void pblk_line_recov_close(struct pblk *pblk, struct pblk_line *line);
 struct pblk_line *pblk_line_get_data(struct pblk *pblk);
 void pblk_line_free(struct pblk *pblk, struct pblk_line *line);
 void pblk_line_close_meta(struct pblk *pblk, struct pblk_line *line);
 void pblk_line_close(struct pblk *pblk, struct pblk_line *line);
+void pblk_line_close_meta_sync(struct pblk *pblk);
 void pblk_line_close_ws(struct work_struct *work);
+void pblk_pipeline_stop(struct pblk *pblk);
 void pblk_line_mark_bb(struct work_struct *work);
 void pblk_line_run_ws(struct pblk *pblk, struct pblk_line *line, void *priv,
                      void (*work)(struct work_struct *),
  */
 void pblk_submit_rec(struct work_struct *work);
 struct pblk_line *pblk_recov_l2p(struct pblk *pblk);
-void pblk_recov_pad(struct pblk *pblk);
+int pblk_recov_pad(struct pblk *pblk);
 __le64 *pblk_recov_get_lba_list(struct pblk *pblk, struct line_emeta *emeta);
 int pblk_recov_setup_rq(struct pblk *pblk, struct pblk_c_ctx *c_ctx,
                        struct pblk_rec_ctx *recovery, u64 *comp_bits,
 int pblk_rl_low_thrs(struct pblk_rl *rl);
 unsigned long pblk_rl_nr_free_blks(struct pblk_rl *rl);
 int pblk_rl_user_may_insert(struct pblk_rl *rl, int nr_entries);
+void pblk_rl_inserted(struct pblk_rl *rl, int nr_entries);
 void pblk_rl_user_in(struct pblk_rl *rl, int nr_entries);
 int pblk_rl_gc_may_insert(struct pblk_rl *rl, int nr_entries);
 void pblk_rl_gc_in(struct pblk_rl *rl, int nr_entries);
 int pblk_rl_sysfs_rate_show(struct pblk_rl *rl);
 void pblk_rl_free_lines_inc(struct pblk_rl *rl, struct pblk_line *line);
 void pblk_rl_free_lines_dec(struct pblk_rl *rl, struct pblk_line *line);
+void pblk_rl_set_space_limit(struct pblk_rl *rl, int entries_left);
+int pblk_rl_is_limit(struct pblk_rl *rl);
 
 /*
  * pblk sysfs