]> asedeno.scripts.mit.edu Git - linux.git/blobdiff - fs/io_uring.c
Merge tag 'f2fs-for-5.4' of git://git.kernel.org/pub/scm/linux/kernel/git/jaegeuk/f2fs
[linux.git] / fs / io_uring.c
index be24596e90d757f04ac0fa31e7a39b30689ace1a..0dadbdbead0fbfef8b0f2373756b3254832ada53 100644 (file)
@@ -75,7 +75,7 @@
 
 #include "internal.h"
 
-#define IORING_MAX_ENTRIES     4096
+#define IORING_MAX_ENTRIES     32768
 #define IORING_MAX_FIXED_FILES 1024
 
 struct io_uring {
@@ -167,7 +167,7 @@ struct async_list {
        struct list_head        list;
 
        struct file             *file;
-       off_t                   io_end;
+       off_t                   io_start;
        size_t                  io_len;
 };
 
@@ -203,7 +203,7 @@ struct io_ring_ctx {
        } ____cacheline_aligned_in_smp;
 
        /* IO offload */
-       struct workqueue_struct *sqo_wq;
+       struct workqueue_struct *sqo_wq[2];
        struct task_struct      *sqo_thread;    /* if using sq thread polling */
        struct mm_struct        *sqo_mm;
        wait_queue_head_t       sqo_wait;
@@ -264,6 +264,7 @@ struct io_ring_ctx {
 struct sqe_submit {
        const struct io_uring_sqe       *sqe;
        unsigned short                  index;
+       u32                             sequence;
        bool                            has_user;
        bool                            needs_lock;
        bool                            needs_fixed_file;
@@ -311,6 +312,7 @@ struct io_kiocb {
 #define REQ_F_LINK             64      /* linked sqes */
 #define REQ_F_LINK_DONE                128     /* linked sqes done */
 #define REQ_F_FAIL_LINK                256     /* fail rest of links */
+#define REQ_F_SHADOW_DRAIN     512     /* link-drain shadow req */
        u64                     user_data;
        u32                     result;
        u32                     sequence;
@@ -342,6 +344,7 @@ struct io_submit_state {
 };
 
 static void io_sq_wq_submit_work(struct work_struct *work);
+static void __io_free_req(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
 
@@ -440,6 +443,24 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx)
        }
 }
 
+static inline void io_queue_async_work(struct io_ring_ctx *ctx,
+                                      struct io_kiocb *req)
+{
+       int rw;
+
+       switch (req->submit.sqe->opcode) {
+       case IORING_OP_WRITEV:
+       case IORING_OP_WRITE_FIXED:
+               rw = !(req->rw.ki_flags & IOCB_DIRECT);
+               break;
+       default:
+               rw = 0;
+               break;
+       }
+
+       queue_work(ctx->sqo_wq[rw], &req->work);
+}
+
 static void io_commit_cqring(struct io_ring_ctx *ctx)
 {
        struct io_kiocb *req;
@@ -447,8 +468,13 @@ static void io_commit_cqring(struct io_ring_ctx *ctx)
        __io_commit_cqring(ctx);
 
        while ((req = io_get_deferred_req(ctx)) != NULL) {
+               if (req->flags & REQ_F_SHADOW_DRAIN) {
+                       /* Just for drain, free it. */
+                       __io_free_req(req);
+                       continue;
+               }
                req->flags |= REQ_F_IO_DRAINED;
-               queue_work(ctx->sqo_wq, &req->work);
+               io_queue_async_work(ctx, req);
        }
 }
 
@@ -611,7 +637,7 @@ static void io_req_link_next(struct io_kiocb *req)
 
                nxt->flags |= REQ_F_LINK_DONE;
                INIT_WORK(&nxt->work, io_sq_wq_submit_work);
-               queue_work(req->ctx->sqo_wq, &nxt->work);
+               io_queue_async_work(req->ctx, nxt);
        }
 }
 
@@ -1163,6 +1189,28 @@ static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
        return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
 }
 
