]> asedeno.scripts.mit.edu Git - linux.git/blobdiff - drivers/lightnvm/pblk-core.c
lightnvm: pblk: use vmalloc for GC data buffer
[linux.git] / drivers / lightnvm / pblk-core.c
index 6e4b06f841e71db1972b20bca36815ea889b2bc4..74b8d9db05e19767bf7c94c8ddb2794605c25ae4 100644 (file)
@@ -17,7 +17,6 @@
  */
 
 #include "pblk.h"
-#include <linux/time.h>
 
 static void pblk_mark_bb(struct pblk *pblk, struct pblk_line *line,
                         struct ppa_addr *ppa)
@@ -34,7 +33,7 @@ static void pblk_mark_bb(struct pblk *pblk, struct pblk_line *line,
                pr_err("pblk: attempted to erase bb: line:%d, pos:%d\n",
                                                        line->id, pos);
 
-       pblk_line_run_ws(pblk, NULL, ppa, pblk_line_mark_bb);
+       pblk_line_run_ws(pblk, NULL, ppa, pblk_line_mark_bb, pblk->bb_wq);
 }
 
 static void __pblk_end_io_erase(struct pblk *pblk, struct nvm_rq *rqd)
@@ -54,6 +53,8 @@ static void __pblk_end_io_erase(struct pblk *pblk, struct nvm_rq *rqd)
                *ppa = rqd->ppa_addr;
                pblk_mark_bb(pblk, line, ppa);
        }
+
+       atomic_dec(&pblk->inflight_io);
 }
 
 /* Erase completion assumes that only one block is erased at the time */
@@ -65,8 +66,8 @@ static void pblk_end_io_erase(struct nvm_rq *rqd)
        mempool_free(rqd, pblk->g_rq_pool);
 }
 
-static void __pblk_map_invalidate(struct pblk *pblk, struct pblk_line *line,
-                                 u64 paddr)
+void __pblk_map_invalidate(struct pblk *pblk, struct pblk_line *line,
+                          u64 paddr)
 {
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        struct list_head *move_list = NULL;
@@ -129,18 +130,6 @@ void pblk_map_invalidate(struct pblk *pblk, struct ppa_addr ppa)
        __pblk_map_invalidate(pblk, line, paddr);
 }
 
-void pblk_map_pad_invalidate(struct pblk *pblk, struct pblk_line *line,
-                            u64 paddr)
-{
-       __pblk_map_invalidate(pblk, line, paddr);
-
-       pblk_rb_sync_init(&pblk->rwb, NULL);
-       line->left_ssecs--;
-       if (!line->left_ssecs)
-               pblk_line_run_ws(pblk, line, NULL, pblk_line_close_ws);
-       pblk_rb_sync_end(&pblk->rwb, NULL);
-}
-
 static void pblk_invalidate_range(struct pblk *pblk, sector_t slba,
                                  unsigned int nr_secs)
 {
@@ -270,35 +259,25 @@ void pblk_end_io_sync(struct nvm_rq *rqd)
        complete(waiting);
 }
 
