]> asedeno.scripts.mit.edu Git - linux.git/blob - fs/io_uring.c
io_uring: add submission polling
[linux.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side. When the application reads the CQ ring
8  * tail, it must use an appropriate smp_rmb() to order with the smp_wmb()
9  * the kernel uses after writing the tail. Failure to do so could cause a
10  * delay in when the application notices that completion events available.
11  * This isn't a fatal condition. Likewise, the application must use an
12  * appropriate smp_wmb() both before writing the SQ tail, and after writing
13  * the SQ tail. The first one orders the sqe writes with the tail write, and
14  * the latter is paired with the smp_rmb() the kernel will issue before
15  * reading the SQ tail on submission.
16  *
17  * Also see the examples in the liburing library:
18  *
19  *      git://git.kernel.dk/liburing
20  *
21  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
22  * from data shared between the kernel and application. This is done both
23  * for ordering purposes, but also to ensure that once a value is loaded from
24  * data that the application could potentially modify, it remains stable.
25  *
26  * Copyright (C) 2018-2019 Jens Axboe
27  * Copyright (c) 2018-2019 Christoph Hellwig
28  */
29 #include <linux/kernel.h>
30 #include <linux/init.h>
31 #include <linux/errno.h>
32 #include <linux/syscalls.h>
33 #include <linux/compat.h>
34 #include <linux/refcount.h>
35 #include <linux/uio.h>
36
37 #include <linux/sched/signal.h>
38 #include <linux/fs.h>
39 #include <linux/file.h>
40 #include <linux/fdtable.h>
41 #include <linux/mm.h>
42 #include <linux/mman.h>
43 #include <linux/mmu_context.h>
44 #include <linux/percpu.h>
45 #include <linux/slab.h>
46 #include <linux/workqueue.h>
47 #include <linux/kthread.h>
48 #include <linux/blkdev.h>
49 #include <linux/bvec.h>
50 #include <linux/net.h>
51 #include <net/sock.h>
52 #include <net/af_unix.h>
53 #include <net/scm.h>
54 #include <linux/anon_inodes.h>
55 #include <linux/sched/mm.h>
56 #include <linux/uaccess.h>
57 #include <linux/nospec.h>
58 #include <linux/sizes.h>
59 #include <linux/hugetlb.h>
60
61 #include <uapi/linux/io_uring.h>
62
63 #include "internal.h"
64
65 #define IORING_MAX_ENTRIES      4096
66 #define IORING_MAX_FIXED_FILES  1024
67
68 struct io_uring {
69         u32 head ____cacheline_aligned_in_smp;
70         u32 tail ____cacheline_aligned_in_smp;
71 };
72
73 struct io_sq_ring {
74         struct io_uring         r;
75         u32                     ring_mask;
76         u32                     ring_entries;
77         u32                     dropped;
78         u32                     flags;
79         u32                     array[];
80 };
81
82 struct io_cq_ring {
83         struct io_uring         r;
84         u32                     ring_mask;
85         u32                     ring_entries;
86         u32                     overflow;
87         struct io_uring_cqe     cqes[];
88 };
89
90 struct io_mapped_ubuf {
91         u64             ubuf;
92         size_t          len;
93         struct          bio_vec *bvec;
94         unsigned int    nr_bvecs;
95 };
96
97 struct io_ring_ctx {
98         struct {
99                 struct percpu_ref       refs;
100         } ____cacheline_aligned_in_smp;
101
102         struct {
103                 unsigned int            flags;
104                 bool                    compat;
105                 bool                    account_mem;
106
107                 /* SQ ring */
108                 struct io_sq_ring       *sq_ring;
109                 unsigned                cached_sq_head;
110                 unsigned                sq_entries;
111                 unsigned                sq_mask;
112                 unsigned                sq_thread_idle;
113                 struct io_uring_sqe     *sq_sqes;
114         } ____cacheline_aligned_in_smp;
115
116         /* IO offload */
117         struct workqueue_struct *sqo_wq;
118         struct task_struct      *sqo_thread;    /* if using sq thread polling */
119         struct mm_struct        *sqo_mm;
120         wait_queue_head_t       sqo_wait;
121         unsigned                sqo_stop;
122
123         struct {
124                 /* CQ ring */
125                 struct io_cq_ring       *cq_ring;
126                 unsigned                cached_cq_tail;
127                 unsigned                cq_entries;
128                 unsigned                cq_mask;
129                 struct wait_queue_head  cq_wait;
130                 struct fasync_struct    *cq_fasync;
131         } ____cacheline_aligned_in_smp;
132
133         /*
134          * If used, fixed file set. Writers must ensure that ->refs is dead,
135          * readers must ensure that ->refs is alive as long as the file* is
136          * used. Only updated through io_uring_register(2).
137          */
138         struct file             **user_files;
139         unsigned                nr_user_files;
140
141         /* if used, fixed mapped user buffers */
142         unsigned                nr_user_bufs;
143         struct io_mapped_ubuf   *user_bufs;
144
145         struct user_struct      *user;
146
147         struct completion       ctx_done;
148
149         struct {
150                 struct mutex            uring_lock;
151                 wait_queue_head_t       wait;
152         } ____cacheline_aligned_in_smp;
153
154         struct {
155                 spinlock_t              completion_lock;
156                 bool                    poll_multi_file;
157                 /*
158                  * ->poll_list is protected by the ctx->uring_lock for
159                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
160                  * For SQPOLL, only the single threaded io_sq_thread() will
161                  * manipulate the list, hence no extra locking is needed there.
162                  */
163                 struct list_head        poll_list;
164         } ____cacheline_aligned_in_smp;
165
166 #if defined(CONFIG_UNIX)
167         struct socket           *ring_sock;
168 #endif
169 };
170
171 struct sqe_submit {
172         const struct io_uring_sqe       *sqe;
173         unsigned short                  index;
174         bool                            has_user;
175         bool                            needs_lock;
176         bool                            needs_fixed_file;
177 };
178
179 struct io_kiocb {
180         struct kiocb            rw;
181
182         struct sqe_submit       submit;
183
184         struct io_ring_ctx      *ctx;
185         struct list_head        list;
186         unsigned int            flags;
187 #define REQ_F_FORCE_NONBLOCK    1       /* inline submission attempt */
188 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
189 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
190         u64                     user_data;
191         u64                     error;
192
193         struct work_struct      work;
194 };
195
196 #define IO_PLUG_THRESHOLD               2
197 #define IO_IOPOLL_BATCH                 8
198
199 struct io_submit_state {
200         struct blk_plug         plug;
201
202         /*
203          * io_kiocb alloc cache
204          */
205         void                    *reqs[IO_IOPOLL_BATCH];
206         unsigned                int free_reqs;
207         unsigned                int cur_req;
208
209         /*
210          * File reference cache
211          */
212         struct file             *file;
213         unsigned int            fd;
214         unsigned int            has_refs;
215         unsigned int            used_refs;
216         unsigned int            ios_left;
217 };
218
219 static struct kmem_cache *req_cachep;
220
221 static const struct file_operations io_uring_fops;
222
223 struct sock *io_uring_get_socket(struct file *file)
224 {
225 #if defined(CONFIG_UNIX)
226         if (file->f_op == &io_uring_fops) {
227                 struct io_ring_ctx *ctx = file->private_data;
228
229                 return ctx->ring_sock->sk;
230         }
231 #endif
232         return NULL;
233 }
234 EXPORT_SYMBOL(io_uring_get_socket);
235
236 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
237 {
238         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
239
240         complete(&ctx->ctx_done);
241 }
242
243 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
244 {
245         struct io_ring_ctx *ctx;
246
247         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
248         if (!ctx)
249                 return NULL;
250
251         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
252                 kfree(ctx);
253                 return NULL;
254         }
255
256         ctx->flags = p->flags;
257         init_waitqueue_head(&ctx->cq_wait);
258         init_completion(&ctx->ctx_done);
259         mutex_init(&ctx->uring_lock);
260         init_waitqueue_head(&ctx->wait);
261         spin_lock_init(&ctx->completion_lock);
262         INIT_LIST_HEAD(&ctx->poll_list);
263         return ctx;
264 }
265
266 static void io_commit_cqring(struct io_ring_ctx *ctx)
267 {
268         struct io_cq_ring *ring = ctx->cq_ring;
269
270         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
271                 /* order cqe stores with ring update */
272                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
273
274                 /*
275                  * Write sider barrier of tail update, app has read side. See
276                  * comment at the top of this file.
277                  */
278                 smp_wmb();
279
280                 if (wq_has_sleeper(&ctx->cq_wait)) {
281                         wake_up_interruptible(&ctx->cq_wait);
282                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
283                 }
284         }
285 }
286
287 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
288 {
289         struct io_cq_ring *ring = ctx->cq_ring;
290         unsigned tail;
291
292         tail = ctx->cached_cq_tail;
293         /* See comment at the top of the file */
294         smp_rmb();
295         if (tail + 1 == READ_ONCE(ring->r.head))
296                 return NULL;
297
298         ctx->cached_cq_tail++;
299         return &ring->cqes[tail & ctx->cq_mask];
300 }
301
302 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
303                                  long res, unsigned ev_flags)
304 {
305         struct io_uring_cqe *cqe;
306
307         /*
308          * If we can't get a cq entry, userspace overflowed the
309          * submission (by quite a lot). Increment the overflow count in
310          * the ring.
311          */
312         cqe = io_get_cqring(ctx);
313         if (cqe) {
314                 WRITE_ONCE(cqe->user_data, ki_user_data);
315                 WRITE_ONCE(cqe->res, res);
316                 WRITE_ONCE(cqe->flags, ev_flags);
317         } else {
318                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
319
320                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
321         }
322 }
323
324 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 ki_user_data,
325                                 long res, unsigned ev_flags)
326 {
327         unsigned long flags;
328
329         spin_lock_irqsave(&ctx->completion_lock, flags);
330         io_cqring_fill_event(ctx, ki_user_data, res, ev_flags);
331         io_commit_cqring(ctx);
332         spin_unlock_irqrestore(&ctx->completion_lock, flags);
333
334         if (waitqueue_active(&ctx->wait))
335                 wake_up(&ctx->wait);
336         if (waitqueue_active(&ctx->sqo_wait))
337                 wake_up(&ctx->sqo_wait);
338 }
339
340 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
341 {
342         percpu_ref_put_many(&ctx->refs, refs);
343
344         if (waitqueue_active(&ctx->wait))
345                 wake_up(&ctx->wait);
346 }
347
348 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
349                                    struct io_submit_state *state)
350 {
351         struct io_kiocb *req;
352
353         if (!percpu_ref_tryget(&ctx->refs))
354                 return NULL;
355
356         if (!state) {
357                 req = kmem_cache_alloc(req_cachep, __GFP_NOWARN);
358                 if (unlikely(!req))
359                         goto out;
360         } else if (!state->free_reqs) {
361                 size_t sz;
362                 int ret;
363
364                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
365                 ret = kmem_cache_alloc_bulk(req_cachep, __GFP_NOWARN, sz,
366                                                 state->reqs);
367                 if (unlikely(ret <= 0))
368                         goto out;
369                 state->free_reqs = ret - 1;
370                 state->cur_req = 1;
371                 req = state->reqs[0];
372         } else {
373                 req = state->reqs[state->cur_req];
374                 state->free_reqs--;
375                 state->cur_req++;
376         }
377
378         req->ctx = ctx;
379         req->flags = 0;
380         return req;
381 out:
382         io_ring_drop_ctx_refs(ctx, 1);
383         return NULL;
384 }
385
386 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
387 {
388         if (*nr) {
389                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
390                 io_ring_drop_ctx_refs(ctx, *nr);
391                 *nr = 0;
392         }
393 }
394
395 static void io_free_req(struct io_kiocb *req)
396 {
397         io_ring_drop_ctx_refs(req->ctx, 1);
398         kmem_cache_free(req_cachep, req);
399 }
400
401 /*
402  * Find and free completed poll iocbs
403  */
404 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
405                                struct list_head *done)
406 {
407         void *reqs[IO_IOPOLL_BATCH];
408         int file_count, to_free;
409         struct file *file = NULL;
410         struct io_kiocb *req;
411
412         file_count = to_free = 0;
413         while (!list_empty(done)) {
414                 req = list_first_entry(done, struct io_kiocb, list);
415                 list_del(&req->list);
416
417                 io_cqring_fill_event(ctx, req->user_data, req->error, 0);
418
419                 reqs[to_free++] = req;
420                 (*nr_events)++;
421
422                 /*
423                  * Batched puts of the same file, to avoid dirtying the
424                  * file usage count multiple times, if avoidable.
425                  */
426                 if (!(req->flags & REQ_F_FIXED_FILE)) {
427                         if (!file) {
428                                 file = req->rw.ki_filp;
429                                 file_count = 1;
430                         } else if (file == req->rw.ki_filp) {
431                                 file_count++;
432                         } else {
433                                 fput_many(file, file_count);
434                                 file = req->rw.ki_filp;
435                                 file_count = 1;
436                         }
437                 }
438
439                 if (to_free == ARRAY_SIZE(reqs))
440                         io_free_req_many(ctx, reqs, &to_free);
441         }
442         io_commit_cqring(ctx);
443
444         if (file)
445                 fput_many(file, file_count);
446         io_free_req_many(ctx, reqs, &to_free);
447 }
448
449 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
450                         long min)
451 {
452         struct io_kiocb *req, *tmp;
453         LIST_HEAD(done);
454         bool spin;
455         int ret;
456
457         /*
458          * Only spin for completions if we don't have multiple devices hanging
459          * off our complete list, and we're under the requested amount.
460          */
461         spin = !ctx->poll_multi_file && *nr_events < min;
462
463         ret = 0;
464         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
465                 struct kiocb *kiocb = &req->rw;
466
467                 /*
468                  * Move completed entries to our local list. If we find a
469                  * request that requires polling, break out and complete
470                  * the done list first, if we have entries there.
471                  */
472                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
473                         list_move_tail(&req->list, &done);
474                         continue;
475                 }
476                 if (!list_empty(&done))
477                         break;
478
479                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
480                 if (ret < 0)
481                         break;
482
483                 if (ret && spin)
484                         spin = false;
485                 ret = 0;
486         }
487
488         if (!list_empty(&done))
489                 io_iopoll_complete(ctx, nr_events, &done);
490
491         return ret;
492 }
493
494 /*
495  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
496  * non-spinning poll check - we'll still enter the driver poll loop, but only
497  * as a non-spinning completion check.
498  */
499 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
500                                 long min)
501 {
502         while (!list_empty(&ctx->poll_list)) {
503                 int ret;
504
505                 ret = io_do_iopoll(ctx, nr_events, min);
506                 if (ret < 0)
507                         return ret;
508                 if (!min || *nr_events >= min)
509                         return 0;
510         }
511
512         return 1;
513 }
514
515 /*
516  * We can't just wait for polled events to come to us, we have to actively
517  * find and complete them.
518  */
519 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
520 {
521         if (!(ctx->flags & IORING_SETUP_IOPOLL))
522                 return;
523
524         mutex_lock(&ctx->uring_lock);
525         while (!list_empty(&ctx->poll_list)) {
526                 unsigned int nr_events = 0;
527
528                 io_iopoll_getevents(ctx, &nr_events, 1);
529         }
530         mutex_unlock(&ctx->uring_lock);
531 }
532
533 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
534                            long min)
535 {
536         int ret = 0;
537
538         do {
539                 int tmin = 0;
540
541                 if (*nr_events < min)
542                         tmin = min - *nr_events;
543
544                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
545                 if (ret <= 0)
546                         break;
547                 ret = 0;
548         } while (min && !*nr_events && !need_resched());
549
550         return ret;
551 }
552
553 static void kiocb_end_write(struct kiocb *kiocb)
554 {
555         if (kiocb->ki_flags & IOCB_WRITE) {
556                 struct inode *inode = file_inode(kiocb->ki_filp);
557
558                 /*
559                  * Tell lockdep we inherited freeze protection from submission
560                  * thread.
561                  */
562                 if (S_ISREG(inode->i_mode))
563                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
564                 file_end_write(kiocb->ki_filp);
565         }
566 }
567
568 static void io_fput(struct io_kiocb *req)
569 {
570         if (!(req->flags & REQ_F_FIXED_FILE))
571                 fput(req->rw.ki_filp);
572 }
573
574 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
575 {
576         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
577
578         kiocb_end_write(kiocb);
579
580         io_fput(req);
581         io_cqring_add_event(req->ctx, req->user_data, res, 0);
582         io_free_req(req);
583 }
584
585 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
586 {
587         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
588
589         kiocb_end_write(kiocb);
590
591         req->error = res;
592         if (res != -EAGAIN)
593                 req->flags |= REQ_F_IOPOLL_COMPLETED;
594 }
595
596 /*
597  * After the iocb has been issued, it's safe to be found on the poll list.
598  * Adding the kiocb to the list AFTER submission ensures that we don't
599  * find it from a io_iopoll_getevents() thread before the issuer is done
600  * accessing the kiocb cookie.
601  */
602 static void io_iopoll_req_issued(struct io_kiocb *req)
603 {
604         struct io_ring_ctx *ctx = req->ctx;
605
606         /*
607          * Track whether we have multiple files in our lists. This will impact
608          * how we do polling eventually, not spinning if we're on potentially
609          * different devices.
610          */
611         if (list_empty(&ctx->poll_list)) {
612                 ctx->poll_multi_file = false;
613         } else if (!ctx->poll_multi_file) {
614                 struct io_kiocb *list_req;
615
616                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
617                                                 list);
618                 if (list_req->rw.ki_filp != req->rw.ki_filp)
619                         ctx->poll_multi_file = true;
620         }
621
622         /*
623          * For fast devices, IO may have already completed. If it has, add
624          * it to the front so we find it first.
625          */
626         if (req->flags & REQ_F_IOPOLL_COMPLETED)
627                 list_add(&req->list, &ctx->poll_list);
628         else
629                 list_add_tail(&req->list, &ctx->poll_list);
630 }
631
632 static void io_file_put(struct io_submit_state *state, struct file *file)
633 {
634         if (!state) {
635                 fput(file);
636         } else if (state->file) {
637                 int diff = state->has_refs - state->used_refs;
638
639                 if (diff)
640                         fput_many(state->file, diff);
641                 state->file = NULL;
642         }
643 }
644
645 /*
646  * Get as many references to a file as we have IOs left in this submission,
647  * assuming most submissions are for one file, or at least that each file
648  * has more than one submission.
649  */
650 static struct file *io_file_get(struct io_submit_state *state, int fd)
651 {
652         if (!state)
653                 return fget(fd);
654
655         if (state->file) {
656                 if (state->fd == fd) {
657                         state->used_refs++;
658                         state->ios_left--;
659                         return state->file;
660                 }
661                 io_file_put(state, NULL);
662         }
663         state->file = fget_many(fd, state->ios_left);
664         if (!state->file)
665                 return NULL;
666
667         state->fd = fd;
668         state->has_refs = state->ios_left;
669         state->used_refs = 1;
670         state->ios_left--;
671         return state->file;
672 }
673
674 /*
675  * If we tracked the file through the SCM inflight mechanism, we could support
676  * any file. For now, just ensure that anything potentially problematic is done
677  * inline.
678  */
679 static bool io_file_supports_async(struct file *file)
680 {
681         umode_t mode = file_inode(file)->i_mode;
682
683         if (S_ISBLK(mode) || S_ISCHR(mode))
684                 return true;
685         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
686                 return true;
687
688         return false;
689 }
690
691 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
692                       bool force_nonblock, struct io_submit_state *state)
693 {
694         const struct io_uring_sqe *sqe = s->sqe;
695         struct io_ring_ctx *ctx = req->ctx;
696         struct kiocb *kiocb = &req->rw;
697         unsigned ioprio, flags;
698         int fd, ret;
699
700         /* For -EAGAIN retry, everything is already prepped */
701         if (kiocb->ki_filp)
702                 return 0;
703
704         flags = READ_ONCE(sqe->flags);
705         fd = READ_ONCE(sqe->fd);
706
707         if (flags & IOSQE_FIXED_FILE) {
708                 if (unlikely(!ctx->user_files ||
709                     (unsigned) fd >= ctx->nr_user_files))
710                         return -EBADF;
711                 kiocb->ki_filp = ctx->user_files[fd];
712                 req->flags |= REQ_F_FIXED_FILE;
713         } else {
714                 if (s->needs_fixed_file)
715                         return -EBADF;
716                 kiocb->ki_filp = io_file_get(state, fd);
717                 if (unlikely(!kiocb->ki_filp))
718                         return -EBADF;
719                 if (force_nonblock && !io_file_supports_async(kiocb->ki_filp))
720                         force_nonblock = false;
721         }
722         kiocb->ki_pos = READ_ONCE(sqe->off);
723         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
724         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
725
726         ioprio = READ_ONCE(sqe->ioprio);
727         if (ioprio) {
728                 ret = ioprio_check_cap(ioprio);
729                 if (ret)
730                         goto out_fput;
731
732                 kiocb->ki_ioprio = ioprio;
733         } else
734                 kiocb->ki_ioprio = get_current_ioprio();
735
736         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
737         if (unlikely(ret))
738                 goto out_fput;
739         if (force_nonblock) {
740                 kiocb->ki_flags |= IOCB_NOWAIT;
741                 req->flags |= REQ_F_FORCE_NONBLOCK;
742         }
743         if (ctx->flags & IORING_SETUP_IOPOLL) {
744                 ret = -EOPNOTSUPP;
745                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
746                     !kiocb->ki_filp->f_op->iopoll)
747                         goto out_fput;
748
749                 req->error = 0;
750                 kiocb->ki_flags |= IOCB_HIPRI;
751                 kiocb->ki_complete = io_complete_rw_iopoll;
752         } else {
753                 if (kiocb->ki_flags & IOCB_HIPRI) {
754                         ret = -EINVAL;
755                         goto out_fput;
756                 }
757                 kiocb->ki_complete = io_complete_rw;
758         }
759         return 0;
760 out_fput:
761         if (!(flags & IOSQE_FIXED_FILE)) {
762                 /*
763                  * in case of error, we didn't use this file reference. drop it.
764                  */
765                 if (state)
766                         state->used_refs--;
767                 io_file_put(state, kiocb->ki_filp);
768         }
769         return ret;
770 }
771
772 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
773 {
774         switch (ret) {
775         case -EIOCBQUEUED:
776                 break;
777         case -ERESTARTSYS:
778         case -ERESTARTNOINTR:
779         case -ERESTARTNOHAND:
780         case -ERESTART_RESTARTBLOCK:
781                 /*
782                  * We can't just restart the syscall, since previously
783                  * submitted sqes may already be in progress. Just fail this
784                  * IO with EINTR.
785                  */
786                 ret = -EINTR;
787                 /* fall through */
788         default:
789                 kiocb->ki_complete(kiocb, ret, 0);
790         }
791 }
792
793 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
794                            const struct io_uring_sqe *sqe,
795                            struct iov_iter *iter)
796 {
797         size_t len = READ_ONCE(sqe->len);
798         struct io_mapped_ubuf *imu;
799         unsigned index, buf_index;
800         size_t offset;
801         u64 buf_addr;
802
803         /* attempt to use fixed buffers without having provided iovecs */
804         if (unlikely(!ctx->user_bufs))
805                 return -EFAULT;
806
807         buf_index = READ_ONCE(sqe->buf_index);
808         if (unlikely(buf_index >= ctx->nr_user_bufs))
809                 return -EFAULT;
810
811         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
812         imu = &ctx->user_bufs[index];
813         buf_addr = READ_ONCE(sqe->addr);
814
815         /* overflow */
816         if (buf_addr + len < buf_addr)
817                 return -EFAULT;
818         /* not inside the mapped region */
819         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
820                 return -EFAULT;
821
822         /*
823          * May not be a start of buffer, set size appropriately
824          * and advance us to the beginning.
825          */
826         offset = buf_addr - imu->ubuf;
827         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
828         if (offset)
829                 iov_iter_advance(iter, offset);
830         return 0;
831 }
832
833 static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
834                            const struct sqe_submit *s, struct iovec **iovec,
835                            struct iov_iter *iter)
836 {
837         const struct io_uring_sqe *sqe = s->sqe;
838         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
839         size_t sqe_len = READ_ONCE(sqe->len);
840         u8 opcode;
841
842         /*
843          * We're reading ->opcode for the second time, but the first read
844          * doesn't care whether it's _FIXED or not, so it doesn't matter
845          * whether ->opcode changes concurrently. The first read does care
846          * about whether it is a READ or a WRITE, so we don't trust this read
847          * for that purpose and instead let the caller pass in the read/write
848          * flag.
849          */
850         opcode = READ_ONCE(sqe->opcode);
851         if (opcode == IORING_OP_READ_FIXED ||
852             opcode == IORING_OP_WRITE_FIXED) {
853                 ssize_t ret = io_import_fixed(ctx, rw, sqe, iter);
854                 *iovec = NULL;
855                 return ret;
856         }
857
858         if (!s->has_user)
859                 return -EFAULT;
860
861 #ifdef CONFIG_COMPAT
862         if (ctx->compat)
863                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
864                                                 iovec, iter);
865 #endif
866
867         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
868 }
869
870 static ssize_t io_read(struct io_kiocb *req, const struct sqe_submit *s,
871                        bool force_nonblock, struct io_submit_state *state)
872 {
873         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
874         struct kiocb *kiocb = &req->rw;
875         struct iov_iter iter;
876         struct file *file;
877         ssize_t ret;
878
879         ret = io_prep_rw(req, s, force_nonblock, state);
880         if (ret)
881                 return ret;
882         file = kiocb->ki_filp;
883
884         ret = -EBADF;
885         if (unlikely(!(file->f_mode & FMODE_READ)))
886                 goto out_fput;
887         ret = -EINVAL;
888         if (unlikely(!file->f_op->read_iter))
889                 goto out_fput;
890
891         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
892         if (ret)
893                 goto out_fput;
894
895         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_iter_count(&iter));
896         if (!ret) {
897                 ssize_t ret2;
898
899                 /* Catch -EAGAIN return for forced non-blocking submission */
900                 ret2 = call_read_iter(file, kiocb, &iter);
901                 if (!force_nonblock || ret2 != -EAGAIN)
902                         io_rw_done(kiocb, ret2);
903                 else
904                         ret = -EAGAIN;
905         }
906         kfree(iovec);
907 out_fput:
908         /* Hold on to the file for -EAGAIN */
909         if (unlikely(ret && ret != -EAGAIN))
910                 io_fput(req);
911         return ret;
912 }
913
914 static ssize_t io_write(struct io_kiocb *req, const struct sqe_submit *s,
915                         bool force_nonblock, struct io_submit_state *state)
916 {
917         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
918         struct kiocb *kiocb = &req->rw;
919         struct iov_iter iter;
920         struct file *file;
921         ssize_t ret;
922
923         ret = io_prep_rw(req, s, force_nonblock, state);
924         if (ret)
925                 return ret;
926         /* Hold on to the file for -EAGAIN */
927         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
928                 return -EAGAIN;
929
930         ret = -EBADF;
931         file = kiocb->ki_filp;
932         if (unlikely(!(file->f_mode & FMODE_WRITE)))
933                 goto out_fput;
934         ret = -EINVAL;
935         if (unlikely(!file->f_op->write_iter))
936                 goto out_fput;
937
938         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
939         if (ret)
940                 goto out_fput;
941
942         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos,
943                                 iov_iter_count(&iter));
944         if (!ret) {
945                 /*
946                  * Open-code file_start_write here to grab freeze protection,
947                  * which will be released by another thread in
948                  * io_complete_rw().  Fool lockdep by telling it the lock got
949                  * released so that it doesn't complain about the held lock when
950                  * we return to userspace.
951                  */
952                 if (S_ISREG(file_inode(file)->i_mode)) {
953                         __sb_start_write(file_inode(file)->i_sb,
954                                                 SB_FREEZE_WRITE, true);
955                         __sb_writers_release(file_inode(file)->i_sb,
956                                                 SB_FREEZE_WRITE);
957                 }
958                 kiocb->ki_flags |= IOCB_WRITE;
959                 io_rw_done(kiocb, call_write_iter(file, kiocb, &iter));
960         }
961         kfree(iovec);
962 out_fput:
963         if (unlikely(ret))
964                 io_fput(req);
965         return ret;
966 }
967
968 /*
969  * IORING_OP_NOP just posts a completion event, nothing else.
970  */
971 static int io_nop(struct io_kiocb *req, u64 user_data)
972 {
973         struct io_ring_ctx *ctx = req->ctx;
974         long err = 0;
975
976         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
977                 return -EINVAL;
978
979         /*
980          * Twilight zone - it's possible that someone issued an opcode that
981          * has a file attached, then got -EAGAIN on submission, and changed
982          * the sqe before we retried it from async context. Avoid dropping
983          * a file reference for this malicious case, and flag the error.
984          */
985         if (req->rw.ki_filp) {
986                 err = -EBADF;
987                 io_fput(req);
988         }
989         io_cqring_add_event(ctx, user_data, err, 0);
990         io_free_req(req);
991         return 0;
992 }
993
994 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
995 {
996         struct io_ring_ctx *ctx = req->ctx;
997         unsigned flags;
998         int fd;
999
1000         /* Prep already done */
1001         if (req->rw.ki_filp)
1002                 return 0;
1003
1004         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1005                 return -EINVAL;
1006         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1007                 return -EINVAL;
1008
1009         fd = READ_ONCE(sqe->fd);
1010         flags = READ_ONCE(sqe->flags);
1011
1012         if (flags & IOSQE_FIXED_FILE) {
1013                 if (unlikely(!ctx->user_files || fd >= ctx->nr_user_files))
1014                         return -EBADF;
1015                 req->rw.ki_filp = ctx->user_files[fd];
1016                 req->flags |= REQ_F_FIXED_FILE;
1017         } else {
1018                 req->rw.ki_filp = fget(fd);
1019                 if (unlikely(!req->rw.ki_filp))
1020                         return -EBADF;
1021         }
1022
1023         return 0;
1024 }
1025
1026 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1027                     bool force_nonblock)
1028 {
1029         loff_t sqe_off = READ_ONCE(sqe->off);
1030         loff_t sqe_len = READ_ONCE(sqe->len);
1031         loff_t end = sqe_off + sqe_len;
1032         unsigned fsync_flags;
1033         int ret;
1034
1035         fsync_flags = READ_ONCE(sqe->fsync_flags);
1036         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1037                 return -EINVAL;
1038
1039         ret = io_prep_fsync(req, sqe);
1040         if (ret)
1041                 return ret;
1042
1043         /* fsync always requires a blocking context */
1044         if (force_nonblock)
1045                 return -EAGAIN;
1046
1047         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1048                                 end > 0 ? end : LLONG_MAX,
1049                                 fsync_flags & IORING_FSYNC_DATASYNC);
1050
1051         io_fput(req);
1052         io_cqring_add_event(req->ctx, sqe->user_data, ret, 0);
1053         io_free_req(req);
1054         return 0;
1055 }
1056
1057 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1058                            const struct sqe_submit *s, bool force_nonblock,
1059                            struct io_submit_state *state)
1060 {
1061         ssize_t ret;
1062         int opcode;
1063
1064         if (unlikely(s->index >= ctx->sq_entries))
1065                 return -EINVAL;
1066         req->user_data = READ_ONCE(s->sqe->user_data);
1067
1068         opcode = READ_ONCE(s->sqe->opcode);
1069         switch (opcode) {
1070         case IORING_OP_NOP:
1071                 ret = io_nop(req, req->user_data);
1072                 break;
1073         case IORING_OP_READV:
1074                 if (unlikely(s->sqe->buf_index))
1075                         return -EINVAL;
1076                 ret = io_read(req, s, force_nonblock, state);
1077                 break;
1078         case IORING_OP_WRITEV:
1079                 if (unlikely(s->sqe->buf_index))
1080                         return -EINVAL;
1081                 ret = io_write(req, s, force_nonblock, state);
1082                 break;
1083         case IORING_OP_READ_FIXED:
1084                 ret = io_read(req, s, force_nonblock, state);
1085                 break;
1086         case IORING_OP_WRITE_FIXED:
1087                 ret = io_write(req, s, force_nonblock, state);
1088                 break;
1089         case IORING_OP_FSYNC:
1090                 ret = io_fsync(req, s->sqe, force_nonblock);
1091                 break;
1092         default:
1093                 ret = -EINVAL;
1094                 break;
1095         }
1096
1097         if (ret)
1098                 return ret;
1099
1100         if (ctx->flags & IORING_SETUP_IOPOLL) {
1101                 if (req->error == -EAGAIN)
1102                         return -EAGAIN;
1103
1104                 /* workqueue context doesn't hold uring_lock, grab it now */
1105                 if (s->needs_lock)
1106                         mutex_lock(&ctx->uring_lock);
1107                 io_iopoll_req_issued(req);
1108                 if (s->needs_lock)
1109                         mutex_unlock(&ctx->uring_lock);
1110         }
1111
1112         return 0;
1113 }
1114
1115 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1116 {
1117         u8 opcode = READ_ONCE(sqe->opcode);
1118
1119         return !(opcode == IORING_OP_READ_FIXED ||
1120                  opcode == IORING_OP_WRITE_FIXED);
1121 }
1122
1123 static void io_sq_wq_submit_work(struct work_struct *work)
1124 {
1125         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1126         struct sqe_submit *s = &req->submit;
1127         const struct io_uring_sqe *sqe = s->sqe;
1128         struct io_ring_ctx *ctx = req->ctx;
1129         mm_segment_t old_fs;
1130         bool needs_user;
1131         int ret;
1132
1133          /* Ensure we clear previously set forced non-block flag */
1134         req->flags &= ~REQ_F_FORCE_NONBLOCK;
1135         req->rw.ki_flags &= ~IOCB_NOWAIT;
1136
1137         s->needs_lock = true;
1138         s->has_user = false;
1139
1140         /*
1141          * If we're doing IO to fixed buffers, we don't need to get/set
1142          * user context
1143          */
1144         needs_user = io_sqe_needs_user(s->sqe);
1145         if (needs_user) {
1146                 if (!mmget_not_zero(ctx->sqo_mm)) {
1147                         ret = -EFAULT;
1148                         goto err;
1149                 }
1150                 use_mm(ctx->sqo_mm);
1151                 old_fs = get_fs();
1152                 set_fs(USER_DS);
1153                 s->has_user = true;
1154         }
1155
1156         do {
1157                 ret = __io_submit_sqe(ctx, req, s, false, NULL);
1158                 /*
1159                  * We can get EAGAIN for polled IO even though we're forcing
1160                  * a sync submission from here, since we can't wait for
1161                  * request slots on the block side.
1162                  */
1163                 if (ret != -EAGAIN)
1164                         break;
1165                 cond_resched();
1166         } while (1);
1167
1168         if (needs_user) {
1169                 set_fs(old_fs);
1170                 unuse_mm(ctx->sqo_mm);
1171                 mmput(ctx->sqo_mm);
1172         }
1173 err:
1174         if (ret) {
1175                 io_cqring_add_event(ctx, sqe->user_data, ret, 0);
1176                 io_free_req(req);
1177         }
1178
1179         /* async context always use a copy of the sqe */
1180         kfree(sqe);
1181 }
1182
1183 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1184                          struct io_submit_state *state)
1185 {
1186         struct io_kiocb *req;
1187         ssize_t ret;
1188
1189         /* enforce forwards compatibility on users */
1190         if (unlikely(s->sqe->flags & ~IOSQE_FIXED_FILE))
1191                 return -EINVAL;
1192
1193         req = io_get_req(ctx, state);
1194         if (unlikely(!req))
1195                 return -EAGAIN;
1196
1197         req->rw.ki_filp = NULL;
1198
1199         ret = __io_submit_sqe(ctx, req, s, true, state);
1200         if (ret == -EAGAIN) {
1201                 struct io_uring_sqe *sqe_copy;
1202
1203                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1204                 if (sqe_copy) {
1205                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1206                         s->sqe = sqe_copy;
1207
1208                         memcpy(&req->submit, s, sizeof(*s));
1209                         INIT_WORK(&req->work, io_sq_wq_submit_work);
1210                         queue_work(ctx->sqo_wq, &req->work);
1211                         ret = 0;
1212                 }
1213         }
1214         if (ret)
1215                 io_free_req(req);
1216
1217         return ret;
1218 }
1219
1220 /*
1221  * Batched submission is done, ensure local IO is flushed out.
1222  */
1223 static void io_submit_state_end(struct io_submit_state *state)
1224 {
1225         blk_finish_plug(&state->plug);
1226         io_file_put(state, NULL);
1227         if (state->free_reqs)
1228                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1229                                         &state->reqs[state->cur_req]);
1230 }
1231
1232 /*
1233  * Start submission side cache.
1234  */
1235 static void io_submit_state_start(struct io_submit_state *state,
1236                                   struct io_ring_ctx *ctx, unsigned max_ios)
1237 {
1238         blk_start_plug(&state->plug);
1239         state->free_reqs = 0;
1240         state->file = NULL;
1241         state->ios_left = max_ios;
1242 }
1243
1244 static void io_commit_sqring(struct io_ring_ctx *ctx)
1245 {
1246         struct io_sq_ring *ring = ctx->sq_ring;
1247
1248         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1249                 /*
1250                  * Ensure any loads from the SQEs are done at this point,
1251                  * since once we write the new head, the application could
1252                  * write new data to them.
1253                  */
1254                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1255
1256                 /*
1257                  * write side barrier of head update, app has read side. See
1258                  * comment at the top of this file
1259                  */
1260                 smp_wmb();
1261         }
1262 }
1263
1264 /*
1265  * Undo last io_get_sqring()
1266  */
1267 static void io_drop_sqring(struct io_ring_ctx *ctx)
1268 {
1269         ctx->cached_sq_head--;
1270 }
1271
1272 /*
1273  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1274  * that is mapped by userspace. This means that care needs to be taken to
1275  * ensure that reads are stable, as we cannot rely on userspace always
1276  * being a good citizen. If members of the sqe are validated and then later
1277  * used, it's important that those reads are done through READ_ONCE() to
1278  * prevent a re-load down the line.
1279  */
1280 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1281 {
1282         struct io_sq_ring *ring = ctx->sq_ring;
1283         unsigned head;
1284
1285         /*
1286          * The cached sq head (or cq tail) serves two purposes:
1287          *
1288          * 1) allows us to batch the cost of updating the user visible
1289          *    head updates.
1290          * 2) allows the kernel side to track the head on its own, even
1291          *    though the application is the one updating it.
1292          */
1293         head = ctx->cached_sq_head;
1294         /* See comment at the top of this file */
1295         smp_rmb();
1296         if (head == READ_ONCE(ring->r.tail))
1297                 return false;
1298
1299         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1300         if (head < ctx->sq_entries) {
1301                 s->index = head;
1302                 s->sqe = &ctx->sq_sqes[head];
1303                 ctx->cached_sq_head++;
1304                 return true;
1305         }
1306
1307         /* drop invalid entries */
1308         ctx->cached_sq_head++;
1309         ring->dropped++;
1310         /* See comment at the top of this file */
1311         smp_wmb();
1312         return false;
1313 }
1314
1315 static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1316                           unsigned int nr, bool has_user, bool mm_fault)
1317 {
1318         struct io_submit_state state, *statep = NULL;
1319         int ret, i, submitted = 0;
1320
1321         if (nr > IO_PLUG_THRESHOLD) {
1322                 io_submit_state_start(&state, ctx, nr);
1323                 statep = &state;
1324         }
1325
1326         for (i = 0; i < nr; i++) {
1327                 if (unlikely(mm_fault)) {
1328                         ret = -EFAULT;
1329                 } else {
1330                         sqes[i].has_user = has_user;
1331                         sqes[i].needs_lock = true;
1332                         sqes[i].needs_fixed_file = true;
1333                         ret = io_submit_sqe(ctx, &sqes[i], statep);
1334                 }
1335                 if (!ret) {
1336                         submitted++;
1337                         continue;
1338                 }
1339
1340                 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret, 0);
1341         }
1342
1343         if (statep)
1344                 io_submit_state_end(&state);
1345
1346         return submitted;
1347 }
1348
1349 static int io_sq_thread(void *data)
1350 {
1351         struct sqe_submit sqes[IO_IOPOLL_BATCH];
1352         struct io_ring_ctx *ctx = data;
1353         struct mm_struct *cur_mm = NULL;
1354         mm_segment_t old_fs;
1355         DEFINE_WAIT(wait);
1356         unsigned inflight;
1357         unsigned long timeout;
1358
1359         old_fs = get_fs();
1360         set_fs(USER_DS);
1361
1362         timeout = inflight = 0;
1363         while (!kthread_should_stop() && !ctx->sqo_stop) {
1364                 bool all_fixed, mm_fault = false;
1365                 int i;
1366
1367                 if (inflight) {
1368                         unsigned nr_events = 0;
1369
1370                         if (ctx->flags & IORING_SETUP_IOPOLL) {
1371                                 /*
1372                                  * We disallow the app entering submit/complete
1373                                  * with polling, but we still need to lock the
1374                                  * ring to prevent racing with polled issue
1375                                  * that got punted to a workqueue.
1376                                  */
1377                                 mutex_lock(&ctx->uring_lock);
1378                                 io_iopoll_check(ctx, &nr_events, 0);
1379                                 mutex_unlock(&ctx->uring_lock);
1380                         } else {
1381                                 /*
1382                                  * Normal IO, just pretend everything completed.
1383                                  * We don't have to poll completions for that.
1384                                  */
1385                                 nr_events = inflight;
1386                         }
1387
1388                         inflight -= nr_events;
1389                         if (!inflight)
1390                                 timeout = jiffies + ctx->sq_thread_idle;
1391                 }
1392
1393                 if (!io_get_sqring(ctx, &sqes[0])) {
1394                         /*
1395                          * We're polling. If we're within the defined idle
1396                          * period, then let us spin without work before going
1397                          * to sleep.
1398                          */
1399                         if (inflight || !time_after(jiffies, timeout)) {
1400                                 cpu_relax();
1401                                 continue;
1402                         }
1403
1404                         /*
1405                          * Drop cur_mm before scheduling, we can't hold it for
1406                          * long periods (or over schedule()). Do this before
1407                          * adding ourselves to the waitqueue, as the unuse/drop
1408                          * may sleep.
1409                          */
1410                         if (cur_mm) {
1411                                 unuse_mm(cur_mm);
1412                                 mmput(cur_mm);
1413                                 cur_mm = NULL;
1414                         }
1415
1416                         prepare_to_wait(&ctx->sqo_wait, &wait,
1417                                                 TASK_INTERRUPTIBLE);
1418
1419                         /* Tell userspace we may need a wakeup call */
1420                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
1421                         smp_wmb();
1422
1423                         if (!io_get_sqring(ctx, &sqes[0])) {
1424                                 if (kthread_should_stop()) {
1425                                         finish_wait(&ctx->sqo_wait, &wait);
1426                                         break;
1427                                 }
1428                                 if (signal_pending(current))
1429                                         flush_signals(current);
1430                                 schedule();
1431                                 finish_wait(&ctx->sqo_wait, &wait);
1432
1433                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
1434                                 smp_wmb();
1435                                 continue;
1436                         }
1437                         finish_wait(&ctx->sqo_wait, &wait);
1438
1439                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
1440                         smp_wmb();
1441                 }
1442
1443                 i = 0;
1444                 all_fixed = true;
1445                 do {
1446                         if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
1447                                 all_fixed = false;
1448
1449                         i++;
1450                         if (i == ARRAY_SIZE(sqes))
1451                                 break;
1452                 } while (io_get_sqring(ctx, &sqes[i]));
1453
1454                 /* Unless all new commands are FIXED regions, grab mm */
1455                 if (!all_fixed && !cur_mm) {
1456                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
1457                         if (!mm_fault) {
1458                                 use_mm(ctx->sqo_mm);
1459                                 cur_mm = ctx->sqo_mm;
1460                         }
1461                 }
1462
1463                 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
1464                                                 mm_fault);
1465
1466                 /* Commit SQ ring head once we've consumed all SQEs */
1467                 io_commit_sqring(ctx);
1468         }
1469
1470         set_fs(old_fs);
1471         if (cur_mm) {
1472                 unuse_mm(cur_mm);
1473                 mmput(cur_mm);
1474         }
1475         return 0;
1476 }
1477
1478 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
1479 {
1480         struct io_submit_state state, *statep = NULL;
1481         int i, ret = 0, submit = 0;
1482
1483         if (to_submit > IO_PLUG_THRESHOLD) {
1484                 io_submit_state_start(&state, ctx, to_submit);
1485                 statep = &state;
1486         }
1487
1488         for (i = 0; i < to_submit; i++) {
1489                 struct sqe_submit s;
1490
1491                 if (!io_get_sqring(ctx, &s))
1492                         break;
1493
1494                 s.has_user = true;
1495                 s.needs_lock = false;
1496                 s.needs_fixed_file = false;
1497
1498                 ret = io_submit_sqe(ctx, &s, statep);
1499                 if (ret) {
1500                         io_drop_sqring(ctx);
1501                         break;
1502                 }
1503
1504                 submit++;
1505         }
1506         io_commit_sqring(ctx);
1507
1508         if (statep)
1509                 io_submit_state_end(statep);
1510
1511         return submit ? submit : ret;
1512 }
1513
1514 static unsigned io_cqring_events(struct io_cq_ring *ring)
1515 {
1516         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
1517 }
1518
1519 /*
1520  * Wait until events become available, if we don't already have some. The
1521  * application must reap them itself, as they reside on the shared cq ring.
1522  */
1523 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
1524                           const sigset_t __user *sig, size_t sigsz)
1525 {
1526         struct io_cq_ring *ring = ctx->cq_ring;
1527         sigset_t ksigmask, sigsaved;
1528         DEFINE_WAIT(wait);
1529         int ret;
1530
1531         /* See comment at the top of this file */
1532         smp_rmb();
1533         if (io_cqring_events(ring) >= min_events)
1534                 return 0;
1535
1536         if (sig) {
1537                 ret = set_user_sigmask(sig, &ksigmask, &sigsaved, sigsz);
1538                 if (ret)
1539                         return ret;
1540         }
1541
1542         do {
1543                 prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
1544
1545                 ret = 0;
1546                 /* See comment at the top of this file */
1547                 smp_rmb();
1548                 if (io_cqring_events(ring) >= min_events)
1549                         break;
1550
1551                 schedule();
1552
1553                 ret = -EINTR;
1554                 if (signal_pending(current))
1555                         break;
1556         } while (1);
1557
1558         finish_wait(&ctx->wait, &wait);
1559
1560         if (sig)
1561                 restore_user_sigmask(sig, &sigsaved);
1562
1563         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
1564 }
1565
1566 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
1567 {
1568 #if defined(CONFIG_UNIX)
1569         if (ctx->ring_sock) {
1570                 struct sock *sock = ctx->ring_sock->sk;
1571                 struct sk_buff *skb;
1572
1573                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
1574                         kfree_skb(skb);
1575         }
1576 #else
1577         int i;
1578
1579         for (i = 0; i < ctx->nr_user_files; i++)
1580                 fput(ctx->user_files[i]);
1581 #endif
1582 }
1583
1584 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
1585 {
1586         if (!ctx->user_files)
1587                 return -ENXIO;
1588
1589         __io_sqe_files_unregister(ctx);
1590         kfree(ctx->user_files);
1591         ctx->user_files = NULL;
1592         ctx->nr_user_files = 0;
1593         return 0;
1594 }
1595
1596 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
1597 {
1598         if (ctx->sqo_thread) {
1599                 ctx->sqo_stop = 1;
1600                 mb();
1601                 kthread_stop(ctx->sqo_thread);
1602                 ctx->sqo_thread = NULL;
1603         }
1604 }
1605
1606 static void io_finish_async(struct io_ring_ctx *ctx)
1607 {
1608         io_sq_thread_stop(ctx);
1609
1610         if (ctx->sqo_wq) {
1611                 destroy_workqueue(ctx->sqo_wq);
1612                 ctx->sqo_wq = NULL;
1613         }
1614 }
1615
1616 #if defined(CONFIG_UNIX)
1617 static void io_destruct_skb(struct sk_buff *skb)
1618 {
1619         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
1620
1621         io_finish_async(ctx);
1622         unix_destruct_scm(skb);
1623 }
1624
1625 /*
1626  * Ensure the UNIX gc is aware of our file set, so we are certain that
1627  * the io_uring can be safely unregistered on process exit, even if we have
1628  * loops in the file referencing.
1629  */
1630 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
1631 {
1632         struct sock *sk = ctx->ring_sock->sk;
1633         struct scm_fp_list *fpl;
1634         struct sk_buff *skb;
1635         int i;
1636
1637         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
1638                 unsigned long inflight = ctx->user->unix_inflight + nr;
1639
1640                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
1641                         return -EMFILE;
1642         }
1643
1644         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
1645         if (!fpl)
1646                 return -ENOMEM;
1647
1648         skb = alloc_skb(0, GFP_KERNEL);
1649         if (!skb) {
1650                 kfree(fpl);
1651                 return -ENOMEM;
1652         }
1653
1654         skb->sk = sk;
1655         skb->destructor = io_destruct_skb;
1656
1657         fpl->user = get_uid(ctx->user);
1658         for (i = 0; i < nr; i++) {
1659                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
1660                 unix_inflight(fpl->user, fpl->fp[i]);
1661         }
1662
1663         fpl->max = fpl->count = nr;
1664         UNIXCB(skb).fp = fpl;
1665         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
1666         skb_queue_head(&sk->sk_receive_queue, skb);
1667
1668         for (i = 0; i < nr; i++)
1669                 fput(fpl->fp[i]);
1670
1671         return 0;
1672 }
1673
1674 /*
1675  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
1676  * causes regular reference counting to break down. We rely on the UNIX
1677  * garbage collection to take care of this problem for us.
1678  */
1679 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
1680 {
1681         unsigned left, total;
1682         int ret = 0;
1683
1684         total = 0;
1685         left = ctx->nr_user_files;
1686         while (left) {
1687                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
1688                 int ret;
1689
1690                 ret = __io_sqe_files_scm(ctx, this_files, total);
1691                 if (ret)
1692                         break;
1693                 left -= this_files;
1694                 total += this_files;
1695         }
1696
1697         if (!ret)
1698                 return 0;
1699
1700         while (total < ctx->nr_user_files) {
1701                 fput(ctx->user_files[total]);
1702                 total++;
1703         }
1704
1705         return ret;
1706 }
1707 #else
1708 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
1709 {
1710         return 0;
1711 }
1712 #endif
1713
1714 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
1715                                  unsigned nr_args)
1716 {
1717         __s32 __user *fds = (__s32 __user *) arg;
1718         int fd, ret = 0;
1719         unsigned i;
1720
1721         if (ctx->user_files)
1722                 return -EBUSY;
1723         if (!nr_args)
1724                 return -EINVAL;
1725         if (nr_args > IORING_MAX_FIXED_FILES)
1726                 return -EMFILE;
1727
1728         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
1729         if (!ctx->user_files)
1730                 return -ENOMEM;
1731
1732         for (i = 0; i < nr_args; i++) {
1733                 ret = -EFAULT;
1734                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
1735                         break;
1736
1737                 ctx->user_files[i] = fget(fd);
1738
1739                 ret = -EBADF;
1740                 if (!ctx->user_files[i])
1741                         break;
1742                 /*
1743                  * Don't allow io_uring instances to be registered. If UNIX
1744                  * isn't enabled, then this causes a reference cycle and this
1745                  * instance can never get freed. If UNIX is enabled we'll
1746                  * handle it just fine, but there's still no point in allowing
1747                  * a ring fd as it doesn't support regular read/write anyway.
1748                  */
1749                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
1750                         fput(ctx->user_files[i]);
1751                         break;
1752                 }
1753                 ctx->nr_user_files++;
1754                 ret = 0;
1755         }
1756
1757         if (ret) {
1758                 for (i = 0; i < ctx->nr_user_files; i++)
1759                         fput(ctx->user_files[i]);
1760
1761                 kfree(ctx->user_files);
1762                 ctx->nr_user_files = 0;
1763                 return ret;
1764         }
1765
1766         ret = io_sqe_files_scm(ctx);
1767         if (ret)
1768                 io_sqe_files_unregister(ctx);
1769
1770         return ret;
1771 }
1772
1773 static int io_sq_offload_start(struct io_ring_ctx *ctx,
1774                                struct io_uring_params *p)
1775 {
1776         int ret;
1777
1778         init_waitqueue_head(&ctx->sqo_wait);
1779         mmgrab(current->mm);
1780         ctx->sqo_mm = current->mm;
1781
1782         ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
1783         if (!ctx->sq_thread_idle)
1784                 ctx->sq_thread_idle = HZ;
1785
1786         ret = -EINVAL;
1787         if (!cpu_possible(p->sq_thread_cpu))
1788                 goto err;
1789
1790         if (ctx->flags & IORING_SETUP_SQPOLL) {
1791                 if (p->flags & IORING_SETUP_SQ_AFF) {
1792                         int cpu;
1793
1794                         cpu = array_index_nospec(p->sq_thread_cpu, NR_CPUS);
1795                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
1796                                                         ctx, cpu,
1797                                                         "io_uring-sq");
1798                 } else {
1799                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
1800                                                         "io_uring-sq");
1801                 }
1802                 if (IS_ERR(ctx->sqo_thread)) {
1803                         ret = PTR_ERR(ctx->sqo_thread);
1804                         ctx->sqo_thread = NULL;
1805                         goto err;
1806                 }
1807                 wake_up_process(ctx->sqo_thread);
1808         } else if (p->flags & IORING_SETUP_SQ_AFF) {
1809                 /* Can't have SQ_AFF without SQPOLL */
1810                 ret = -EINVAL;
1811                 goto err;
1812         }
1813
1814         /* Do QD, or 2 * CPUS, whatever is smallest */
1815         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
1816                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
1817         if (!ctx->sqo_wq) {
1818                 ret = -ENOMEM;
1819                 goto err;
1820         }
1821
1822         return 0;
1823 err:
1824         io_sq_thread_stop(ctx);
1825         mmdrop(ctx->sqo_mm);
1826         ctx->sqo_mm = NULL;
1827         return ret;
1828 }
1829
1830 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
1831 {
1832         atomic_long_sub(nr_pages, &user->locked_vm);
1833 }
1834
1835 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
1836 {
1837         unsigned long page_limit, cur_pages, new_pages;
1838
1839         /* Don't allow more pages than we can safely lock */
1840         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
1841
1842         do {
1843                 cur_pages = atomic_long_read(&user->locked_vm);
1844                 new_pages = cur_pages + nr_pages;
1845                 if (new_pages > page_limit)
1846                         return -ENOMEM;
1847         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
1848                                         new_pages) != cur_pages);
1849
1850         return 0;
1851 }
1852
1853 static void io_mem_free(void *ptr)
1854 {
1855         struct page *page = virt_to_head_page(ptr);
1856
1857         if (put_page_testzero(page))
1858                 free_compound_page(page);
1859 }
1860
1861 static void *io_mem_alloc(size_t size)
1862 {
1863         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
1864                                 __GFP_NORETRY;
1865
1866         return (void *) __get_free_pages(gfp_flags, get_order(size));
1867 }
1868
1869 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
1870 {
1871         struct io_sq_ring *sq_ring;
1872         struct io_cq_ring *cq_ring;
1873         size_t bytes;
1874
1875         bytes = struct_size(sq_ring, array, sq_entries);
1876         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
1877         bytes += struct_size(cq_ring, cqes, cq_entries);
1878
1879         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
1880 }
1881
1882 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
1883 {
1884         int i, j;
1885
1886         if (!ctx->user_bufs)
1887                 return -ENXIO;
1888
1889         for (i = 0; i < ctx->nr_user_bufs; i++) {
1890                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
1891
1892                 for (j = 0; j < imu->nr_bvecs; j++)
1893                         put_page(imu->bvec[j].bv_page);
1894
1895                 if (ctx->account_mem)
1896                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
1897                 kfree(imu->bvec);
1898                 imu->nr_bvecs = 0;
1899         }
1900
1901         kfree(ctx->user_bufs);
1902         ctx->user_bufs = NULL;
1903         ctx->nr_user_bufs = 0;
1904         return 0;
1905 }
1906
1907 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
1908                        void __user *arg, unsigned index)
1909 {
1910         struct iovec __user *src;
1911
1912 #ifdef CONFIG_COMPAT
1913         if (ctx->compat) {
1914                 struct compat_iovec __user *ciovs;
1915                 struct compat_iovec ciov;
1916
1917                 ciovs = (struct compat_iovec __user *) arg;
1918                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
1919                         return -EFAULT;
1920
1921                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
1922                 dst->iov_len = ciov.iov_len;
1923                 return 0;
1924         }
1925 #endif
1926         src = (struct iovec __user *) arg;
1927         if (copy_from_user(dst, &src[index], sizeof(*dst)))
1928                 return -EFAULT;
1929         return 0;
1930 }
1931
1932 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
1933                                   unsigned nr_args)
1934 {
1935         struct vm_area_struct **vmas = NULL;
1936         struct page **pages = NULL;
1937         int i, j, got_pages = 0;
1938         int ret = -EINVAL;
1939
1940         if (ctx->user_bufs)
1941                 return -EBUSY;
1942         if (!nr_args || nr_args > UIO_MAXIOV)
1943                 return -EINVAL;
1944
1945         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
1946                                         GFP_KERNEL);
1947         if (!ctx->user_bufs)
1948                 return -ENOMEM;
1949
1950         for (i = 0; i < nr_args; i++) {
1951                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
1952                 unsigned long off, start, end, ubuf;
1953                 int pret, nr_pages;
1954                 struct iovec iov;
1955                 size_t size;
1956
1957                 ret = io_copy_iov(ctx, &iov, arg, i);
1958                 if (ret)
1959                         break;
1960
1961                 /*
1962                  * Don't impose further limits on the size and buffer
1963                  * constraints here, we'll -EINVAL later when IO is
1964                  * submitted if they are wrong.
1965                  */
1966                 ret = -EFAULT;
1967                 if (!iov.iov_base || !iov.iov_len)
1968                         goto err;
1969
1970                 /* arbitrary limit, but we need something */
1971                 if (iov.iov_len > SZ_1G)
1972                         goto err;
1973
1974                 ubuf = (unsigned long) iov.iov_base;
1975                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
1976                 start = ubuf >> PAGE_SHIFT;
1977                 nr_pages = end - start;
1978
1979                 if (ctx->account_mem) {
1980                         ret = io_account_mem(ctx->user, nr_pages);
1981                         if (ret)
1982                                 goto err;
1983                 }
1984
1985                 ret = 0;
1986                 if (!pages || nr_pages > got_pages) {
1987                         kfree(vmas);
1988                         kfree(pages);
1989                         pages = kmalloc_array(nr_pages, sizeof(struct page *),
1990                                                 GFP_KERNEL);
1991                         vmas = kmalloc_array(nr_pages,
1992                                         sizeof(struct vm_area_struct *),
1993                                         GFP_KERNEL);
1994                         if (!pages || !vmas) {
1995                                 ret = -ENOMEM;
1996                                 if (ctx->account_mem)
1997                                         io_unaccount_mem(ctx->user, nr_pages);
1998                                 goto err;
1999                         }
2000                         got_pages = nr_pages;
2001                 }
2002
2003                 imu->bvec = kmalloc_array(nr_pages, sizeof(struct bio_vec),
2004                                                 GFP_KERNEL);
2005                 ret = -ENOMEM;
2006                 if (!imu->bvec) {
2007                         if (ctx->account_mem)
2008                                 io_unaccount_mem(ctx->user, nr_pages);
2009                         goto err;
2010                 }
2011
2012                 ret = 0;
2013                 down_read(&current->mm->mmap_sem);
2014                 pret = get_user_pages_longterm(ubuf, nr_pages, FOLL_WRITE,
2015                                                 pages, vmas);
2016                 if (pret == nr_pages) {
2017                         /* don't support file backed memory */
2018                         for (j = 0; j < nr_pages; j++) {
2019                                 struct vm_area_struct *vma = vmas[j];
2020
2021                                 if (vma->vm_file &&
2022                                     !is_file_hugepages(vma->vm_file)) {
2023                                         ret = -EOPNOTSUPP;
2024                                         break;
2025                                 }
2026                         }
2027                 } else {
2028                         ret = pret < 0 ? pret : -EFAULT;
2029                 }
2030                 up_read(&current->mm->mmap_sem);
2031                 if (ret) {
2032                         /*
2033                          * if we did partial map, or found file backed vmas,
2034                          * release any pages we did get
2035                          */
2036                         if (pret > 0) {
2037                                 for (j = 0; j < pret; j++)
2038                                         put_page(pages[j]);
2039                         }
2040                         if (ctx->account_mem)
2041                                 io_unaccount_mem(ctx->user, nr_pages);
2042                         goto err;
2043                 }
2044
2045                 off = ubuf & ~PAGE_MASK;
2046                 size = iov.iov_len;
2047                 for (j = 0; j < nr_pages; j++) {
2048                         size_t vec_len;
2049
2050                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
2051                         imu->bvec[j].bv_page = pages[j];
2052                         imu->bvec[j].bv_len = vec_len;
2053                         imu->bvec[j].bv_offset = off;
2054                         off = 0;
2055                         size -= vec_len;
2056                 }
2057                 /* store original address for later verification */
2058                 imu->ubuf = ubuf;
2059                 imu->len = iov.iov_len;
2060                 imu->nr_bvecs = nr_pages;
2061
2062                 ctx->nr_user_bufs++;
2063         }
2064         kfree(pages);
2065         kfree(vmas);
2066         return 0;
2067 err:
2068         kfree(pages);
2069         kfree(vmas);
2070         io_sqe_buffer_unregister(ctx);
2071         return ret;
2072 }
2073
2074 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2075 {
2076         io_finish_async(ctx);
2077         if (ctx->sqo_mm)
2078                 mmdrop(ctx->sqo_mm);
2079
2080         io_iopoll_reap_events(ctx);
2081         io_sqe_buffer_unregister(ctx);
2082         io_sqe_files_unregister(ctx);
2083
2084 #if defined(CONFIG_UNIX)
2085         if (ctx->ring_sock)
2086                 sock_release(ctx->ring_sock);
2087 #endif
2088
2089         io_mem_free(ctx->sq_ring);
2090         io_mem_free(ctx->sq_sqes);
2091         io_mem_free(ctx->cq_ring);
2092
2093         percpu_ref_exit(&ctx->refs);
2094         if (ctx->account_mem)
2095                 io_unaccount_mem(ctx->user,
2096                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
2097         free_uid(ctx->user);
2098         kfree(ctx);
2099 }
2100
2101 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2102 {
2103         struct io_ring_ctx *ctx = file->private_data;
2104         __poll_t mask = 0;
2105
2106         poll_wait(file, &ctx->cq_wait, wait);
2107         /* See comment at the top of this file */
2108         smp_rmb();
2109         if (READ_ONCE(ctx->sq_ring->r.tail) + 1 != ctx->cached_sq_head)
2110                 mask |= EPOLLOUT | EPOLLWRNORM;
2111         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2112                 mask |= EPOLLIN | EPOLLRDNORM;
2113
2114         return mask;
2115 }
2116
2117 static int io_uring_fasync(int fd, struct file *file, int on)
2118 {
2119         struct io_ring_ctx *ctx = file->private_data;
2120
2121         return fasync_helper(fd, file, on, &ctx->cq_fasync);
2122 }
2123
2124 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2125 {
2126         mutex_lock(&ctx->uring_lock);
2127         percpu_ref_kill(&ctx->refs);
2128         mutex_unlock(&ctx->uring_lock);
2129
2130         io_iopoll_reap_events(ctx);
2131         wait_for_completion(&ctx->ctx_done);
2132         io_ring_ctx_free(ctx);
2133 }
2134
2135 static int io_uring_release(struct inode *inode, struct file *file)
2136 {
2137         struct io_ring_ctx *ctx = file->private_data;
2138
2139         file->private_data = NULL;
2140         io_ring_ctx_wait_and_kill(ctx);
2141         return 0;
2142 }
2143
2144 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2145 {
2146         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2147         unsigned long sz = vma->vm_end - vma->vm_start;
2148         struct io_ring_ctx *ctx = file->private_data;
2149         unsigned long pfn;
2150         struct page *page;
2151         void *ptr;
2152
2153         switch (offset) {
2154         case IORING_OFF_SQ_RING:
2155                 ptr = ctx->sq_ring;
2156                 break;
2157         case IORING_OFF_SQES:
2158                 ptr = ctx->sq_sqes;
2159                 break;
2160         case IORING_OFF_CQ_RING:
2161                 ptr = ctx->cq_ring;
2162                 break;
2163         default:
2164                 return -EINVAL;
2165         }
2166
2167         page = virt_to_head_page(ptr);
2168         if (sz > (PAGE_SIZE << compound_order(page)))
2169                 return -EINVAL;
2170
2171         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2172         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2173 }
2174
2175 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2176                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2177                 size_t, sigsz)
2178 {
2179         struct io_ring_ctx *ctx;
2180         long ret = -EBADF;
2181         int submitted = 0;
2182         struct fd f;
2183
2184         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2185                 return -EINVAL;
2186
2187         f = fdget(fd);
2188         if (!f.file)
2189                 return -EBADF;
2190
2191         ret = -EOPNOTSUPP;
2192         if (f.file->f_op != &io_uring_fops)
2193                 goto out_fput;
2194
2195         ret = -ENXIO;
2196         ctx = f.file->private_data;
2197         if (!percpu_ref_tryget(&ctx->refs))
2198                 goto out_fput;
2199
2200         /*
2201          * For SQ polling, the thread will do all submissions and completions.
2202          * Just return the requested submit count, and wake the thread if
2203          * we were asked to.
2204          */
2205         if (ctx->flags & IORING_SETUP_SQPOLL) {
2206                 if (flags & IORING_ENTER_SQ_WAKEUP)
2207                         wake_up(&ctx->sqo_wait);
2208                 submitted = to_submit;
2209                 goto out_ctx;
2210         }
2211
2212         ret = 0;
2213         if (to_submit) {
2214                 to_submit = min(to_submit, ctx->sq_entries);
2215
2216                 mutex_lock(&ctx->uring_lock);
2217                 submitted = io_ring_submit(ctx, to_submit);
2218                 mutex_unlock(&ctx->uring_lock);
2219
2220                 if (submitted < 0)
2221                         goto out_ctx;
2222         }
2223         if (flags & IORING_ENTER_GETEVENTS) {
2224                 unsigned nr_events = 0;
2225
2226                 min_complete = min(min_complete, ctx->cq_entries);
2227
2228                 /*
2229                  * The application could have included the 'to_submit' count
2230                  * in how many events it wanted to wait for. If we failed to
2231                  * submit the desired count, we may need to adjust the number
2232                  * of events to poll/wait for.
2233                  */
2234                 if (submitted < to_submit)
2235                         min_complete = min_t(unsigned, submitted, min_complete);
2236
2237                 if (ctx->flags & IORING_SETUP_IOPOLL) {
2238                         mutex_lock(&ctx->uring_lock);
2239                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
2240                         mutex_unlock(&ctx->uring_lock);
2241                 } else {
2242                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2243                 }
2244         }
2245
2246 out_ctx:
2247         io_ring_drop_ctx_refs(ctx, 1);
2248 out_fput:
2249         fdput(f);
2250         return submitted ? submitted : ret;
2251 }
2252
2253 static const struct file_operations io_uring_fops = {
2254         .release        = io_uring_release,
2255         .mmap           = io_uring_mmap,
2256         .poll           = io_uring_poll,
2257         .fasync         = io_uring_fasync,
2258 };
2259
2260 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2261                                   struct io_uring_params *p)
2262 {
2263         struct io_sq_ring *sq_ring;
2264         struct io_cq_ring *cq_ring;
2265         size_t size;
2266
2267         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2268         if (!sq_ring)
2269                 return -ENOMEM;
2270
2271         ctx->sq_ring = sq_ring;
2272         sq_ring->ring_mask = p->sq_entries - 1;
2273         sq_ring->ring_entries = p->sq_entries;
2274         ctx->sq_mask = sq_ring->ring_mask;
2275         ctx->sq_entries = sq_ring->ring_entries;
2276
2277         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2278         if (size == SIZE_MAX)
2279                 return -EOVERFLOW;
2280
2281         ctx->sq_sqes = io_mem_alloc(size);
2282         if (!ctx->sq_sqes) {
2283                 io_mem_free(ctx->sq_ring);
2284                 return -ENOMEM;
2285         }
2286
2287         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2288         if (!cq_ring) {
2289                 io_mem_free(ctx->sq_ring);
2290                 io_mem_free(ctx->sq_sqes);
2291                 return -ENOMEM;
2292         }
2293
2294         ctx->cq_ring = cq_ring;
2295         cq_ring->ring_mask = p->cq_entries - 1;
2296         cq_ring->ring_entries = p->cq_entries;
2297         ctx->cq_mask = cq_ring->ring_mask;
2298         ctx->cq_entries = cq_ring->ring_entries;
2299         return 0;
2300 }
2301
2302 /*
2303  * Allocate an anonymous fd, this is what constitutes the application
2304  * visible backing of an io_uring instance. The application mmaps this
2305  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
2306  * we have to tie this fd to a socket for file garbage collection purposes.
2307  */
2308 static int io_uring_get_fd(struct io_ring_ctx *ctx)
2309 {
2310         struct file *file;
2311         int ret;
2312
2313 #if defined(CONFIG_UNIX)
2314         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
2315                                 &ctx->ring_sock);
2316         if (ret)
2317                 return ret;
2318 #endif
2319
2320         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
2321         if (ret < 0)
2322                 goto err;
2323
2324         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
2325                                         O_RDWR | O_CLOEXEC);
2326         if (IS_ERR(file)) {
2327                 put_unused_fd(ret);
2328                 ret = PTR_ERR(file);
2329                 goto err;
2330         }
2331
2332 #if defined(CONFIG_UNIX)
2333         ctx->ring_sock->file = file;
2334         ctx->ring_sock->sk->sk_user_data = ctx;
2335 #endif
2336         fd_install(ret, file);
2337         return ret;
2338 err:
2339 #if defined(CONFIG_UNIX)
2340         sock_release(ctx->ring_sock);
2341         ctx->ring_sock = NULL;
2342 #endif
2343         return ret;
2344 }
2345
2346 static int io_uring_create(unsigned entries, struct io_uring_params *p)
2347 {
2348         struct user_struct *user = NULL;
2349         struct io_ring_ctx *ctx;
2350         bool account_mem;
2351         int ret;
2352
2353         if (!entries || entries > IORING_MAX_ENTRIES)
2354                 return -EINVAL;
2355
2356         /*
2357          * Use twice as many entries for the CQ ring. It's possible for the
2358          * application to drive a higher depth than the size of the SQ ring,
2359          * since the sqes are only used at submission time. This allows for
2360          * some flexibility in overcommitting a bit.
2361          */
2362         p->sq_entries = roundup_pow_of_two(entries);
2363         p->cq_entries = 2 * p->sq_entries;
2364
2365         user = get_uid(current_user());
2366         account_mem = !capable(CAP_IPC_LOCK);
2367
2368         if (account_mem) {
2369                 ret = io_account_mem(user,
2370                                 ring_pages(p->sq_entries, p->cq_entries));
2371                 if (ret) {
2372                         free_uid(user);
2373                         return ret;
2374                 }
2375         }
2376
2377         ctx = io_ring_ctx_alloc(p);
2378         if (!ctx) {
2379                 if (account_mem)
2380                         io_unaccount_mem(user, ring_pages(p->sq_entries,
2381                                                                 p->cq_entries));
2382                 free_uid(user);
2383                 return -ENOMEM;
2384         }
2385         ctx->compat = in_compat_syscall();
2386         ctx->account_mem = account_mem;
2387         ctx->user = user;
2388
2389         ret = io_allocate_scq_urings(ctx, p);
2390         if (ret)
2391                 goto err;
2392
2393         ret = io_sq_offload_start(ctx, p);
2394         if (ret)
2395                 goto err;
2396
2397         ret = io_uring_get_fd(ctx);
2398         if (ret < 0)
2399                 goto err;
2400
2401         memset(&p->sq_off, 0, sizeof(p->sq_off));
2402         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
2403         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
2404         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
2405         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
2406         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
2407         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
2408         p->sq_off.array = offsetof(struct io_sq_ring, array);
2409
2410         memset(&p->cq_off, 0, sizeof(p->cq_off));
2411         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
2412         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
2413         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
2414         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
2415         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
2416         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
2417         return ret;
2418 err:
2419         io_ring_ctx_wait_and_kill(ctx);
2420         return ret;
2421 }
2422
2423 /*
2424  * Sets up an aio uring context, and returns the fd. Applications asks for a
2425  * ring size, we return the actual sq/cq ring sizes (among other things) in the
2426  * params structure passed in.
2427  */
2428 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
2429 {
2430         struct io_uring_params p;
2431         long ret;
2432         int i;
2433
2434         if (copy_from_user(&p, params, sizeof(p)))
2435                 return -EFAULT;
2436         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
2437                 if (p.resv[i])
2438                         return -EINVAL;
2439         }
2440
2441         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
2442                         IORING_SETUP_SQ_AFF))
2443                 return -EINVAL;
2444
2445         ret = io_uring_create(entries, &p);
2446         if (ret < 0)
2447                 return ret;
2448
2449         if (copy_to_user(params, &p, sizeof(p)))
2450                 return -EFAULT;
2451
2452         return ret;
2453 }
2454
2455 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
2456                 struct io_uring_params __user *, params)
2457 {
2458         return io_uring_setup(entries, params);
2459 }
2460
2461 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
2462                                void __user *arg, unsigned nr_args)
2463 {
2464         int ret;
2465
2466         percpu_ref_kill(&ctx->refs);
2467         wait_for_completion(&ctx->ctx_done);
2468
2469         switch (opcode) {
2470         case IORING_REGISTER_BUFFERS:
2471                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
2472                 break;
2473         case IORING_UNREGISTER_BUFFERS:
2474                 ret = -EINVAL;
2475                 if (arg || nr_args)
2476                         break;
2477                 ret = io_sqe_buffer_unregister(ctx);
2478                 break;
2479         case IORING_REGISTER_FILES:
2480                 ret = io_sqe_files_register(ctx, arg, nr_args);
2481                 break;
2482         case IORING_UNREGISTER_FILES:
2483                 ret = -EINVAL;
2484                 if (arg || nr_args)
2485                         break;
2486                 ret = io_sqe_files_unregister(ctx);
2487                 break;
2488         default:
2489                 ret = -EINVAL;
2490                 break;
2491         }
2492
2493         /* bring the ctx back to life */
2494         reinit_completion(&ctx->ctx_done);
2495         percpu_ref_reinit(&ctx->refs);
2496         return ret;
2497 }
2498
2499 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
2500                 void __user *, arg, unsigned int, nr_args)
2501 {
2502         struct io_ring_ctx *ctx;
2503         long ret = -EBADF;
2504         struct fd f;
2505
2506         f = fdget(fd);
2507         if (!f.file)
2508                 return -EBADF;
2509
2510         ret = -EOPNOTSUPP;
2511         if (f.file->f_op != &io_uring_fops)
2512                 goto out_fput;
2513
2514         ctx = f.file->private_data;
2515
2516         mutex_lock(&ctx->uring_lock);
2517         ret = __io_uring_register(ctx, opcode, arg, nr_args);
2518         mutex_unlock(&ctx->uring_lock);
2519 out_fput:
2520         fdput(f);
2521         return ret;
2522 }
2523
2524 static int __init io_uring_init(void)
2525 {
2526         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
2527         return 0;
2528 };
2529 __initcall(io_uring_init);