]> asedeno.scripts.mit.edu Git - linux.git/blobdiff - fs/io_uring.c
Merge tag 'f2fs-for-5.4' of git://git.kernel.org/pub/scm/linux/kernel/git/jaegeuk/f2fs
[linux.git] / fs / io_uring.c
index eff29d705a2613a6f3675ed01465cf8d12d7e2cf..0dadbdbead0fbfef8b0f2373756b3254832ada53 100644 (file)
@@ -75,7 +75,7 @@
 
 #include "internal.h"
 
-#define IORING_MAX_ENTRIES     4096
+#define IORING_MAX_ENTRIES     32768
 #define IORING_MAX_FIXED_FILES 1024
 
 struct io_uring {
@@ -167,7 +167,7 @@ struct async_list {
        struct list_head        list;
 
        struct file             *file;
-       off_t                   io_end;
+       off_t                   io_start;
        size_t                  io_len;
 };
 
@@ -203,7 +203,7 @@ struct io_ring_ctx {
        } ____cacheline_aligned_in_smp;
 
        /* IO offload */
-       struct workqueue_struct *sqo_wq;
+       struct workqueue_struct *sqo_wq[2];
        struct task_struct      *sqo_thread;    /* if using sq thread polling */
        struct mm_struct        *sqo_mm;
        wait_queue_head_t       sqo_wait;
@@ -443,6 +443,24 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx)
        }
 }
 
+static inline void io_queue_async_work(struct io_ring_ctx *ctx,
+                                      struct io_kiocb *req)
+{
+       int rw;
+
+       switch (req->submit.sqe->opcode) {
+       case IORING_OP_WRITEV:
+       case IORING_OP_WRITE_FIXED:
+               rw = !(req->rw.ki_flags & IOCB_DIRECT);
+               break;
+       default:
+               rw = 0;
+               break;
+       }
+
+       queue_work(ctx->sqo_wq[rw], &req->work);
+}
+
 static void io_commit_cqring(struct io_ring_ctx *ctx)
 {
        struct io_kiocb *req;
@@ -456,7 +474,7 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)
                        continue;
                }
                req->flags |= REQ_F_IO_DRAINED;
-               queue_work(ctx->sqo_wq, &req->work);
+               io_queue_async_work(ctx, req);
        }
 }
 
@@ -619,7 +637,7 @@ static void io_req_link_next(struct io_kiocb *req)
 
                nxt->flags |= REQ_F_LINK_DONE;
                INIT_WORK(&nxt->work, io_sq_wq_submit_work);
-               queue_work(req->ctx->sqo_wq, &nxt->work);
+               io_queue_async_work(req->ctx, nxt);
        }
 }
 
@@ -1171,6 +1189,28 @@ static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
        return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
 }
 
+static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb)
+{
+       if (al->file == kiocb->ki_filp) {
+               off_t start, end;
+
+               /*
+                * Allow merging if we're anywhere in the range of the same
+                * page. Generally this happens for sub-page reads or writes,
+                * and it's beneficial to allow the first worker to bring the
+                * page in and the piggy backed work can then work on the
+                * cached page.
+                */
+               start = al->io_start & PAGE_MASK;
+               end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK;
+               if (kiocb->ki_pos >= start && kiocb->ki_pos <= end)
+                       return true;
+       }
+
+       al->file = NULL;
+       return false;
+}
+
 /*
  * Make a note of the last file/offset/direction we punted to async
  * context. We'll use this information to see if we can piggy back a
@@ -1182,9 +1222,8 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
        struct async_list *async_list = &req->ctx->pending_async[rw];
        struct kiocb *kiocb = &req->rw;
        struct file *filp = kiocb->ki_filp;
-       off_t io_end = kiocb->ki_pos + len;
 
-       if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
+       if (io_should_merge(async_list, kiocb)) {
                unsigned long max_bytes;
 
                /* Use 8x RA size as a decent limiter for both reads/writes */
@@ -1197,17 +1236,16 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
                        req->flags |= REQ_F_SEQ_PREV;
                        async_list->io_len += len;
                } else {
-                       io_end = 0;
-                       async_list->io_len = 0;
+                       async_list->file = NULL;
                }
        }
 
        /* New file? Reset state. */
        if (async_list->file != filp) {
-               async_list->io_len = 0;
+               async_list->io_start = kiocb->ki_pos;
+               async_list->io_len = len;
                async_list->file = filp;
        }
-       async_list->io_end = io_end;
 }
 
 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