+static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb)
+{
+       if (al->file == kiocb->ki_filp) {
+               off_t start, end;
+
+               /*
+                * Allow merging if we're anywhere in the range of the same
+                * page. Generally this happens for sub-page reads or writes,
+                * and it's beneficial to allow the first worker to bring the
+                * page in and the piggy backed work can then work on the
+                * cached page.
+                */
+               start = al->io_start & PAGE_MASK;
+               end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK;
+               if (kiocb->ki_pos >= start && kiocb->ki_pos <= end)
+                       return true;
+       }
+
+       al->file = NULL;
+       return false;
+}
+
 /*
  * Make a note of the last file/offset/direction we punted to async
  * context. We'll use this information to see if we can piggy back a
@@ -1174,9 +1222,8 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
        struct async_list *async_list = &req->ctx->pending_async[rw];
        struct kiocb *kiocb = &req->rw;
        struct file *filp = kiocb->ki_filp;
-       off_t io_end = kiocb->ki_pos + len;
 
-       if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
+       if (io_should_merge(async_list, kiocb)) {
                unsigned long max_bytes;
 
                /* Use 8x RA size as a decent limiter for both reads/writes */
@@ -1189,17 +1236,16 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
                        req->flags |= REQ_F_SEQ_PREV;
                        async_list->io_len += len;
                } else {
-                       io_end = 0;
-                       async_list->io_len = 0;
+                       async_list->file = NULL;
                }
        }
 
        /* New file? Reset state. */
        if (async_list->file != filp) {
-               async_list->io_len = 0;
+               async_list->io_start = kiocb->ki_pos;
+               async_list->io_len = len;
                async_list->file = filp;
        }
-       async_list->io_end = io_end;
 }
 
 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
@@ -1511,7 +1557,7 @@ static void io_poll_remove_one(struct io_kiocb *req)
        WRITE_ONCE(poll->canceled, true);
        if (!list_empty(&poll->wait.entry)) {
                list_del_init(&poll->wait.entry);
-               queue_work(req->ctx->sqo_wq, &req->work);
+               io_queue_async_work(req->ctx, req);
        }
        spin_unlock(&poll->head->lock);
 
@@ -1625,7 +1671,7 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
                io_cqring_ev_posted(ctx);
                io_put_req(req);
        } else {
-               queue_work(ctx->sqo_wq, &req->work);
+               io_queue_async_work(ctx, req);
        }
 
        return 1;
@@ -1968,7 +2014,7 @@ static void io_sq_wq_submit_work(struct work_struct *work)
  */
 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
 {
-       bool ret = false;
+       bool ret;
 
        if (!list)
                return false;
@@ -2014,10 +2060,14 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
        flags = READ_ONCE(s->sqe->flags);
        fd = READ_ONCE(s->sqe->fd);
 
-       if (flags & IOSQE_IO_DRAIN) {
+       if (flags & IOSQE_IO_DRAIN)
                req->flags |= REQ_F_IO_DRAIN;
-               req->sequence = ctx->cached_sq_head - 1;
-       }
+       /*
+        * All io need record the previous position, if LINK vs DARIN,
+        * it can be used to mark the position of the first IO in the
+        * link list.
+        */
+       req->sequence = s->sequence;
 
        if (!io_op_needs_file(s->sqe))
                return 0;
@@ -2039,21 +2089,12 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
        return 0;
 }
 
-static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
-                       struct sqe_submit *s)
+static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
+                       struct sqe_submit *s, bool force_nonblock)
 {
        int ret;
 
-       ret = io_req_defer(ctx, req, s->sqe);
-       if (ret) {
-               if (ret != -EIOCBQUEUED) {
-                       io_free_req(req);
-                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
-               }
-               return 0;
-       }
-
-       ret = __io_submit_sqe(ctx, req, s, true);
+       ret = __io_submit_sqe(ctx, req, s, force_nonblock);
        if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
                struct io_uring_sqe *sqe_copy;
 
@@ -2070,7 +2111,7 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
                                if (list)
                                        atomic_inc(&list->cnt);
                                INIT_WORK(&req->work, io_sq_wq_submit_work);
-                               queue_work(ctx->sqo_wq, &req->work);
+                               io_queue_async_work(ctx, req);
                        }
 
                        /*
@@ -2095,10 +2136,70 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
        return ret;
 }
 
+static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
+                       struct sqe_submit *s, bool force_nonblock)
+{
+       int ret;
+
+       ret = io_req_defer(ctx, req, s->sqe);
+       if (ret) {
+               if (ret != -EIOCBQUEUED) {
+                       io_free_req(req);
+                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
+               }
+               return 0;
+       }
+
+       return __io_queue_sqe(ctx, req, s, force_nonblock);
+}
+
+static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
+                             struct sqe_submit *s, struct io_kiocb *shadow,
+                             bool force_nonblock)
+{
+       int ret;
+       int need_submit = false;
+
+       if (!shadow)
+               return io_queue_sqe(ctx, req, s, force_nonblock);
+
+       /*
+        * Mark the first IO in link list as DRAIN, let all the following
+        * IOs enter the defer list. all IO needs to be completed before link
+        * list.
+        */
+       req->flags |= REQ_F_IO_DRAIN;
+       ret = io_req_defer(ctx, req, s->sqe);
+       if (ret) {
+               if (ret != -EIOCBQUEUED) {
+                       io_free_req(req);
+                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
+                       return 0;
+               }
+       } else {
+               /*
+                * If ret == 0 means that all IOs in front of link io are
+                * running done. let's queue link head.
+                */
+               need_submit = true;
+       }
+
+       /* Insert shadow req to defer_list, blocking next IOs */
+       spin_lock_irq(&ctx->completion_lock);
+       list_add_tail(&shadow->list, &ctx->defer_list);
+       spin_unlock_irq(&ctx->completion_lock);
+
+       if (need_submit)
+               return __io_queue_sqe(ctx, req, s, force_nonblock);
+
+       return 0;
+}
+
 #define SQE_VALID_FLAGS        (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
 
 static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
