]> asedeno.scripts.mit.edu Git - linux.git/blobdiff - fs/io_uring.c
Merge branch 'pm-cpufreq'
[linux.git] / fs / io_uring.c
index e2a66e12fbc634f1eb913ef80f504dbd470edd69..cfb48bd088e12a0bbd8847e2b78e7ba14cb26fd8 100644 (file)
@@ -202,7 +202,7 @@ struct async_list {
 
        struct file             *file;
        off_t                   io_end;
-       size_t                  io_pages;
+       size_t                  io_len;
 };
 
 struct io_ring_ctx {
@@ -333,7 +333,8 @@ struct io_kiocb {
 #define REQ_F_IO_DRAIN         16      /* drain existing IO first */
 #define REQ_F_IO_DRAINED       32      /* drain done */
 #define REQ_F_LINK             64      /* linked sqes */
-#define REQ_F_FAIL_LINK                128     /* fail rest of links */
+#define REQ_F_LINK_DONE                128     /* linked sqes done */
+#define REQ_F_FAIL_LINK                256     /* fail rest of links */
        u64                     user_data;
        u32                     result;
        u32                     sequence;
@@ -429,7 +430,7 @@ static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
        if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
                return false;
 
-       return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
+       return req->sequence != ctx->cached_cq_tail + ctx->sq_ring->dropped;
 }
 
 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
@@ -632,6 +633,7 @@ static void io_req_link_next(struct io_kiocb *req)
                        nxt->flags |= REQ_F_LINK;
                }
 
+               nxt->flags |= REQ_F_LINK_DONE;
                INIT_WORK(&nxt->work, io_sq_wq_submit_work);
                queue_work(req->ctx->sqo_wq, &nxt->work);
        }
@@ -677,6 +679,13 @@ static void io_put_req(struct io_kiocb *req)
                io_free_req(req);
 }
 
+static unsigned io_cqring_events(struct io_cq_ring *ring)
+{
+       /* See comment at the top of this file */
+       smp_rmb();
+       return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
+}
+
 /*
  * Find and free completed poll iocbs
  */
@@ -769,7 +778,7 @@ static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
                                long min)
 {
-       while (!list_empty(&ctx->poll_list)) {
+       while (!list_empty(&ctx->poll_list) && !need_resched()) {
                int ret;
 
                ret = io_do_iopoll(ctx, nr_events, min);
@@ -796,6 +805,12 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
                unsigned int nr_events = 0;
 
                io_iopoll_getevents(ctx, &nr_events, 1);
+
+               /*
+                * Ensure we allow local-to-the-cpu processing to take place,
+                * in this case we need to ensure that we reap all events.
+                */
+               cond_resched();
        }
        mutex_unlock(&ctx->uring_lock);
 }
@@ -803,11 +818,42 @@ static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
                           long min)
 {
-       int ret = 0;
+       int iters, ret = 0;
+
+       /*
+        * We disallow the app entering submit/complete with polling, but we
+        * still need to lock the ring to prevent racing with polled issue
+        * that got punted to a workqueue.
+        */
+       mutex_lock(&ctx->uring_lock);
 
+       iters = 0;
        do {
                int tmin = 0;
 
+               /*
+                * Don't enter poll loop if we already have events pending.
+                * If we do, we can potentially be spinning for commands that
+                * already triggered a CQE (eg in error).
+                */
+               if (io_cqring_events(ctx->cq_ring))
+                       break;
+
+               /*
+                * If a submit got punted to a workqueue, we can have the
+                * application entering polling for a command before it gets
+                * issued. That app will hold the uring_lock for the duration
+                * of the poll right here, so we need to take a breather every
+                * now and then to ensure that the issue has a chance to add
+                * the poll to the issued list. Otherwise we can spin here
+                * forever, while the workqueue is stuck trying to acquire the
+                * very same mutex.
+                */
+               if (!(++iters & 7)) {
+                       mutex_unlock(&ctx->uring_lock);
+                       mutex_lock(&ctx->uring_lock);
+               }
+
                if (*nr_events < min)
                        tmin = min - *nr_events;
 
@@ -817,6 +863,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
                ret = 0;
        } while (min && !*nr_events && !need_resched());
 
+       mutex_unlock(&ctx->uring_lock);
        return ret;
 }
 
@@ -1064,8 +1111,42 @@ static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
         */
        offset = buf_addr - imu->ubuf;
        iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
-       if (offset)
-               iov_iter_advance(iter, offset);
+
+       if (offset) {
+               /*
+                * Don't use iov_iter_advance() here, as it's really slow for
+                * using the latter parts of a big fixed buffer - it iterates
+                * over each segment manually. We can cheat a bit here, because
+                * we know that:
+                *
+                * 1) it's a BVEC iter, we set it up
+                * 2) all bvecs are PAGE_SIZE in size, except potentially the
+                *    first and last bvec
+                *
+                * So just find our index, and adjust the iterator afterwards.
+                * If the offset is within the first bvec (or the whole first
+                * bvec, just use iov_iter_advance(). This makes it easier
+                * since we can just skip the first segment, which may not
+                * be PAGE_SIZE aligned.
+                */
+               const struct bio_vec *bvec = imu->bvec;
+
+               if (offset <= bvec->bv_len) {
+                       iov_iter_advance(iter, offset);
+               } else {
+                       unsigned long seg_skip;
+
+                       /* skip first vec */
+                       offset -= bvec->bv_len;
+                       seg_skip = 1 + (offset >> PAGE_SHIFT);
+
+                       iter->bvec = bvec + seg_skip;
+                       iter->nr_segs -= seg_skip;
+                       iter->count -= bvec->bv_len + offset;
+                       iter->iov_offset = offset & ~PAGE_MASK;
+               }
+       }
+
        return 0;
 }
 