@@ -1519,7 +1557,7 @@ static void io_poll_remove_one(struct io_kiocb *req)
        WRITE_ONCE(poll->canceled, true);
        if (!list_empty(&poll->wait.entry)) {
                list_del_init(&poll->wait.entry);
-               queue_work(req->ctx->sqo_wq, &req->work);
+               io_queue_async_work(req->ctx, req);
        }
        spin_unlock(&poll->head->lock);
 
@@ -1633,7 +1671,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
                io_cqring_ev_posted(ctx);
                io_put_req(req);
        } else {
-               queue_work(ctx->sqo_wq, &req->work);
+               io_queue_async_work(ctx, req);
        }
 
        return 1;
@@ -1976,7 +2014,7 @@ static void io_sq_wq_submit_work(struct work_struct *work)
  */
 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
 {
-       bool ret = false;
+       bool ret;
 
        if (!list)
                return false;
@@ -2052,11 +2090,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
 }
 
 static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
-                       struct sqe_submit *s)
+                       struct sqe_submit *s, bool force_nonblock)
 {
        int ret;
 
-       ret = __io_submit_sqe(ctx, req, s, true);
+       ret = __io_submit_sqe(ctx, req, s, force_nonblock);
        if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
                struct io_uring_sqe *sqe_copy;
 
@@ -2073,7 +2111,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
                                if (list)
                                        atomic_inc(&list->cnt);
                                INIT_WORK(&req->work, io_sq_wq_submit_work);
-                               queue_work(ctx->sqo_wq, &req->work);
+                               io_queue_async_work(ctx, req);
                        }
 
                        /*
@@ -2099,7 +2137,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 }
 
 static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
-                       struct sqe_submit *s)
+                       struct sqe_submit *s, bool force_nonblock)
 {
        int ret;
 
@@ -2112,17 +2150,18 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
                return 0;
        }
 
-       return __io_queue_sqe(ctx, req, s);
+       return __io_queue_sqe(ctx, req, s, force_nonblock);
 }
 
 static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
-                             struct sqe_submit *s, struct io_kiocb *shadow)
+                             struct sqe_submit *s, struct io_kiocb *shadow,
+                             bool force_nonblock)
 {
        int ret;
        int need_submit = false;
 
        if (!shadow)
-               return io_queue_sqe(ctx, req, s);
+               return io_queue_sqe(ctx, req, s, force_nonblock);
 
        /*
         * Mark the first IO in link list as DRAIN, let all the following
@@ -2151,7 +2190,7 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
        spin_unlock_irq(&ctx->completion_lock);
 
        if (need_submit)
-               return __io_queue_sqe(ctx, req, s);
+               return __io_queue_sqe(ctx, req, s, force_nonblock);
 
        return 0;
 }
@@ -2159,7 +2198,8 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
 #define SQE_VALID_FLAGS        (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
 
 static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
-                         struct io_submit_state *state, struct io_kiocb **link)
+                         struct io_submit_state *state, struct io_kiocb **link,
+                         bool force_nonblock)
 {
        struct io_uring_sqe *sqe_copy;
        struct io_kiocb *req;
@@ -2212,7 +2252,7 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
                INIT_LIST_HEAD(&req->link_list);
                *link = req;
        } else {
-               io_queue_sqe(ctx, req, s);
+               io_queue_sqe(ctx, req, s, force_nonblock);
        }
 }
 
@@ -2316,7 +2356,8 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
                 * that's the end of the chain. Submit the previous link.
                 */
                if (!prev_was_link && link) {
-                       io_queue_link_head(ctx, link, &link->submit, shadow_req);
+                       io_queue_link_head(ctx, link, &link->submit, shadow_req,
+                                               true);
                        link = NULL;
                }
                prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
@@ -2337,13 +2378,13 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
                        sqes[i].has_user = has_user;
                        sqes[i].needs_lock = true;
                        sqes[i].needs_fixed_file = true;
-                       io_submit_sqe(ctx, &sqes[i], statep, &link);
+                       io_submit_sqe(ctx, &sqes[i], statep, &link, true);
                        submitted++;
                }
        }
 
        if (link)
-               io_queue_link_head(ctx, link, &link->submit, shadow_req);
+               io_queue_link_head(ctx, link, &link->submit, shadow_req, true);
        if (statep)
                io_submit_state_end(&state);
 
@@ -2475,7 +2516,8 @@ static int io_sq_thread(void *data)
        return 0;
 }
 