-                         struct io_submit_state *state, struct io_kiocb **link)
+                         struct io_submit_state *state, struct io_kiocb **link,
+                         bool force_nonblock)
 {
        struct io_uring_sqe *sqe_copy;
        struct io_kiocb *req;
@@ -2151,7 +2252,7 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
                INIT_LIST_HEAD(&req->link_list);
                *link = req;
        } else {
-               io_queue_sqe(ctx, req, s);
+               io_queue_sqe(ctx, req, s, force_nonblock);
        }
 }
 
@@ -2224,6 +2325,7 @@ static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
        if (head < ctx->sq_entries) {
                s->index = head;
                s->sqe = &ctx->sq_sqes[head];
+               s->sequence = ctx->cached_sq_head;
                ctx->cached_sq_head++;
                return true;
        }
@@ -2239,6 +2341,7 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
 {
        struct io_submit_state state, *statep = NULL;
        struct io_kiocb *link = NULL;
+       struct io_kiocb *shadow_req = NULL;
        bool prev_was_link = false;
        int i, submitted = 0;
 
@@ -2253,11 +2356,21 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
                 * that's the end of the chain. Submit the previous link.
                 */
                if (!prev_was_link && link) {
-                       io_queue_sqe(ctx, link, &link->submit);
+                       io_queue_link_head(ctx, link, &link->submit, shadow_req,
+                                               true);
                        link = NULL;
                }
                prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
 
+               if (link && (sqes[i].sqe->flags & IOSQE_IO_DRAIN)) {
+                       if (!shadow_req) {
+                               shadow_req = io_get_req(ctx, NULL);
+                               shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
+                               refcount_dec(&shadow_req->refs);
+                       }
+                       shadow_req->sequence = sqes[i].sequence;
+               }
+
                if (unlikely(mm_fault)) {
                        io_cqring_add_event(ctx, sqes[i].sqe->user_data,
                                                -EFAULT);
@@ -2265,13 +2378,13 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
                        sqes[i].has_user = has_user;
                        sqes[i].needs_lock = true;
                        sqes[i].needs_fixed_file = true;
-                       io_submit_sqe(ctx, &sqes[i], statep, &link);
+                       io_submit_sqe(ctx, &sqes[i], statep, &link, true);
                        submitted++;
                }
        }
 
        if (link)
-               io_queue_sqe(ctx, link, &link->submit);
+               io_queue_link_head(ctx, link, &link->submit, shadow_req, true);
        if (statep)
                io_submit_state_end(&state);
 
@@ -2403,10 +2516,12 @@ static int io_sq_thread(void *data)
        return 0;
 }
 