@@ -1120,28 +1201,26 @@ static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
        off_t io_end = kiocb->ki_pos + len;
 
        if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
-               unsigned long max_pages;
+               unsigned long max_bytes;
 
                /* Use 8x RA size as a decent limiter for both reads/writes */
-               max_pages = filp->f_ra.ra_pages;
-               if (!max_pages)
-                       max_pages = VM_READAHEAD_PAGES;
-               max_pages *= 8;
-
-               /* If max pages are exceeded, reset the state */
-               len >>= PAGE_SHIFT;
-               if (async_list->io_pages + len <= max_pages) {
+               max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3);
+               if (!max_bytes)
+                       max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3);
+
+               /* If max len are exceeded, reset the state */
+               if (async_list->io_len + len <= max_bytes) {
                        req->flags |= REQ_F_SEQ_PREV;
-                       async_list->io_pages += len;
+                       async_list->io_len += len;
                } else {
                        io_end = 0;
-                       async_list->io_pages = 0;
+                       async_list->io_len = 0;
                }
        }
 
        /* New file? Reset state. */
        if (async_list->file != filp) {
-               async_list->io_pages = 0;
+               async_list->io_len = 0;
                async_list->file = filp;
        }
        async_list->io_end = io_end;
@@ -1630,6 +1709,8 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
        INIT_LIST_HEAD(&poll->wait.entry);
        init_waitqueue_func_entry(&poll->wait, io_poll_wake);
 
+       INIT_LIST_HEAD(&req->list);
+
        mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
 
        spin_lock_irq(&ctx->completion_lock);
@@ -1800,6 +1881,7 @@ static void io_sq_wq_submit_work(struct work_struct *work)
        do {
                struct sqe_submit *s = &req->submit;
                const struct io_uring_sqe *sqe = s->sqe;
+               unsigned int flags = req->flags;
 
                /* Ensure we clear previously set non-block flag */
                req->rw.ki_flags &= ~IOCB_NOWAIT;
@@ -1844,6 +1926,10 @@ static void io_sq_wq_submit_work(struct work_struct *work)
                /* async context always use a copy of the sqe */
                kfree(sqe);
 
+               /* req from defer and link list needn't decrease async cnt */
+               if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE))
+                       goto out;
+
                if (!async_list)
                        break;
                if (!list_empty(&req_list)) {
@@ -1891,6 +1977,7 @@ static void io_sq_wq_submit_work(struct work_struct *work)
                }
        }
 
+out:
        if (cur_mm) {
                set_fs(old_fs);
                unuse_mm(cur_mm);
@@ -1917,6 +2004,10 @@ static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
        ret = true;
        spin_lock(&list->lock);
        list_add_tail(&req->list, &list->list);
+       /*
+        * Ensure we see a simultaneous modification from io_sq_wq_submit_work()
+        */
+       smp_mb();
        if (!atomic_read(&list->cnt)) {
                list_del_init(&req->list);
                ret = false;
@@ -1977,6 +2068,15 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 {
        int ret;
 
+       ret = io_req_defer(ctx, req, s->sqe);
+       if (ret) {
+               if (ret != -EIOCBQUEUED) {
+                       io_free_req(req);
+                       io_cqring_add_event(ctx, s->sqe->user_data, ret);
+               }
+               return 0;
+       }
+
        ret = __io_submit_sqe(ctx, req, s, true);
        if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
                struct io_uring_sqe *sqe_copy;
@@ -2049,13 +2149,6 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
                return;
        }
 
-       ret = io_req_defer(ctx, req, s->sqe);
-       if (ret) {
-               if (ret != -EIOCBQUEUED)
-                       goto err_req;
-               return;
-       }
-
        /*
         * If we already have a head request, queue this one for async
         * submittal once the head completes. If we don't have a head but
@@ -2232,15 +2325,7 @@ static int io_sq_thread(void *data)
                        unsigned nr_events = 0;
 
                        if (ctx->flags & IORING_SETUP_IOPOLL) {
-                               /*
-                                * We disallow the app entering submit/complete
-                                * with polling, but we still need to lock the
-                                * ring to prevent racing with polled issue
-                                * that got punted to a workqueue.
-                                */
-                               mutex_lock(&ctx->uring_lock);
                                io_iopoll_check(ctx, &nr_events, 0);
-                               mutex_unlock(&ctx->uring_lock);
                        } else {
                                /*
                                 * Normal IO, just pretend everything completed.
@@ -2385,13 +2470,6 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
        return submit;
 }
 
-static unsigned io_cqring_events(struct io_cq_ring *ring)
-{
-       /* See comment at the top of this file */
-       smp_rmb();
-       return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
-}
-
 /*
  * Wait until events become available, if we don't already have some. The
  * application must reap them itself, as they reside on the shared cq ring.
@@ -3142,9 +3220,7 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
                min_complete = min(min_complete, ctx->cq_entries);
 
                if (ctx->flags & IORING_SETUP_IOPOLL) {
-                       mutex_lock(&ctx->uring_lock);
                        ret = io_iopoll_check(ctx, &nr_events, min_complete);
-                       mutex_unlock(&ctx->uring_lock);
                } else {
                        ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
                }