-static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
+static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
+                         bool block_for_last)
 {
        struct io_submit_state state, *statep = NULL;
        struct io_kiocb *link = NULL;
@@ -2489,6 +2531,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
        }
 
        for (i = 0; i < to_submit; i++) {
+               bool force_nonblock = true;
                struct sqe_submit s;
 
                if (!io_get_sqring(ctx, &s))
@@ -2499,7 +2542,8 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
                 * that's the end of the chain. Submit the previous link.
                 */
                if (!prev_was_link && link) {
-                       io_queue_link_head(ctx, link, &link->submit, shadow_req);
+                       io_queue_link_head(ctx, link, &link->submit, shadow_req,
+                                               force_nonblock);
                        link = NULL;
                }
                prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
@@ -2517,12 +2561,24 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
                s.needs_lock = false;
                s.needs_fixed_file = false;
                submit++;
-               io_submit_sqe(ctx, &s, statep, &link);
+
+               /*
+                * The caller will block for events after submit, submit the
+                * last IO non-blocking. This is either the only IO it's
+                * submitting, or it already submitted the previous ones. This
+                * improves performance by avoiding an async punt that we don't
+                * need to do.
+                */
+               if (block_for_last && submit == to_submit)
+                       force_nonblock = false;
+
+               io_submit_sqe(ctx, &s, statep, &link, force_nonblock);
        }
        io_commit_sqring(ctx);
 
        if (link)
-               io_queue_link_head(ctx, link, &link->submit, shadow_req);
+               io_queue_link_head(ctx, link, &link->submit, shadow_req,
+                                       block_for_last);
        if (statep)
                io_submit_state_end(statep);
 
@@ -2610,11 +2666,15 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
 
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
+       int i;
+
        io_sq_thread_stop(ctx);
 
-       if (ctx->sqo_wq) {
-               destroy_workqueue(ctx->sqo_wq);
-               ctx->sqo_wq = NULL;
+       for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) {
+               if (ctx->sqo_wq[i]) {
+                       destroy_workqueue(ctx->sqo_wq[i]);
+                       ctx->sqo_wq[i] = NULL;
+               }
        }
 }
 
@@ -2822,16 +2882,31 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
        }
 
        /* Do QD, or 2 * CPUS, whatever is smallest */
-       ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
+       ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
+                       WQ_UNBOUND | WQ_FREEZABLE,
                        min(ctx->sq_entries - 1, 2 * num_online_cpus()));
-       if (!ctx->sqo_wq) {
+       if (!ctx->sqo_wq[0]) {
+               ret = -ENOMEM;
+               goto err;
+       }
+
+       /*
+        * This is for buffered writes, where we want to limit the parallelism
+        * due to file locking in file systems. As "normal" buffered writes
+        * should parellelize on writeout quite nicely, limit us to having 2
+        * pending. This avoids massive contention on the inode when doing
+        * buffered async writes.
+        */
+       ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
+                                               WQ_UNBOUND | WQ_FREEZABLE, 2);
+       if (!ctx->sqo_wq[1]) {
                ret = -ENOMEM;
                goto err;
        }
 
        return 0;
 err:
-       io_sq_thread_stop(ctx);
+       io_finish_async(ctx);
        mmdrop(ctx->sqo_mm);
        ctx->sqo_mm = NULL;
        return ret;
@@ -3281,19 +3356,27 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
         * Just return the requested submit count, and wake the thread if
         * we were asked to.
         */
+       ret = 0;
        if (ctx->flags & IORING_SETUP_SQPOLL) {
                if (flags & IORING_ENTER_SQ_WAKEUP)
                        wake_up(&ctx->sqo_wait);
                submitted = to_submit;
-               goto out_ctx;
-       }
+       } else if (to_submit) {
+               bool block_for_last = false;
 
-       ret = 0;
-       if (to_submit) {
                to_submit = min(to_submit, ctx->sq_entries);
 
+               /*
+                * Allow last submission to block in a series, IFF the caller
+                * asked to wait for events and we don't currently have
+                * enough. This potentially avoids an async punt.
+                */
+               if (to_submit == min_complete &&
+                   io_cqring_events(ctx->rings) < min_complete)
+                       block_for_last = true;
+
                mutex_lock(&ctx->uring_lock);
-               submitted = io_ring_submit(ctx, to_submit);
+               submitted = io_ring_submit(ctx, to_submit, block_for_last);
                mutex_unlock(&ctx->uring_lock);
        }
        if (flags & IORING_ENTER_GETEVENTS) {
@@ -3308,7 +3391,6 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                }
        }
 
-out_ctx:
        io_ring_drop_ctx_refs(ctx, 1);
 out_fput:
        fdput(f);