-static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
+static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
+                         bool block_for_last)
 {
        struct io_submit_state state, *statep = NULL;
        struct io_kiocb *link = NULL;
+       struct io_kiocb *shadow_req = NULL;
        bool prev_was_link = false;
        int i, submit = 0;
 
@@ -2416,6 +2531,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
        }
 
        for (i = 0; i < to_submit; i++) {
+               bool force_nonblock = true;
                struct sqe_submit s;
 
                if (!io_get_sqring(ctx, &s))
@@ -2426,21 +2542,43 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
                 * that's the end of the chain. Submit the previous link.
                 */
                if (!prev_was_link && link) {
-                       io_queue_sqe(ctx, link, &link->submit);
+                       io_queue_link_head(ctx, link, &link->submit, shadow_req,
+                                               force_nonblock);
                        link = NULL;
                }
                prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
 
+               if (link && (s.sqe->flags & IOSQE_IO_DRAIN)) {
+                       if (!shadow_req) {
+                               shadow_req = io_get_req(ctx, NULL);
+                               shadow_req->flags |= (REQ_F_IO_DRAIN | REQ_F_SHADOW_DRAIN);
+                               refcount_dec(&shadow_req->refs);
+                       }
+                       shadow_req->sequence = s.sequence;
+               }
+
                s.has_user = true;
                s.needs_lock = false;
                s.needs_fixed_file = false;
                submit++;
-               io_submit_sqe(ctx, &s, statep, &link);
+
+               /*
+                * The caller will block for events after submit, submit the
+                * last IO non-blocking. This is either the only IO it's
+                * submitting, or it already submitted the previous ones. This
+                * improves performance by avoiding an async punt that we don't
+                * need to do.
+                */
+               if (block_for_last && submit == to_submit)
+                       force_nonblock = false;
+
+               io_submit_sqe(ctx, &s, statep, &link, force_nonblock);
        }
        io_commit_sqring(ctx);
 
        if (link)
-               io_queue_sqe(ctx, link, &link->submit);
+               io_queue_link_head(ctx, link, &link->submit, shadow_req,
+                                       block_for_last);
        if (statep)
                io_submit_state_end(statep);
 
@@ -2528,11 +2666,15 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
 
 static void io_finish_async(struct io_ring_ctx *ctx)
 {
+       int i;
+
        io_sq_thread_stop(ctx);
 
-       if (ctx->sqo_wq) {
-               destroy_workqueue(ctx->sqo_wq);
-               ctx->sqo_wq = NULL;
+       for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) {
+               if (ctx->sqo_wq[i]) {
+                       destroy_workqueue(ctx->sqo_wq[i]);
+                       ctx->sqo_wq[i] = NULL;
+               }
        }
 }
 
@@ -2740,16 +2882,31 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
        }
 
        /* Do QD, or 2 * CPUS, whatever is smallest */
-       ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
+       ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
+                       WQ_UNBOUND | WQ_FREEZABLE,
                        min(ctx->sq_entries - 1, 2 * num_online_cpus()));
-       if (!ctx->sqo_wq) {
+       if (!ctx->sqo_wq[0]) {
+               ret = -ENOMEM;
+               goto err;
+       }
+
+       /*
+        * This is for buffered writes, where we want to limit the parallelism
+        * due to file locking in file systems. As "normal" buffered writes
+        * should parellelize on writeout quite nicely, limit us to having 2
+        * pending. This avoids massive contention on the inode when doing
+        * buffered async writes.
+        */
+       ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
+                                               WQ_UNBOUND | WQ_FREEZABLE, 2);
+       if (!ctx->sqo_wq[1]) {
                ret = -ENOMEM;
                goto err;
        }
 
        return 0;
 err:
-       io_sq_thread_stop(ctx);
+       io_finish_async(ctx);
        mmdrop(ctx->sqo_mm);
        ctx->sqo_mm = NULL;
        return ret;
@@ -3199,19 +3356,27 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
         * Just return the requested submit count, and wake the thread if
         * we were asked to.
         */
+       ret = 0;
        if (ctx->flags & IORING_SETUP_SQPOLL) {
                if (flags & IORING_ENTER_SQ_WAKEUP)
                        wake_up(&ctx->sqo_wait);
                submitted = to_submit;
-               goto out_ctx;
-       }
+       } else if (to_submit) {
+               bool block_for_last = false;
 
-       ret = 0;
-       if (to_submit) {
                to_submit = min(to_submit, ctx->sq_entries);
 
+               /*
+                * Allow last submission to block in a series, IFF the caller
+                * asked to wait for events and we don't currently have
+                * enough. This potentially avoids an async punt.
+                */
+               if (to_submit == min_complete &&
+                   io_cqring_events(ctx->rings) < min_complete)
+                       block_for_last = true;
+
                mutex_lock(&ctx->uring_lock);
-               submitted = io_ring_submit(ctx, to_submit);
+               submitted = io_ring_submit(ctx, to_submit, block_for_last);
                mutex_unlock(&ctx->uring_lock);
        }
        if (flags & IORING_ENTER_GETEVENTS) {
@@ -3226,7 +3391,6 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                }
        }
 
-out_ctx:
        io_ring_drop_ctx_refs(ctx, 1);
 out_fput:
        fdput(f);