-void pblk_flush_writer(struct pblk *pblk)
+void pblk_wait_for_meta(struct pblk *pblk)
 {
-       struct bio *bio;
-       int ret;
-       DECLARE_COMPLETION_ONSTACK(wait);
-
-       bio = bio_alloc(GFP_KERNEL, 1);
-       if (!bio)
-               return;
-
-       bio->bi_iter.bi_sector = 0; /* internal bio */
-       bio_set_op_attrs(bio, REQ_OP_WRITE, REQ_OP_FLUSH);
-       bio->bi_private = &wait;
-       bio->bi_end_io = pblk_end_bio_sync;
+       do {
+               if (!atomic_read(&pblk->inflight_io))
+                       break;
 
-       ret = pblk_write_to_cache(pblk, bio, 0);
-       if (ret == NVM_IO_OK) {
-               if (!wait_for_completion_io_timeout(&wait,
-                               msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
-                       pr_err("pblk: flush cache timed out\n");
-               }
-       } else if (ret != NVM_IO_DONE) {
-               pr_err("pblk: tear down bio failed\n");
-       }
+               schedule();
+       } while (1);
+}
 
-       if (bio->bi_status)
-               pr_err("pblk: flush sync write failed (%u)\n", bio->bi_status);
+static void pblk_flush_writer(struct pblk *pblk)
+{
+       pblk_rb_flush(&pblk->rwb);
+       do {
+               if (!pblk_rb_read_count(&pblk->rwb))
+                       break;
 
-       bio_put(bio);
+               schedule();
+       } while (1);
 }
 
 struct list_head *pblk_line_gc_list(struct pblk *pblk, struct pblk_line *line)
@@ -308,17 +287,19 @@ struct list_head *pblk_line_gc_list(struct pblk *pblk, struct pblk_line *line)
        struct list_head *move_list = NULL;
        int vsc = le32_to_cpu(*line->vsc);
 
+       lockdep_assert_held(&line->lock);
+
        if (!vsc) {
                if (line->gc_group != PBLK_LINEGC_FULL) {
                        line->gc_group = PBLK_LINEGC_FULL;
                        move_list = &l_mg->gc_full_list;
                }
-       } else if (vsc < lm->mid_thrs) {
+       } else if (vsc < lm->high_thrs) {
                if (line->gc_group != PBLK_LINEGC_HIGH) {
                        line->gc_group = PBLK_LINEGC_HIGH;
                        move_list = &l_mg->gc_high_list;
                }
-       } else if (vsc < lm->high_thrs) {
+       } else if (vsc < lm->mid_thrs) {
                if (line->gc_group != PBLK_LINEGC_MID) {
                        line->gc_group = PBLK_LINEGC_MID;
                        move_list = &l_mg->gc_mid_list;
@@ -436,21 +417,23 @@ int pblk_submit_io(struct pblk *pblk, struct nvm_rq *rqd)
                }
        }
 #endif
+
+       atomic_inc(&pblk->inflight_io);
+
        return nvm_submit_io(dev, rqd);
 }
 
 struct bio *pblk_bio_map_addr(struct pblk *pblk, void *data,
                              unsigned int nr_secs, unsigned int len,
-                             gfp_t gfp_mask)
+                             int alloc_type, gfp_t gfp_mask)
 {
        struct nvm_tgt_dev *dev = pblk->dev;
-       struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        void *kaddr = data;
        struct page *page;
        struct bio *bio;
        int i, ret;
 
-       if (l_mg->emeta_alloc_type == PBLK_KMALLOC_META)
+       if (alloc_type == PBLK_KMALLOC_META)
                return bio_map_kern(dev->q, kaddr, len, gfp_mask);
 
        bio = bio_kmalloc(gfp_mask, nr_secs);
@@ -515,6 +498,8 @@ u64 __pblk_alloc_page(struct pblk *pblk, struct pblk_line *line, int nr_secs)
        u64 addr;
        int i;
 
+       lockdep_assert_held(&line->lock);
+
        /* logic error: ppa out-of-bounds. Prevent generating bad address */
        if (line->cur_sec + nr_secs > pblk->lm.sec_per_line) {
                WARN(1, "pblk: page allocation out of bounds\n");
@@ -566,17 +551,17 @@ static int pblk_line_submit_emeta_io(struct pblk *pblk, struct pblk_line *line,
 {
        struct nvm_tgt_dev *dev = pblk->dev;
        struct nvm_geo *geo = &dev->geo;
+       struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        struct pblk_line_meta *lm = &pblk->lm;
+       void *ppa_list, *meta_list;
        struct bio *bio;
        struct nvm_rq rqd;
-       struct ppa_addr *ppa_list;
-       dma_addr_t dma_ppa_list;
+       dma_addr_t dma_ppa_list, dma_meta_list;
        int min = pblk->min_write_pgs;
        int left_ppas = lm->emeta_sec[0];
        int id = line->id;
        int rq_ppas, rq_len;
        int cmd_op, bio_op;
-       int flags;
        int i, j;
        int ret;
        DECLARE_COMPLETION_ONSTACK(wait);
@@ -584,25 +569,28 @@ static int pblk_line_submit_emeta_io(struct pblk *pblk, struct pblk_line *line,
        if (dir == WRITE) {
                bio_op = REQ_OP_WRITE;
                cmd_op = NVM_OP_PWRITE;
-               flags = pblk_set_progr_mode(pblk, WRITE);
        } else if (dir == READ) {
                bio_op = REQ_OP_READ;
                cmd_op = NVM_OP_PREAD;
-               flags = pblk_set_read_mode(pblk);
        } else
                return -EINVAL;
 
-       ppa_list = nvm_dev_dma_alloc(dev->parent, GFP_KERNEL, &dma_ppa_list);
-       if (!ppa_list)
+       meta_list = nvm_dev_dma_alloc(dev->parent, GFP_KERNEL,
+                                                       &dma_meta_list);
+       if (!meta_list)
                return -ENOMEM;
 
+       ppa_list = meta_list + pblk_dma_meta_size;
+       dma_ppa_list = dma_meta_list + pblk_dma_meta_size;
+
 next_rq:
        memset(&rqd, 0, sizeof(struct nvm_rq));
 
        rq_ppas = pblk_calc_secs(pblk, left_ppas, 0);
        rq_len = rq_ppas * geo->sec_size;
 
-       bio = pblk_bio_map_addr(pblk, emeta_buf, rq_ppas, rq_len, GFP_KERNEL);
+       bio = pblk_bio_map_addr(pblk, emeta_buf, rq_ppas, rq_len,
+                                       l_mg->emeta_alloc_type, GFP_KERNEL);
        if (IS_ERR(bio)) {
                ret = PTR_ERR(bio);
                goto free_rqd_dma;
@@ -612,27 +600,38 @@ static int pblk_line_submit_emeta_io(struct pblk *pblk, struct pblk_line *line,
        bio_set_op_attrs(bio, bio_op, 0);
 
        rqd.bio = bio;
-       rqd.opcode = cmd_op;
-       rqd.flags = flags;
-       rqd.nr_ppas = rq_ppas;
+       rqd.meta_list = meta_list;
        rqd.ppa_list = ppa_list;
+       rqd.dma_meta_list = dma_meta_list;
        rqd.dma_ppa_list = dma_ppa_list;
+       rqd.opcode = cmd_op;
+       rqd.nr_ppas = rq_ppas;
        rqd.end_io = pblk_end_io_sync;
        rqd.private = &wait;
 
        if (dir == WRITE) {
+               struct pblk_sec_meta *meta_list = rqd.meta_list;
+
+               rqd.flags = pblk_set_progr_mode(pblk, WRITE);
                for (i = 0; i < rqd.nr_ppas; ) {
                        spin_lock(&line->lock);
                        paddr = __pblk_alloc_page(pblk, line, min);
                        spin_unlock(&line->lock);
-                       for (j = 0; j < min; j++, i++, paddr++)
+                       for (j = 0; j < min; j++, i++, paddr++) {
+                               meta_list[i].lba = cpu_to_le64(ADDR_EMPTY);
                                rqd.ppa_list[i] =
                                        addr_to_gen_ppa(pblk, paddr, id);
+                       }
                }
        } else {
                for (i = 0; i < rqd.nr_ppas; ) {
                        struct ppa_addr ppa = addr_to_gen_ppa(pblk, paddr, id);
                        int pos = pblk_dev_ppa_to_pos(geo, ppa);
+                       int read_type = PBLK_READ_RANDOM;
+
+                       if (pblk_io_aligned(pblk, rq_ppas))
+                               read_type = PBLK_READ_SEQUENTIAL;
+                       rqd.flags = pblk_set_read_mode(pblk, read_type);
 
                        while (test_bit(pos, line->blk_bitmap)) {
                                paddr += min;
@@ -673,9 +672,11 @@ static int pblk_line_submit_emeta_io(struct pblk *pblk, struct pblk_line *line,
                                msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
                pr_err("pblk: emeta I/O timed out\n");
        }
+       atomic_dec(&pblk->inflight_io);
        reinit_completion(&wait);
 
-       bio_put(bio);
+       if (likely(pblk->l_mg.emeta_alloc_type == PBLK_VMALLOC_META))
+               bio_put(bio);
 
        if (rqd.error) {
                if (dir == WRITE)
@@ -689,7 +690,7 @@ static int pblk_line_submit_emeta_io(struct pblk *pblk, struct pblk_line *line,
        if (left_ppas)
                goto next_rq;
 free_rqd_dma:
-       nvm_dev_dma_free(dev->parent, ppa_list, dma_ppa_list);
+       nvm_dev_dma_free(dev->parent, rqd.meta_list, rqd.dma_meta_list);
        return ret;
 }
 
@@ -729,17 +730,20 @@ static int pblk_line_submit_smeta_io(struct pblk *pblk, struct pblk_line *line,
        } else if (dir == READ) {
                bio_op = REQ_OP_READ;
                cmd_op = NVM_OP_PREAD;
-               flags = pblk_set_read_mode(pblk);
+               flags = pblk_set_read_mode(pblk, PBLK_READ_SEQUENTIAL);
        } else
                return -EINVAL;
 
        memset(&rqd, 0, sizeof(struct nvm_rq));
 
-       rqd.ppa_list = nvm_dev_dma_alloc(dev->parent, GFP_KERNEL,
-                                                       &rqd.dma_ppa_list);
-       if (!rqd.ppa_list)
+       rqd.meta_list = nvm_dev_dma_alloc(dev->parent, GFP_KERNEL,
+                                                       &rqd.dma_meta_list);
+       if (!rqd.meta_list)
                return -ENOMEM;
 
+       rqd.ppa_list = rqd.meta_list + pblk_dma_meta_size;
+       rqd.dma_ppa_list = rqd.dma_meta_list + pblk_dma_meta_size;
+
        bio = bio_map_kern(dev->q, line->smeta, lm->smeta_len, GFP_KERNEL);
        if (IS_ERR(bio)) {
                ret = PTR_ERR(bio);
@@ -757,9 +761,15 @@ static int pblk_line_submit_smeta_io(struct pblk *pblk, struct pblk_line *line,
        rqd.private = &wait;
 
        for (i = 0; i < lm->smeta_sec; i++, paddr++) {
+               struct pblk_sec_meta *meta_list = rqd.meta_list;
+
                rqd.ppa_list[i] = addr_to_gen_ppa(pblk, paddr, line->id);
-               if (dir == WRITE)
-                       lba_list[paddr] = cpu_to_le64(ADDR_EMPTY);
+
+               if (dir == WRITE) {
+                       __le64 addr_empty = cpu_to_le64(ADDR_EMPTY);
+
+                       meta_list[i].lba = lba_list[paddr] = addr_empty;
+               }
        }
 
        /*
@@ -778,6 +788,7 @@ static int pblk_line_submit_smeta_io(struct pblk *pblk, struct pblk_line *line,
                                msecs_to_jiffies(PBLK_COMMAND_TIMEOUT_MS))) {
                pr_err("pblk: smeta I/O timed out\n");
        }
+       atomic_dec(&pblk->inflight_io);
 
        if (rqd.error) {
                if (dir == WRITE)
@@ -787,7 +798,7 @@ static int pblk_line_submit_smeta_io(struct pblk *pblk, struct pblk_line *line,
        }
 
 free_ppa_list:
-       nvm_dev_dma_free(dev->parent, rqd.ppa_list, rqd.dma_ppa_list);
+       nvm_dev_dma_free(dev->parent, rqd.meta_list, rqd.dma_meta_list);
 
        return ret;
 }
@@ -819,7 +830,7 @@ static void pblk_setup_e_rq(struct pblk *pblk, struct nvm_rq *rqd,
 static int pblk_blk_erase_sync(struct pblk *pblk, struct ppa_addr ppa)
 {
        struct nvm_rq rqd;
-       int ret;
+       int ret = 0;
        DECLARE_COMPLETION_ONSTACK(wait);
 
        memset(&rqd, 0, sizeof(struct nvm_rq));
@@ -854,14 +865,14 @@ static int pblk_blk_erase_sync(struct pblk *pblk, struct ppa_addr ppa)
        rqd.private = pblk;
        __pblk_end_io_erase(pblk, &rqd);
 
-       return 0;
+       return ret;
 }
 
 int pblk_line_erase(struct pblk *pblk, struct pblk_line *line)
 {
        struct pblk_line_meta *lm = &pblk->lm;
        struct ppa_addr ppa;
-       int bit = -1;
+       int ret, bit = -1;
 
        /* Erase only good blocks, one at a time */
        do {
@@ -880,9 +891,10 @@ int pblk_line_erase(struct pblk *pblk, struct pblk_line *line)
                WARN_ON(test_and_set_bit(bit, line->erase_bitmap));
                spin_unlock(&line->lock);
 
-               if (pblk_blk_erase_sync(pblk, ppa)) {
+               ret = pblk_blk_erase_sync(pblk, ppa);
+               if (ret) {
                        pr_err("pblk: failed to erase line %d\n", line->id);
-                       return -ENOMEM;
+                       return ret;
                }
        } while (1);
 
@@ -895,6 +907,8 @@ static void pblk_line_setup_metadata(struct pblk_line *line,
 {
        int meta_line;
 
+       lockdep_assert_held(&l_mg->free_lock);
+
 retry_meta:
        meta_line = find_first_zero_bit(&l_mg->meta_bitmap, PBLK_DATA_LINES);
        if (meta_line == PBLK_DATA_LINES) {
@@ -1026,7 +1040,6 @@ static int pblk_line_init_bb(struct pblk *pblk, struct pblk_line *line,
        /* Mark smeta metadata sectors as bad sectors */
        bit = find_first_zero_bit(line->blk_bitmap, lm->blk_per_line);
        off = bit * geo->sec_per_pl;
-retry_smeta:
        bitmap_set(line->map_bitmap, off, lm->smeta_sec);
        line->sec_in_line -= lm->smeta_sec;
        line->smeta_ssec = off;
@@ -1034,8 +1047,7 @@ static int pblk_line_init_bb(struct pblk *pblk, struct pblk_line *line,
 
        if (init && pblk_line_submit_smeta_io(pblk, line, off, WRITE)) {
                pr_debug("pblk: line smeta I/O failed. Retry\n");
-               off += geo->sec_per_pl;
-               goto retry_smeta;
+               return 1;
        }
 
        bitmap_copy(line->invalid_bitmap, line->map_bitmap, lm->sec_per_line);
@@ -1057,7 +1069,7 @@ static int pblk_line_init_bb(struct pblk *pblk, struct pblk_line *line,
        line->sec_in_line -= lm->emeta_sec[0];
        line->emeta_ssec = off;
        line->nr_valid_lbas = 0;
-       line->left_ssecs = line->left_msecs = line->sec_in_line;
+       line->left_msecs = line->sec_in_line;
        *line->vsc = cpu_to_le32(line->sec_in_line);
 
        if (lm->sec_per_line - line->sec_in_line !=
@@ -1097,10 +1109,14 @@ static int pblk_line_prepare(struct pblk *pblk, struct pblk_line *line)
 
        spin_lock(&line->lock);
        if (line->state != PBLK_LINESTATE_FREE) {
+               mempool_free(line->invalid_bitmap, pblk->line_meta_pool);
+               mempool_free(line->map_bitmap, pblk->line_meta_pool);
                spin_unlock(&line->lock);
-               WARN(1, "pblk: corrupted line state\n");
-               return -EINTR;
+               WARN(1, "pblk: corrupted line %d, state %d\n",
+                                                       line->id, line->state);
+               return -EAGAIN;
        }
+
        line->state = PBLK_LINESTATE_OPEN;
 
        atomic_set(&line->left_eblks, blk_in_line);
@@ -1156,15 +1172,15 @@ struct pblk_line *pblk_line_get(struct pblk *pblk)
 {
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        struct pblk_line_meta *lm = &pblk->lm;
-       struct pblk_line *line = NULL;
-       int bit;
+       struct pblk_line *line;
+       int ret, bit;
 
        lockdep_assert_held(&l_mg->free_lock);
 
-retry_get:
+retry:
        if (list_empty(&l_mg->free_list)) {
                pr_err("pblk: no free lines\n");
-               goto out;
+               return NULL;
        }
 
        line = list_first_entry(&l_mg->free_list, struct pblk_line, list);
@@ -1180,16 +1196,22 @@ struct pblk_line *pblk_line_get(struct pblk *pblk)
                list_add_tail(&line->list, &l_mg->bad_list);
 
                pr_debug("pblk: line %d is bad\n", line->id);
-               goto retry_get;
+               goto retry;
        }
 
-       if (pblk_line_prepare(pblk, line)) {
-               pr_err("pblk: failed to prepare line %d\n", line->id);
-               list_add(&line->list, &l_mg->free_list);
-               return NULL;
+       ret = pblk_line_prepare(pblk, line);
+       if (ret) {
+               if (ret == -EAGAIN) {
+                       list_add(&line->list, &l_mg->corrupt_list);
+                       goto retry;
+               } else {
+                       pr_err("pblk: failed to prepare line %d\n", line->id);
+                       list_add(&line->list, &l_mg->free_list);
+                       l_mg->nr_free_lines++;
+                       return NULL;
+               }
        }
 
-out:
        return line;
 }
 
@@ -1199,6 +1221,7 @@ static struct pblk_line *pblk_line_retry(struct pblk *pblk,
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        struct pblk_line *retry_line;
 
+retry:
        spin_lock(&l_mg->free_lock);
        retry_line = pblk_line_get(pblk);
        if (!retry_line) {
@@ -1215,18 +1238,21 @@ static struct pblk_line *pblk_line_retry(struct pblk *pblk,
        l_mg->data_line = retry_line;
        spin_unlock(&l_mg->free_lock);
 
-       if (pblk_line_erase(pblk, retry_line)) {
-               spin_lock(&l_mg->free_lock);
-               l_mg->data_line = NULL;
-               spin_unlock(&l_mg->free_lock);
-               return NULL;
-       }
-
        pblk_rl_free_lines_dec(&pblk->rl, retry_line);
 
+       if (pblk_line_erase(pblk, retry_line))
+               goto retry;
+
        return retry_line;
 }
 
+static void pblk_set_space_limit(struct pblk *pblk)
+{
+       struct pblk_rl *rl = &pblk->rl;
+
+       atomic_set(&rl->rb_space, 0);
+}
+
 struct pblk_line *pblk_line_get_first_data(struct pblk *pblk)
 {
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
@@ -1248,20 +1274,31 @@ struct pblk_line *pblk_line_get_first_data(struct pblk *pblk)
 
        /* Allocate next line for preparation */
        l_mg->data_next = pblk_line_get(pblk);
-       if (l_mg->data_next) {
+       if (!l_mg->data_next) {
+               /* If we cannot get a new line, we need to stop the pipeline.
+                * Only allow as many writes in as we can store safely and then
+                * fail gracefully
+                */
+               pblk_set_space_limit(pblk);
+
+               l_mg->data_next = NULL;
+       } else {
                l_mg->data_next->seq_nr = l_mg->d_seq_nr++;
                l_mg->data_next->type = PBLK_LINETYPE_DATA;
                is_next = 1;
        }
        spin_unlock(&l_mg->free_lock);
 
+       if (pblk_line_erase(pblk, line)) {
+               line = pblk_line_retry(pblk, line);
+               if (!line)
+                       return NULL;
+       }
+
        pblk_rl_free_lines_dec(&pblk->rl, line);
        if (is_next)
                pblk_rl_free_lines_dec(&pblk->rl, l_mg->data_next);
 
-       if (pblk_line_erase(pblk, line))
-               return NULL;
-
 retry_setup:
        if (!pblk_line_init_metadata(pblk, line, NULL)) {
                line = pblk_line_retry(pblk, line);
@@ -1282,7 +1319,47 @@ struct pblk_line *pblk_line_get_first_data(struct pblk *pblk)
        return line;
 }
 
-struct pblk_line *pblk_line_replace_data(struct pblk *pblk)
+static void pblk_stop_writes(struct pblk *pblk, struct pblk_line *line)
+{
+       lockdep_assert_held(&pblk->l_mg.free_lock);
+
+       pblk_set_space_limit(pblk);
+       pblk->state = PBLK_STATE_STOPPING;
+}
+
+void pblk_pipeline_stop(struct pblk *pblk)
+{
+       struct pblk_line_mgmt *l_mg = &pblk->l_mg;
+       int ret;
+
+       spin_lock(&l_mg->free_lock);
+       if (pblk->state == PBLK_STATE_RECOVERING ||
+                                       pblk->state == PBLK_STATE_STOPPED) {
+               spin_unlock(&l_mg->free_lock);
+               return;
+       }
+       pblk->state = PBLK_STATE_RECOVERING;
+       spin_unlock(&l_mg->free_lock);
+
+       pblk_flush_writer(pblk);
+       pblk_wait_for_meta(pblk);
+
+       ret = pblk_recov_pad(pblk);
+       if (ret) {
+               pr_err("pblk: could not close data on teardown(%d)\n", ret);
+               return;
+       }
+
+       pblk_line_close_meta_sync(pblk);
+
+       spin_lock(&l_mg->free_lock);
+       pblk->state = PBLK_STATE_STOPPED;
+       l_mg->data_line = NULL;
+       l_mg->data_next = NULL;
+       spin_unlock(&l_mg->free_lock);
+}
+
+void pblk_line_replace_data(struct pblk *pblk)
 {
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
        struct pblk_line *cur, *new;
@@ -1292,42 +1369,38 @@ struct pblk_line *pblk_line_replace_data(struct pblk *pblk)
        cur = l_mg->data_line;
        new = l_mg->data_next;
        if (!new)
-               return NULL;
+               return;
        l_mg->data_line = new;
 
-retry_line:
+       spin_lock(&l_mg->free_lock);
+       if (pblk->state != PBLK_STATE_RUNNING) {
+               l_mg->data_line = NULL;
+               l_mg->data_next = NULL;
+               spin_unlock(&l_mg->free_lock);
+               return;
+       }
+
+       pblk_line_setup_metadata(new, l_mg, &pblk->lm);
+       spin_unlock(&l_mg->free_lock);
+
+retry_erase:
        left_seblks = atomic_read(&new->left_seblks);
        if (left_seblks) {
                /* If line is not fully erased, erase it */
                if (atomic_read(&new->left_eblks)) {
                        if (pblk_line_erase(pblk, new))
-                               return NULL;
+                               return;
                } else {
                        io_schedule();
                }
-               goto retry_line;
-       }
-
-       spin_lock(&l_mg->free_lock);
-       /* Allocate next line for preparation */
-       l_mg->data_next = pblk_line_get(pblk);
-       if (l_mg->data_next) {
-               l_mg->data_next->seq_nr = l_mg->d_seq_nr++;
-               l_mg->data_next->type = PBLK_LINETYPE_DATA;
-               is_next = 1;
+               goto retry_erase;
        }
 
-       pblk_line_setup_metadata(new, l_mg, &pblk->lm);
-       spin_unlock(&l_mg->free_lock);
-
-       if (is_next)
-               pblk_rl_free_lines_dec(&pblk->rl, l_mg->data_next);
-
 retry_setup:
        if (!pblk_line_init_metadata(pblk, new, cur)) {
                new = pblk_line_retry(pblk, new);
                if (!new)
-                       return NULL;
+                       return;
 
                goto retry_setup;
        }
@@ -1335,12 +1408,30 @@ struct pblk_line *pblk_line_replace_data(struct pblk *pblk)
        if (!pblk_line_init_bb(pblk, new, 1)) {
                new = pblk_line_retry(pblk, new);
                if (!new)
-                       return NULL;
+                       return;
 
                goto retry_setup;
        }
 
-       return new;
+       /* Allocate next line for preparation */
+       spin_lock(&l_mg->free_lock);
+       l_mg->data_next = pblk_line_get(pblk);
+       if (!l_mg->data_next) {
+               /* If we cannot get a new line, we need to stop the pipeline.
+                * Only allow as many writes in as we can store safely and then
+                * fail gracefully
+                */
+               pblk_stop_writes(pblk, new);
+               l_mg->data_next = NULL;
+       } else {
+               l_mg->data_next->seq_nr = l_mg->d_seq_nr++;
+               l_mg->data_next->type = PBLK_LINETYPE_DATA;
+               is_next = 1;
+       }
+       spin_unlock(&l_mg->free_lock);
+
+       if (is_next)
+               pblk_rl_free_lines_dec(&pblk->rl, l_mg->data_next);
 }
 
 void pblk_line_free(struct pblk *pblk, struct pblk_line *line)
@@ -1424,6 +1515,46 @@ int pblk_line_is_full(struct pblk_line *line)
        return (line->left_msecs == 0);
 }
 
+void pblk_line_close_meta_sync(struct pblk *pblk)
+{
+       struct pblk_line_mgmt *l_mg = &pblk->l_mg;
+       struct pblk_line_meta *lm = &pblk->lm;
+       struct pblk_line *line, *tline;
+       LIST_HEAD(list);
+
+       spin_lock(&l_mg->close_lock);
+       if (list_empty(&l_mg->emeta_list)) {
+               spin_unlock(&l_mg->close_lock);
+               return;
+       }
+
+       list_cut_position(&list, &l_mg->emeta_list, l_mg->emeta_list.prev);
+       spin_unlock(&l_mg->close_lock);
+
+       list_for_each_entry_safe(line, tline, &list, list) {
+               struct pblk_emeta *emeta = line->emeta;
+
+               while (emeta->mem < lm->emeta_len[0]) {
+                       int ret;
+
+                       ret = pblk_submit_meta_io(pblk, line);
+                       if (ret) {
+                               pr_err("pblk: sync meta line %d failed (%d)\n",
+                                                       line->id, ret);
+                               return;
+                       }
+               }
+       }
+
+       pblk_wait_for_meta(pblk);
+}
+
+static void pblk_line_should_sync_meta(struct pblk *pblk)
+{
+       if (pblk_rl_is_limit(&pblk->rl))
+               pblk_line_close_meta_sync(pblk);
+}
+
 void pblk_line_close(struct pblk *pblk, struct pblk_line *line)
 {
        struct pblk_line_mgmt *l_mg = &pblk->l_mg;
@@ -1452,6 +1583,8 @@ void pblk_line_close(struct pblk *pblk, struct pblk_line *line)
 
        spin_unlock(&line->lock);
        spin_unlock(&l_mg->gc_lock);
+
+       pblk_gc_should_kick(pblk);
 }
 
 void pblk_line_close_meta(struct pblk *pblk, struct pblk_line *line)
@@ -1461,7 +1594,7 @@ void pblk_line_close_meta(struct pblk *pblk, struct pblk_line *line)
        struct pblk_emeta *emeta = line->emeta;
        struct line_emeta *emeta_buf = emeta->buf;
 
-       /* No need for exact vsc value; avoid a big line lock and tak aprox. */
+       /* No need for exact vsc value; avoid a big line lock and take aprox. */
        memcpy(emeta_to_vsc(pblk, emeta_buf), l_mg->vsc_list, lm->vsc_list_len);
        memcpy(emeta_to_bb(emeta_buf), line->blk_bitmap, lm->blk_bitmap_len);
 
@@ -1473,6 +1606,8 @@ void pblk_line_close_meta(struct pblk *pblk, struct pblk_line *line)
        list_add_tail(&line->list, &l_mg->emeta_list);
        spin_unlock(&line->lock);
        spin_unlock(&l_mg->close_lock);
+
+       pblk_line_should_sync_meta(pblk);
 }
 
 void pblk_line_close_ws(struct work_struct *work)
@@ -1512,7 +1647,8 @@ void pblk_line_mark_bb(struct work_struct *work)
 }
 
 void pblk_line_run_ws(struct pblk *pblk, struct pblk_line *line, void *priv,
-                     void (*work)(struct work_struct *))
+                     void (*work)(struct work_struct *),
+                     struct workqueue_struct *wq)
 {
        struct pblk_line_ws *line_ws;
 
@@ -1525,7 +1661,7 @@ void pblk_line_run_ws(struct pblk *pblk, struct pblk_line *line, void *priv,
        line_ws->priv = priv;
 
        INIT_WORK(&line_ws->ws, work);
-       queue_work(pblk->kw_wq, &line_ws->ws);
+       queue_work(wq, &line_ws->ws);
 }
 
 void pblk_down_rq(struct pblk *pblk, struct ppa_addr *ppa_list, int nr_ppas,