]> asedeno.scripts.mit.edu Git - linux.git/blob - fs/io_uring.c
Merge tag 'asoc-v5.3' of https://git.kernel.org/pub/scm/linux/kernel/git/broonie...
[linux.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/workqueue.h>
60 #include <linux/kthread.h>
61 #include <linux/blkdev.h>
62 #include <linux/bvec.h>
63 #include <linux/net.h>
64 #include <net/sock.h>
65 #include <net/af_unix.h>
66 #include <net/scm.h>
67 #include <linux/anon_inodes.h>
68 #include <linux/sched/mm.h>
69 #include <linux/uaccess.h>
70 #include <linux/nospec.h>
71 #include <linux/sizes.h>
72 #include <linux/hugetlb.h>
73
74 #include <uapi/linux/io_uring.h>
75
76 #include "internal.h"
77
78 #define IORING_MAX_ENTRIES      4096
79 #define IORING_MAX_FIXED_FILES  1024
80
81 struct io_uring {
82         u32 head ____cacheline_aligned_in_smp;
83         u32 tail ____cacheline_aligned_in_smp;
84 };
85
86 /*
87  * This data is shared with the application through the mmap at offset
88  * IORING_OFF_SQ_RING.
89  *
90  * The offsets to the member fields are published through struct
91  * io_sqring_offsets when calling io_uring_setup.
92  */
93 struct io_sq_ring {
94         /*
95          * Head and tail offsets into the ring; the offsets need to be
96          * masked to get valid indices.
97          *
98          * The kernel controls head and the application controls tail.
99          */
100         struct io_uring         r;
101         /*
102          * Bitmask to apply to head and tail offsets (constant, equals
103          * ring_entries - 1)
104          */
105         u32                     ring_mask;
106         /* Ring size (constant, power of 2) */
107         u32                     ring_entries;
108         /*
109          * Number of invalid entries dropped by the kernel due to
110          * invalid index stored in array
111          *
112          * Written by the kernel, shouldn't be modified by the
113          * application (i.e. get number of "new events" by comparing to
114          * cached value).
115          *
116          * After a new SQ head value was read by the application this
117          * counter includes all submissions that were dropped reaching
118          * the new SQ head (and possibly more).
119          */
120         u32                     dropped;
121         /*
122          * Runtime flags
123          *
124          * Written by the kernel, shouldn't be modified by the
125          * application.
126          *
127          * The application needs a full memory barrier before checking
128          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
129          */
130         u32                     flags;
131         /*
132          * Ring buffer of indices into array of io_uring_sqe, which is
133          * mmapped by the application using the IORING_OFF_SQES offset.
134          *
135          * This indirection could e.g. be used to assign fixed
136          * io_uring_sqe entries to operations and only submit them to
137          * the queue when needed.
138          *
139          * The kernel modifies neither the indices array nor the entries
140          * array.
141          */
142         u32                     array[];
143 };
144
145 /*
146  * This data is shared with the application through the mmap at offset
147  * IORING_OFF_CQ_RING.
148  *
149  * The offsets to the member fields are published through struct
150  * io_cqring_offsets when calling io_uring_setup.
151  */
152 struct io_cq_ring {
153         /*
154          * Head and tail offsets into the ring; the offsets need to be
155          * masked to get valid indices.
156          *
157          * The application controls head and the kernel tail.
158          */
159         struct io_uring         r;
160         /*
161          * Bitmask to apply to head and tail offsets (constant, equals
162          * ring_entries - 1)
163          */
164         u32                     ring_mask;
165         /* Ring size (constant, power of 2) */
166         u32                     ring_entries;
167         /*
168          * Number of completion events lost because the queue was full;
169          * this should be avoided by the application by making sure
170          * there are not more requests pending thatn there is space in
171          * the completion queue.
172          *
173          * Written by the kernel, shouldn't be modified by the
174          * application (i.e. get number of "new events" by comparing to
175          * cached value).
176          *
177          * As completion events come in out of order this counter is not
178          * ordered with any other data.
179          */
180         u32                     overflow;
181         /*
182          * Ring buffer of completion events.
183          *
184          * The kernel writes completion events fresh every time they are
185          * produced, so the application is allowed to modify pending
186          * entries.
187          */
188         struct io_uring_cqe     cqes[];
189 };
190
191 struct io_mapped_ubuf {
192         u64             ubuf;
193         size_t          len;
194         struct          bio_vec *bvec;
195         unsigned int    nr_bvecs;
196 };
197
198 struct async_list {
199         spinlock_t              lock;
200         atomic_t                cnt;
201         struct list_head        list;
202
203         struct file             *file;
204         off_t                   io_end;
205         size_t                  io_pages;
206 };
207
208 struct io_ring_ctx {
209         struct {
210                 struct percpu_ref       refs;
211         } ____cacheline_aligned_in_smp;
212
213         struct {
214                 unsigned int            flags;
215                 bool                    compat;
216                 bool                    account_mem;
217
218                 /* SQ ring */
219                 struct io_sq_ring       *sq_ring;
220                 unsigned                cached_sq_head;
221                 unsigned                sq_entries;
222                 unsigned                sq_mask;
223                 unsigned                sq_thread_idle;
224                 struct io_uring_sqe     *sq_sqes;
225
226                 struct list_head        defer_list;
227         } ____cacheline_aligned_in_smp;
228
229         /* IO offload */
230         struct workqueue_struct *sqo_wq;
231         struct task_struct      *sqo_thread;    /* if using sq thread polling */
232         struct mm_struct        *sqo_mm;
233         wait_queue_head_t       sqo_wait;
234
235         struct {
236                 /* CQ ring */
237                 struct io_cq_ring       *cq_ring;
238                 unsigned                cached_cq_tail;
239                 unsigned                cq_entries;
240                 unsigned                cq_mask;
241                 struct wait_queue_head  cq_wait;
242                 struct fasync_struct    *cq_fasync;
243                 struct eventfd_ctx      *cq_ev_fd;
244         } ____cacheline_aligned_in_smp;
245
246         /*
247          * If used, fixed file set. Writers must ensure that ->refs is dead,
248          * readers must ensure that ->refs is alive as long as the file* is
249          * used. Only updated through io_uring_register(2).
250          */
251         struct file             **user_files;
252         unsigned                nr_user_files;
253
254         /* if used, fixed mapped user buffers */
255         unsigned                nr_user_bufs;
256         struct io_mapped_ubuf   *user_bufs;
257
258         struct user_struct      *user;
259
260         struct completion       ctx_done;
261
262         struct {
263                 struct mutex            uring_lock;
264                 wait_queue_head_t       wait;
265         } ____cacheline_aligned_in_smp;
266
267         struct {
268                 spinlock_t              completion_lock;
269                 bool                    poll_multi_file;
270                 /*
271                  * ->poll_list is protected by the ctx->uring_lock for
272                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
273                  * For SQPOLL, only the single threaded io_sq_thread() will
274                  * manipulate the list, hence no extra locking is needed there.
275                  */
276                 struct list_head        poll_list;
277                 struct list_head        cancel_list;
278         } ____cacheline_aligned_in_smp;
279
280         struct async_list       pending_async[2];
281
282 #if defined(CONFIG_UNIX)
283         struct socket           *ring_sock;
284 #endif
285 };
286
287 struct sqe_submit {
288         const struct io_uring_sqe       *sqe;
289         unsigned short                  index;
290         bool                            has_user;
291         bool                            needs_lock;
292         bool                            needs_fixed_file;
293 };
294
295 /*
296  * First field must be the file pointer in all the
297  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
298  */
299 struct io_poll_iocb {
300         struct file                     *file;
301         struct wait_queue_head          *head;
302         __poll_t                        events;
303         bool                            done;
304         bool                            canceled;
305         struct wait_queue_entry         wait;
306 };
307
308 /*
309  * NOTE! Each of the iocb union members has the file pointer
310  * as the first entry in their struct definition. So you can
311  * access the file pointer through any of the sub-structs,
312  * or directly as just 'ki_filp' in this struct.
313  */
314 struct io_kiocb {
315         union {
316                 struct file             *file;
317                 struct kiocb            rw;
318                 struct io_poll_iocb     poll;
319         };
320
321         struct sqe_submit       submit;
322
323         struct io_ring_ctx      *ctx;
324         struct list_head        list;
325         unsigned int            flags;
326         refcount_t              refs;
327 #define REQ_F_NOWAIT            1       /* must not punt to workers */
328 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
329 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
330 #define REQ_F_SEQ_PREV          8       /* sequential with previous */
331 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
332 #define REQ_F_IO_DRAINED        32      /* drain done */
333         u64                     user_data;
334         u32                     error;  /* iopoll result from callback */
335         u32                     sequence;
336
337         struct work_struct      work;
338 };
339
340 #define IO_PLUG_THRESHOLD               2
341 #define IO_IOPOLL_BATCH                 8
342
343 struct io_submit_state {
344         struct blk_plug         plug;
345
346         /*
347          * io_kiocb alloc cache
348          */
349         void                    *reqs[IO_IOPOLL_BATCH];
350         unsigned                int free_reqs;
351         unsigned                int cur_req;
352
353         /*
354          * File reference cache
355          */
356         struct file             *file;
357         unsigned int            fd;
358         unsigned int            has_refs;
359         unsigned int            used_refs;
360         unsigned int            ios_left;
361 };
362
363 static void io_sq_wq_submit_work(struct work_struct *work);
364
365 static struct kmem_cache *req_cachep;
366
367 static const struct file_operations io_uring_fops;
368
369 struct sock *io_uring_get_socket(struct file *file)
370 {
371 #if defined(CONFIG_UNIX)
372         if (file->f_op == &io_uring_fops) {
373                 struct io_ring_ctx *ctx = file->private_data;
374
375                 return ctx->ring_sock->sk;
376         }
377 #endif
378         return NULL;
379 }
380 EXPORT_SYMBOL(io_uring_get_socket);
381
382 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
383 {
384         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
385
386         complete(&ctx->ctx_done);
387 }
388
389 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
390 {
391         struct io_ring_ctx *ctx;
392         int i;
393
394         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
395         if (!ctx)
396                 return NULL;
397
398         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free, 0, GFP_KERNEL)) {
399                 kfree(ctx);
400                 return NULL;
401         }
402
403         ctx->flags = p->flags;
404         init_waitqueue_head(&ctx->cq_wait);
405         init_completion(&ctx->ctx_done);
406         mutex_init(&ctx->uring_lock);
407         init_waitqueue_head(&ctx->wait);
408         for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
409                 spin_lock_init(&ctx->pending_async[i].lock);
410                 INIT_LIST_HEAD(&ctx->pending_async[i].list);
411                 atomic_set(&ctx->pending_async[i].cnt, 0);
412         }
413         spin_lock_init(&ctx->completion_lock);
414         INIT_LIST_HEAD(&ctx->poll_list);
415         INIT_LIST_HEAD(&ctx->cancel_list);
416         INIT_LIST_HEAD(&ctx->defer_list);
417         return ctx;
418 }
419
420 static inline bool io_sequence_defer(struct io_ring_ctx *ctx,
421                                      struct io_kiocb *req)
422 {
423         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) != REQ_F_IO_DRAIN)
424                 return false;
425
426         return req->sequence > ctx->cached_cq_tail + ctx->sq_ring->dropped;
427 }
428
429 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
430 {
431         struct io_kiocb *req;
432
433         if (list_empty(&ctx->defer_list))
434                 return NULL;
435
436         req = list_first_entry(&ctx->defer_list, struct io_kiocb, list);
437         if (!io_sequence_defer(ctx, req)) {
438                 list_del_init(&req->list);
439                 return req;
440         }
441
442         return NULL;
443 }
444
445 static void __io_commit_cqring(struct io_ring_ctx *ctx)
446 {
447         struct io_cq_ring *ring = ctx->cq_ring;
448
449         if (ctx->cached_cq_tail != READ_ONCE(ring->r.tail)) {
450                 /* order cqe stores with ring update */
451                 smp_store_release(&ring->r.tail, ctx->cached_cq_tail);
452
453                 if (wq_has_sleeper(&ctx->cq_wait)) {
454                         wake_up_interruptible(&ctx->cq_wait);
455                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
456                 }
457         }
458 }
459
460 static void io_commit_cqring(struct io_ring_ctx *ctx)
461 {
462         struct io_kiocb *req;
463
464         __io_commit_cqring(ctx);
465
466         while ((req = io_get_deferred_req(ctx)) != NULL) {
467                 req->flags |= REQ_F_IO_DRAINED;
468                 queue_work(ctx->sqo_wq, &req->work);
469         }
470 }
471
472 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
473 {
474         struct io_cq_ring *ring = ctx->cq_ring;
475         unsigned tail;
476
477         tail = ctx->cached_cq_tail;
478         /*
479          * writes to the cq entry need to come after reading head; the
480          * control dependency is enough as we're using WRITE_ONCE to
481          * fill the cq entry
482          */
483         if (tail - READ_ONCE(ring->r.head) == ring->ring_entries)
484                 return NULL;
485
486         ctx->cached_cq_tail++;
487         return &ring->cqes[tail & ctx->cq_mask];
488 }
489
490 static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
491                                  long res)
492 {
493         struct io_uring_cqe *cqe;
494
495         /*
496          * If we can't get a cq entry, userspace overflowed the
497          * submission (by quite a lot). Increment the overflow count in
498          * the ring.
499          */
500         cqe = io_get_cqring(ctx);
501         if (cqe) {
502                 WRITE_ONCE(cqe->user_data, ki_user_data);
503                 WRITE_ONCE(cqe->res, res);
504                 WRITE_ONCE(cqe->flags, 0);
505         } else {
506                 unsigned overflow = READ_ONCE(ctx->cq_ring->overflow);
507
508                 WRITE_ONCE(ctx->cq_ring->overflow, overflow + 1);
509         }
510 }
511
512 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
513 {
514         if (waitqueue_active(&ctx->wait))
515                 wake_up(&ctx->wait);
516         if (waitqueue_active(&ctx->sqo_wait))
517                 wake_up(&ctx->sqo_wait);
518         if (ctx->cq_ev_fd)
519                 eventfd_signal(ctx->cq_ev_fd, 1);
520 }
521
522 static void io_cqring_add_event(struct io_ring_ctx *ctx, u64 user_data,
523                                 long res)
524 {
525         unsigned long flags;
526
527         spin_lock_irqsave(&ctx->completion_lock, flags);
528         io_cqring_fill_event(ctx, user_data, res);
529         io_commit_cqring(ctx);
530         spin_unlock_irqrestore(&ctx->completion_lock, flags);
531
532         io_cqring_ev_posted(ctx);
533 }
534
535 static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
536 {
537         percpu_ref_put_many(&ctx->refs, refs);
538
539         if (waitqueue_active(&ctx->wait))
540                 wake_up(&ctx->wait);
541 }
542
543 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
544                                    struct io_submit_state *state)
545 {
546         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
547         struct io_kiocb *req;
548
549         if (!percpu_ref_tryget(&ctx->refs))
550                 return NULL;
551
552         if (!state) {
553                 req = kmem_cache_alloc(req_cachep, gfp);
554                 if (unlikely(!req))
555                         goto out;
556         } else if (!state->free_reqs) {
557                 size_t sz;
558                 int ret;
559
560                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
561                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
562
563                 /*
564                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
565                  * retry single alloc to be on the safe side.
566                  */
567                 if (unlikely(ret <= 0)) {
568                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
569                         if (!state->reqs[0])
570                                 goto out;
571                         ret = 1;
572                 }
573                 state->free_reqs = ret - 1;
574                 state->cur_req = 1;
575                 req = state->reqs[0];
576         } else {
577                 req = state->reqs[state->cur_req];
578                 state->free_reqs--;
579                 state->cur_req++;
580         }
581
582         req->file = NULL;
583         req->ctx = ctx;
584         req->flags = 0;
585         /* one is dropped after submission, the other at completion */
586         refcount_set(&req->refs, 2);
587         return req;
588 out:
589         io_ring_drop_ctx_refs(ctx, 1);
590         return NULL;
591 }
592
593 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
594 {
595         if (*nr) {
596                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
597                 io_ring_drop_ctx_refs(ctx, *nr);
598                 *nr = 0;
599         }
600 }
601
602 static void io_free_req(struct io_kiocb *req)
603 {
604         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
605                 fput(req->file);
606         io_ring_drop_ctx_refs(req->ctx, 1);
607         kmem_cache_free(req_cachep, req);
608 }
609
610 static void io_put_req(struct io_kiocb *req)
611 {
612         if (refcount_dec_and_test(&req->refs))
613                 io_free_req(req);
614 }
615
616 /*
617  * Find and free completed poll iocbs
618  */
619 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
620                                struct list_head *done)
621 {
622         void *reqs[IO_IOPOLL_BATCH];
623         struct io_kiocb *req;
624         int to_free;
625
626         to_free = 0;
627         while (!list_empty(done)) {
628                 req = list_first_entry(done, struct io_kiocb, list);
629                 list_del(&req->list);
630
631                 io_cqring_fill_event(ctx, req->user_data, req->error);
632                 (*nr_events)++;
633
634                 if (refcount_dec_and_test(&req->refs)) {
635                         /* If we're not using fixed files, we have to pair the
636                          * completion part with the file put. Use regular
637                          * completions for those, only batch free for fixed
638                          * file.
639                          */
640                         if (req->flags & REQ_F_FIXED_FILE) {
641                                 reqs[to_free++] = req;
642                                 if (to_free == ARRAY_SIZE(reqs))
643                                         io_free_req_many(ctx, reqs, &to_free);
644                         } else {
645                                 io_free_req(req);
646                         }
647                 }
648         }
649
650         io_commit_cqring(ctx);
651         io_free_req_many(ctx, reqs, &to_free);
652 }
653
654 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
655                         long min)
656 {
657         struct io_kiocb *req, *tmp;
658         LIST_HEAD(done);
659         bool spin;
660         int ret;
661
662         /*
663          * Only spin for completions if we don't have multiple devices hanging
664          * off our complete list, and we're under the requested amount.
665          */
666         spin = !ctx->poll_multi_file && *nr_events < min;
667
668         ret = 0;
669         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
670                 struct kiocb *kiocb = &req->rw;
671
672                 /*
673                  * Move completed entries to our local list. If we find a
674                  * request that requires polling, break out and complete
675                  * the done list first, if we have entries there.
676                  */
677                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
678                         list_move_tail(&req->list, &done);
679                         continue;
680                 }
681                 if (!list_empty(&done))
682                         break;
683
684                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
685                 if (ret < 0)
686                         break;
687
688                 if (ret && spin)
689                         spin = false;
690                 ret = 0;
691         }
692
693         if (!list_empty(&done))
694                 io_iopoll_complete(ctx, nr_events, &done);
695
696         return ret;
697 }
698
699 /*
700  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
701  * non-spinning poll check - we'll still enter the driver poll loop, but only
702  * as a non-spinning completion check.
703  */
704 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
705                                 long min)
706 {
707         while (!list_empty(&ctx->poll_list)) {
708                 int ret;
709
710                 ret = io_do_iopoll(ctx, nr_events, min);
711                 if (ret < 0)
712                         return ret;
713                 if (!min || *nr_events >= min)
714                         return 0;
715         }
716
717         return 1;
718 }
719
720 /*
721  * We can't just wait for polled events to come to us, we have to actively
722  * find and complete them.
723  */
724 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
725 {
726         if (!(ctx->flags & IORING_SETUP_IOPOLL))
727                 return;
728
729         mutex_lock(&ctx->uring_lock);
730         while (!list_empty(&ctx->poll_list)) {
731                 unsigned int nr_events = 0;
732
733                 io_iopoll_getevents(ctx, &nr_events, 1);
734         }
735         mutex_unlock(&ctx->uring_lock);
736 }
737
738 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
739                            long min)
740 {
741         int ret = 0;
742
743         do {
744                 int tmin = 0;
745
746                 if (*nr_events < min)
747                         tmin = min - *nr_events;
748
749                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
750                 if (ret <= 0)
751                         break;
752                 ret = 0;
753         } while (min && !*nr_events && !need_resched());
754
755         return ret;
756 }
757
758 static void kiocb_end_write(struct kiocb *kiocb)
759 {
760         if (kiocb->ki_flags & IOCB_WRITE) {
761                 struct inode *inode = file_inode(kiocb->ki_filp);
762
763                 /*
764                  * Tell lockdep we inherited freeze protection from submission
765                  * thread.
766                  */
767                 if (S_ISREG(inode->i_mode))
768                         __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
769                 file_end_write(kiocb->ki_filp);
770         }
771 }
772
773 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
774 {
775         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
776
777         kiocb_end_write(kiocb);
778
779         io_cqring_add_event(req->ctx, req->user_data, res);
780         io_put_req(req);
781 }
782
783 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
784 {
785         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
786
787         kiocb_end_write(kiocb);
788
789         req->error = res;
790         if (res != -EAGAIN)
791                 req->flags |= REQ_F_IOPOLL_COMPLETED;
792 }
793
794 /*
795  * After the iocb has been issued, it's safe to be found on the poll list.
796  * Adding the kiocb to the list AFTER submission ensures that we don't
797  * find it from a io_iopoll_getevents() thread before the issuer is done
798  * accessing the kiocb cookie.
799  */
800 static void io_iopoll_req_issued(struct io_kiocb *req)
801 {
802         struct io_ring_ctx *ctx = req->ctx;
803
804         /*
805          * Track whether we have multiple files in our lists. This will impact
806          * how we do polling eventually, not spinning if we're on potentially
807          * different devices.
808          */
809         if (list_empty(&ctx->poll_list)) {
810                 ctx->poll_multi_file = false;
811         } else if (!ctx->poll_multi_file) {
812                 struct io_kiocb *list_req;
813
814                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
815                                                 list);
816                 if (list_req->rw.ki_filp != req->rw.ki_filp)
817                         ctx->poll_multi_file = true;
818         }
819
820         /*
821          * For fast devices, IO may have already completed. If it has, add
822          * it to the front so we find it first.
823          */
824         if (req->flags & REQ_F_IOPOLL_COMPLETED)
825                 list_add(&req->list, &ctx->poll_list);
826         else
827                 list_add_tail(&req->list, &ctx->poll_list);
828 }
829
830 static void io_file_put(struct io_submit_state *state)
831 {
832         if (state->file) {
833                 int diff = state->has_refs - state->used_refs;
834
835                 if (diff)
836                         fput_many(state->file, diff);
837                 state->file = NULL;
838         }
839 }
840
841 /*
842  * Get as many references to a file as we have IOs left in this submission,
843  * assuming most submissions are for one file, or at least that each file
844  * has more than one submission.
845  */
846 static struct file *io_file_get(struct io_submit_state *state, int fd)
847 {
848         if (!state)
849                 return fget(fd);
850
851         if (state->file) {
852                 if (state->fd == fd) {
853                         state->used_refs++;
854                         state->ios_left--;
855                         return state->file;
856                 }
857                 io_file_put(state);
858         }
859         state->file = fget_many(fd, state->ios_left);
860         if (!state->file)
861                 return NULL;
862
863         state->fd = fd;
864         state->has_refs = state->ios_left;
865         state->used_refs = 1;
866         state->ios_left--;
867         return state->file;
868 }
869
870 /*
871  * If we tracked the file through the SCM inflight mechanism, we could support
872  * any file. For now, just ensure that anything potentially problematic is done
873  * inline.
874  */
875 static bool io_file_supports_async(struct file *file)
876 {
877         umode_t mode = file_inode(file)->i_mode;
878
879         if (S_ISBLK(mode) || S_ISCHR(mode))
880                 return true;
881         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
882                 return true;
883
884         return false;
885 }
886
887 static int io_prep_rw(struct io_kiocb *req, const struct sqe_submit *s,
888                       bool force_nonblock)
889 {
890         const struct io_uring_sqe *sqe = s->sqe;
891         struct io_ring_ctx *ctx = req->ctx;
892         struct kiocb *kiocb = &req->rw;
893         unsigned ioprio;
894         int ret;
895
896         if (!req->file)
897                 return -EBADF;
898
899         if (force_nonblock && !io_file_supports_async(req->file))
900                 force_nonblock = false;
901
902         kiocb->ki_pos = READ_ONCE(sqe->off);
903         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
904         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
905
906         ioprio = READ_ONCE(sqe->ioprio);
907         if (ioprio) {
908                 ret = ioprio_check_cap(ioprio);
909                 if (ret)
910                         return ret;
911
912                 kiocb->ki_ioprio = ioprio;
913         } else
914                 kiocb->ki_ioprio = get_current_ioprio();
915
916         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
917         if (unlikely(ret))
918                 return ret;
919
920         /* don't allow async punt if RWF_NOWAIT was requested */
921         if (kiocb->ki_flags & IOCB_NOWAIT)
922                 req->flags |= REQ_F_NOWAIT;
923
924         if (force_nonblock)
925                 kiocb->ki_flags |= IOCB_NOWAIT;
926
927         if (ctx->flags & IORING_SETUP_IOPOLL) {
928                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
929                     !kiocb->ki_filp->f_op->iopoll)
930                         return -EOPNOTSUPP;
931
932                 req->error = 0;
933                 kiocb->ki_flags |= IOCB_HIPRI;
934                 kiocb->ki_complete = io_complete_rw_iopoll;
935         } else {
936                 if (kiocb->ki_flags & IOCB_HIPRI)
937                         return -EINVAL;
938                 kiocb->ki_complete = io_complete_rw;
939         }
940         return 0;
941 }
942
943 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
944 {
945         switch (ret) {
946         case -EIOCBQUEUED:
947                 break;
948         case -ERESTARTSYS:
949         case -ERESTARTNOINTR:
950         case -ERESTARTNOHAND:
951         case -ERESTART_RESTARTBLOCK:
952                 /*
953                  * We can't just restart the syscall, since previously
954                  * submitted sqes may already be in progress. Just fail this
955                  * IO with EINTR.
956                  */
957                 ret = -EINTR;
958                 /* fall through */
959         default:
960                 kiocb->ki_complete(kiocb, ret, 0);
961         }
962 }
963
964 static int io_import_fixed(struct io_ring_ctx *ctx, int rw,
965                            const struct io_uring_sqe *sqe,
966                            struct iov_iter *iter)
967 {
968         size_t len = READ_ONCE(sqe->len);
969         struct io_mapped_ubuf *imu;
970         unsigned index, buf_index;
971         size_t offset;
972         u64 buf_addr;
973
974         /* attempt to use fixed buffers without having provided iovecs */
975         if (unlikely(!ctx->user_bufs))
976                 return -EFAULT;
977
978         buf_index = READ_ONCE(sqe->buf_index);
979         if (unlikely(buf_index >= ctx->nr_user_bufs))
980                 return -EFAULT;
981
982         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
983         imu = &ctx->user_bufs[index];
984         buf_addr = READ_ONCE(sqe->addr);
985
986         /* overflow */
987         if (buf_addr + len < buf_addr)
988                 return -EFAULT;
989         /* not inside the mapped region */
990         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
991                 return -EFAULT;
992
993         /*
994          * May not be a start of buffer, set size appropriately
995          * and advance us to the beginning.
996          */
997         offset = buf_addr - imu->ubuf;
998         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
999         if (offset)
1000                 iov_iter_advance(iter, offset);
1001
1002         /* don't drop a reference to these pages */
1003         iter->type |= ITER_BVEC_FLAG_NO_REF;
1004         return 0;
1005 }
1006
1007 static int io_import_iovec(struct io_ring_ctx *ctx, int rw,
1008                            const struct sqe_submit *s, struct iovec **iovec,
1009                            struct iov_iter *iter)
1010 {
1011         const struct io_uring_sqe *sqe = s->sqe;
1012         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1013         size_t sqe_len = READ_ONCE(sqe->len);
1014         u8 opcode;
1015
1016         /*
1017          * We're reading ->opcode for the second time, but the first read
1018          * doesn't care whether it's _FIXED or not, so it doesn't matter
1019          * whether ->opcode changes concurrently. The first read does care
1020          * about whether it is a READ or a WRITE, so we don't trust this read
1021          * for that purpose and instead let the caller pass in the read/write
1022          * flag.
1023          */
1024         opcode = READ_ONCE(sqe->opcode);
1025         if (opcode == IORING_OP_READ_FIXED ||
1026             opcode == IORING_OP_WRITE_FIXED) {
1027                 int ret = io_import_fixed(ctx, rw, sqe, iter);
1028                 *iovec = NULL;
1029                 return ret;
1030         }
1031
1032         if (!s->has_user)
1033                 return -EFAULT;
1034
1035 #ifdef CONFIG_COMPAT
1036         if (ctx->compat)
1037                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1038                                                 iovec, iter);
1039 #endif
1040
1041         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1042 }
1043
1044 /*
1045  * Make a note of the last file/offset/direction we punted to async
1046  * context. We'll use this information to see if we can piggy back a
1047  * sequential request onto the previous one, if it's still hasn't been
1048  * completed by the async worker.
1049  */
1050 static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
1051 {
1052         struct async_list *async_list = &req->ctx->pending_async[rw];
1053         struct kiocb *kiocb = &req->rw;
1054         struct file *filp = kiocb->ki_filp;
1055         off_t io_end = kiocb->ki_pos + len;
1056
1057         if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
1058                 unsigned long max_pages;
1059
1060                 /* Use 8x RA size as a decent limiter for both reads/writes */
1061                 max_pages = filp->f_ra.ra_pages;
1062                 if (!max_pages)
1063                         max_pages = VM_READAHEAD_PAGES;
1064                 max_pages *= 8;
1065
1066                 /* If max pages are exceeded, reset the state */
1067                 len >>= PAGE_SHIFT;
1068                 if (async_list->io_pages + len <= max_pages) {
1069                         req->flags |= REQ_F_SEQ_PREV;
1070                         async_list->io_pages += len;
1071                 } else {
1072                         io_end = 0;
1073                         async_list->io_pages = 0;
1074                 }
1075         }
1076
1077         /* New file? Reset state. */
1078         if (async_list->file != filp) {
1079                 async_list->io_pages = 0;
1080                 async_list->file = filp;
1081         }
1082         async_list->io_end = io_end;
1083 }
1084
1085 static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
1086                    bool force_nonblock)
1087 {
1088         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1089         struct kiocb *kiocb = &req->rw;
1090         struct iov_iter iter;
1091         struct file *file;
1092         size_t iov_count;
1093         int ret;
1094
1095         ret = io_prep_rw(req, s, force_nonblock);
1096         if (ret)
1097                 return ret;
1098         file = kiocb->ki_filp;
1099
1100         if (unlikely(!(file->f_mode & FMODE_READ)))
1101                 return -EBADF;
1102         if (unlikely(!file->f_op->read_iter))
1103                 return -EINVAL;
1104
1105         ret = io_import_iovec(req->ctx, READ, s, &iovec, &iter);
1106         if (ret)
1107                 return ret;
1108
1109         iov_count = iov_iter_count(&iter);
1110         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1111         if (!ret) {
1112                 ssize_t ret2;
1113
1114                 /* Catch -EAGAIN return for forced non-blocking submission */
1115                 ret2 = call_read_iter(file, kiocb, &iter);
1116                 if (!force_nonblock || ret2 != -EAGAIN) {
1117                         io_rw_done(kiocb, ret2);
1118                 } else {
1119                         /*
1120                          * If ->needs_lock is true, we're already in async
1121                          * context.
1122                          */
1123                         if (!s->needs_lock)
1124                                 io_async_list_note(READ, req, iov_count);
1125                         ret = -EAGAIN;
1126                 }
1127         }
1128         kfree(iovec);
1129         return ret;
1130 }
1131
1132 static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
1133                     bool force_nonblock)
1134 {
1135         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1136         struct kiocb *kiocb = &req->rw;
1137         struct iov_iter iter;
1138         struct file *file;
1139         size_t iov_count;
1140         int ret;
1141
1142         ret = io_prep_rw(req, s, force_nonblock);
1143         if (ret)
1144                 return ret;
1145
1146         file = kiocb->ki_filp;
1147         if (unlikely(!(file->f_mode & FMODE_WRITE)))
1148                 return -EBADF;
1149         if (unlikely(!file->f_op->write_iter))
1150                 return -EINVAL;
1151
1152         ret = io_import_iovec(req->ctx, WRITE, s, &iovec, &iter);
1153         if (ret)
1154                 return ret;
1155
1156         iov_count = iov_iter_count(&iter);
1157
1158         ret = -EAGAIN;
1159         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
1160                 /* If ->needs_lock is true, we're already in async context. */
1161                 if (!s->needs_lock)
1162                         io_async_list_note(WRITE, req, iov_count);
1163                 goto out_free;
1164         }
1165
1166         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1167         if (!ret) {
1168                 ssize_t ret2;
1169
1170                 /*
1171                  * Open-code file_start_write here to grab freeze protection,
1172                  * which will be released by another thread in
1173                  * io_complete_rw().  Fool lockdep by telling it the lock got
1174                  * released so that it doesn't complain about the held lock when
1175                  * we return to userspace.
1176                  */
1177                 if (S_ISREG(file_inode(file)->i_mode)) {
1178                         __sb_start_write(file_inode(file)->i_sb,
1179                                                 SB_FREEZE_WRITE, true);
1180                         __sb_writers_release(file_inode(file)->i_sb,
1181                                                 SB_FREEZE_WRITE);
1182                 }
1183                 kiocb->ki_flags |= IOCB_WRITE;
1184
1185                 ret2 = call_write_iter(file, kiocb, &iter);
1186                 if (!force_nonblock || ret2 != -EAGAIN) {
1187                         io_rw_done(kiocb, ret2);
1188                 } else {
1189                         /*
1190                          * If ->needs_lock is true, we're already in async
1191                          * context.
1192                          */
1193                         if (!s->needs_lock)
1194                                 io_async_list_note(WRITE, req, iov_count);
1195                         ret = -EAGAIN;
1196                 }
1197         }
1198 out_free:
1199         kfree(iovec);
1200         return ret;
1201 }
1202
1203 /*
1204  * IORING_OP_NOP just posts a completion event, nothing else.
1205  */
1206 static int io_nop(struct io_kiocb *req, u64 user_data)
1207 {
1208         struct io_ring_ctx *ctx = req->ctx;
1209         long err = 0;
1210
1211         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1212                 return -EINVAL;
1213
1214         io_cqring_add_event(ctx, user_data, err);
1215         io_put_req(req);
1216         return 0;
1217 }
1218
1219 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1220 {
1221         struct io_ring_ctx *ctx = req->ctx;
1222
1223         if (!req->file)
1224                 return -EBADF;
1225
1226         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1227                 return -EINVAL;
1228         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1229                 return -EINVAL;
1230
1231         return 0;
1232 }
1233
1234 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1235                     bool force_nonblock)
1236 {
1237         loff_t sqe_off = READ_ONCE(sqe->off);
1238         loff_t sqe_len = READ_ONCE(sqe->len);
1239         loff_t end = sqe_off + sqe_len;
1240         unsigned fsync_flags;
1241         int ret;
1242
1243         fsync_flags = READ_ONCE(sqe->fsync_flags);
1244         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1245                 return -EINVAL;
1246
1247         ret = io_prep_fsync(req, sqe);
1248         if (ret)
1249                 return ret;
1250
1251         /* fsync always requires a blocking context */
1252         if (force_nonblock)
1253                 return -EAGAIN;
1254
1255         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1256                                 end > 0 ? end : LLONG_MAX,
1257                                 fsync_flags & IORING_FSYNC_DATASYNC);
1258
1259         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1260         io_put_req(req);
1261         return 0;
1262 }
1263
1264 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1265 {
1266         struct io_ring_ctx *ctx = req->ctx;
1267         int ret = 0;
1268
1269         if (!req->file)
1270                 return -EBADF;
1271
1272         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1273                 return -EINVAL;
1274         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1275                 return -EINVAL;
1276
1277         return ret;
1278 }
1279
1280 static int io_sync_file_range(struct io_kiocb *req,
1281                               const struct io_uring_sqe *sqe,
1282                               bool force_nonblock)
1283 {
1284         loff_t sqe_off;
1285         loff_t sqe_len;
1286         unsigned flags;
1287         int ret;
1288
1289         ret = io_prep_sfr(req, sqe);
1290         if (ret)
1291                 return ret;
1292
1293         /* sync_file_range always requires a blocking context */
1294         if (force_nonblock)
1295                 return -EAGAIN;
1296
1297         sqe_off = READ_ONCE(sqe->off);
1298         sqe_len = READ_ONCE(sqe->len);
1299         flags = READ_ONCE(sqe->sync_range_flags);
1300
1301         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
1302
1303         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1304         io_put_req(req);
1305         return 0;
1306 }
1307
1308 static void io_poll_remove_one(struct io_kiocb *req)
1309 {
1310         struct io_poll_iocb *poll = &req->poll;
1311
1312         spin_lock(&poll->head->lock);
1313         WRITE_ONCE(poll->canceled, true);
1314         if (!list_empty(&poll->wait.entry)) {
1315                 list_del_init(&poll->wait.entry);
1316                 queue_work(req->ctx->sqo_wq, &req->work);
1317         }
1318         spin_unlock(&poll->head->lock);
1319
1320         list_del_init(&req->list);
1321 }
1322
1323 static void io_poll_remove_all(struct io_ring_ctx *ctx)
1324 {
1325         struct io_kiocb *req;
1326
1327         spin_lock_irq(&ctx->completion_lock);
1328         while (!list_empty(&ctx->cancel_list)) {
1329                 req = list_first_entry(&ctx->cancel_list, struct io_kiocb,list);
1330                 io_poll_remove_one(req);
1331         }
1332         spin_unlock_irq(&ctx->completion_lock);
1333 }
1334
1335 /*
1336  * Find a running poll command that matches one specified in sqe->addr,
1337  * and remove it if found.
1338  */
1339 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1340 {
1341         struct io_ring_ctx *ctx = req->ctx;
1342         struct io_kiocb *poll_req, *next;
1343         int ret = -ENOENT;
1344
1345         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1346                 return -EINVAL;
1347         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
1348             sqe->poll_events)
1349                 return -EINVAL;
1350
1351         spin_lock_irq(&ctx->completion_lock);
1352         list_for_each_entry_safe(poll_req, next, &ctx->cancel_list, list) {
1353                 if (READ_ONCE(sqe->addr) == poll_req->user_data) {
1354                         io_poll_remove_one(poll_req);
1355                         ret = 0;
1356                         break;
1357                 }
1358         }
1359         spin_unlock_irq(&ctx->completion_lock);
1360
1361         io_cqring_add_event(req->ctx, sqe->user_data, ret);
1362         io_put_req(req);
1363         return 0;
1364 }
1365
1366 static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
1367                              __poll_t mask)
1368 {
1369         req->poll.done = true;
1370         io_cqring_fill_event(ctx, req->user_data, mangle_poll(mask));
1371         io_commit_cqring(ctx);
1372 }
1373
1374 static void io_poll_complete_work(struct work_struct *work)
1375 {
1376         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1377         struct io_poll_iocb *poll = &req->poll;
1378         struct poll_table_struct pt = { ._key = poll->events };
1379         struct io_ring_ctx *ctx = req->ctx;
1380         __poll_t mask = 0;
1381
1382         if (!READ_ONCE(poll->canceled))
1383                 mask = vfs_poll(poll->file, &pt) & poll->events;
1384
1385         /*
1386          * Note that ->ki_cancel callers also delete iocb from active_reqs after
1387          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
1388          * synchronize with them.  In the cancellation case the list_del_init
1389          * itself is not actually needed, but harmless so we keep it in to
1390          * avoid further branches in the fast path.
1391          */
1392         spin_lock_irq(&ctx->completion_lock);
1393         if (!mask && !READ_ONCE(poll->canceled)) {
1394                 add_wait_queue(poll->head, &poll->wait);
1395                 spin_unlock_irq(&ctx->completion_lock);
1396                 return;
1397         }
1398         list_del_init(&req->list);
1399         io_poll_complete(ctx, req, mask);
1400         spin_unlock_irq(&ctx->completion_lock);
1401
1402         io_cqring_ev_posted(ctx);
1403         io_put_req(req);
1404 }
1405
1406 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
1407                         void *key)
1408 {
1409         struct io_poll_iocb *poll = container_of(wait, struct io_poll_iocb,
1410                                                         wait);
1411         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
1412         struct io_ring_ctx *ctx = req->ctx;
1413         __poll_t mask = key_to_poll(key);
1414         unsigned long flags;
1415
1416         /* for instances that support it check for an event match first: */
1417         if (mask && !(mask & poll->events))
1418                 return 0;
1419
1420         list_del_init(&poll->wait.entry);
1421
1422         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
1423                 list_del(&req->list);
1424                 io_poll_complete(ctx, req, mask);
1425                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1426
1427                 io_cqring_ev_posted(ctx);
1428                 io_put_req(req);
1429         } else {
1430                 queue_work(ctx->sqo_wq, &req->work);
1431         }
1432
1433         return 1;
1434 }
1435
1436 struct io_poll_table {
1437         struct poll_table_struct pt;
1438         struct io_kiocb *req;
1439         int error;
1440 };
1441
1442 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
1443                                struct poll_table_struct *p)
1444 {
1445         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
1446
1447         if (unlikely(pt->req->poll.head)) {
1448                 pt->error = -EINVAL;
1449                 return;
1450         }
1451
1452         pt->error = 0;
1453         pt->req->poll.head = head;
1454         add_wait_queue(head, &pt->req->poll.wait);
1455 }
1456
1457 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1458 {
1459         struct io_poll_iocb *poll = &req->poll;
1460         struct io_ring_ctx *ctx = req->ctx;
1461         struct io_poll_table ipt;
1462         bool cancel = false;
1463         __poll_t mask;
1464         u16 events;
1465
1466         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
1467                 return -EINVAL;
1468         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
1469                 return -EINVAL;
1470         if (!poll->file)
1471                 return -EBADF;
1472
1473         INIT_WORK(&req->work, io_poll_complete_work);
1474         events = READ_ONCE(sqe->poll_events);
1475         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
1476
1477         poll->head = NULL;
1478         poll->done = false;
1479         poll->canceled = false;
1480
1481         ipt.pt._qproc = io_poll_queue_proc;
1482         ipt.pt._key = poll->events;
1483         ipt.req = req;
1484         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
1485
1486         /* initialized the list so that we can do list_empty checks */
1487         INIT_LIST_HEAD(&poll->wait.entry);
1488         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
1489
1490         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
1491
1492         spin_lock_irq(&ctx->completion_lock);
1493         if (likely(poll->head)) {
1494                 spin_lock(&poll->head->lock);
1495                 if (unlikely(list_empty(&poll->wait.entry))) {
1496                         if (ipt.error)
1497                                 cancel = true;
1498                         ipt.error = 0;
1499                         mask = 0;
1500                 }
1501                 if (mask || ipt.error)
1502                         list_del_init(&poll->wait.entry);
1503                 else if (cancel)
1504                         WRITE_ONCE(poll->canceled, true);
1505                 else if (!poll->done) /* actually waiting for an event */
1506                         list_add_tail(&req->list, &ctx->cancel_list);
1507                 spin_unlock(&poll->head->lock);
1508         }
1509         if (mask) { /* no async, we'd stolen it */
1510                 ipt.error = 0;
1511                 io_poll_complete(ctx, req, mask);
1512         }
1513         spin_unlock_irq(&ctx->completion_lock);
1514
1515         if (mask) {
1516                 io_cqring_ev_posted(ctx);
1517                 io_put_req(req);
1518         }
1519         return ipt.error;
1520 }
1521
1522 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
1523                         const struct io_uring_sqe *sqe)
1524 {
1525         struct io_uring_sqe *sqe_copy;
1526
1527         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list))
1528                 return 0;
1529
1530         sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1531         if (!sqe_copy)
1532                 return -EAGAIN;
1533
1534         spin_lock_irq(&ctx->completion_lock);
1535         if (!io_sequence_defer(ctx, req) && list_empty(&ctx->defer_list)) {
1536                 spin_unlock_irq(&ctx->completion_lock);
1537                 kfree(sqe_copy);
1538                 return 0;
1539         }
1540
1541         memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
1542         req->submit.sqe = sqe_copy;
1543
1544         INIT_WORK(&req->work, io_sq_wq_submit_work);
1545         list_add_tail(&req->list, &ctx->defer_list);
1546         spin_unlock_irq(&ctx->completion_lock);
1547         return -EIOCBQUEUED;
1548 }
1549
1550 static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
1551                            const struct sqe_submit *s, bool force_nonblock)
1552 {
1553         int ret, opcode;
1554
1555         if (unlikely(s->index >= ctx->sq_entries))
1556                 return -EINVAL;
1557         req->user_data = READ_ONCE(s->sqe->user_data);
1558
1559         opcode = READ_ONCE(s->sqe->opcode);
1560         switch (opcode) {
1561         case IORING_OP_NOP:
1562                 ret = io_nop(req, req->user_data);
1563                 break;
1564         case IORING_OP_READV:
1565                 if (unlikely(s->sqe->buf_index))
1566                         return -EINVAL;
1567                 ret = io_read(req, s, force_nonblock);
1568                 break;
1569         case IORING_OP_WRITEV:
1570                 if (unlikely(s->sqe->buf_index))
1571                         return -EINVAL;
1572                 ret = io_write(req, s, force_nonblock);
1573                 break;
1574         case IORING_OP_READ_FIXED:
1575                 ret = io_read(req, s, force_nonblock);
1576                 break;
1577         case IORING_OP_WRITE_FIXED:
1578                 ret = io_write(req, s, force_nonblock);
1579                 break;
1580         case IORING_OP_FSYNC:
1581                 ret = io_fsync(req, s->sqe, force_nonblock);
1582                 break;
1583         case IORING_OP_POLL_ADD:
1584                 ret = io_poll_add(req, s->sqe);
1585                 break;
1586         case IORING_OP_POLL_REMOVE:
1587                 ret = io_poll_remove(req, s->sqe);
1588                 break;
1589         case IORING_OP_SYNC_FILE_RANGE:
1590                 ret = io_sync_file_range(req, s->sqe, force_nonblock);
1591                 break;
1592         default:
1593                 ret = -EINVAL;
1594                 break;
1595         }
1596
1597         if (ret)
1598                 return ret;
1599
1600         if (ctx->flags & IORING_SETUP_IOPOLL) {
1601                 if (req->error == -EAGAIN)
1602                         return -EAGAIN;
1603
1604                 /* workqueue context doesn't hold uring_lock, grab it now */
1605                 if (s->needs_lock)
1606                         mutex_lock(&ctx->uring_lock);
1607                 io_iopoll_req_issued(req);
1608                 if (s->needs_lock)
1609                         mutex_unlock(&ctx->uring_lock);
1610         }
1611
1612         return 0;
1613 }
1614
1615 static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
1616                                                  const struct io_uring_sqe *sqe)
1617 {
1618         switch (sqe->opcode) {
1619         case IORING_OP_READV:
1620         case IORING_OP_READ_FIXED:
1621                 return &ctx->pending_async[READ];
1622         case IORING_OP_WRITEV:
1623         case IORING_OP_WRITE_FIXED:
1624                 return &ctx->pending_async[WRITE];
1625         default:
1626                 return NULL;
1627         }
1628 }
1629
1630 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
1631 {
1632         u8 opcode = READ_ONCE(sqe->opcode);
1633
1634         return !(opcode == IORING_OP_READ_FIXED ||
1635                  opcode == IORING_OP_WRITE_FIXED);
1636 }
1637
1638 static void io_sq_wq_submit_work(struct work_struct *work)
1639 {
1640         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
1641         struct io_ring_ctx *ctx = req->ctx;
1642         struct mm_struct *cur_mm = NULL;
1643         struct async_list *async_list;
1644         LIST_HEAD(req_list);
1645         mm_segment_t old_fs;
1646         int ret;
1647
1648         async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
1649 restart:
1650         do {
1651                 struct sqe_submit *s = &req->submit;
1652                 const struct io_uring_sqe *sqe = s->sqe;
1653
1654                 /* Ensure we clear previously set non-block flag */
1655                 req->rw.ki_flags &= ~IOCB_NOWAIT;
1656
1657                 ret = 0;
1658                 if (io_sqe_needs_user(sqe) && !cur_mm) {
1659                         if (!mmget_not_zero(ctx->sqo_mm)) {
1660                                 ret = -EFAULT;
1661                         } else {
1662                                 cur_mm = ctx->sqo_mm;
1663                                 use_mm(cur_mm);
1664                                 old_fs = get_fs();
1665                                 set_fs(USER_DS);
1666                         }
1667                 }
1668
1669                 if (!ret) {
1670                         s->has_user = cur_mm != NULL;
1671                         s->needs_lock = true;
1672                         do {
1673                                 ret = __io_submit_sqe(ctx, req, s, false);
1674                                 /*
1675                                  * We can get EAGAIN for polled IO even though
1676                                  * we're forcing a sync submission from here,
1677                                  * since we can't wait for request slots on the
1678                                  * block side.
1679                                  */
1680                                 if (ret != -EAGAIN)
1681                                         break;
1682                                 cond_resched();
1683                         } while (1);
1684                 }
1685
1686                 /* drop submission reference */
1687                 io_put_req(req);
1688
1689                 if (ret) {
1690                         io_cqring_add_event(ctx, sqe->user_data, ret);
1691                         io_put_req(req);
1692                 }
1693
1694                 /* async context always use a copy of the sqe */
1695                 kfree(sqe);
1696
1697                 if (!async_list)
1698                         break;
1699                 if (!list_empty(&req_list)) {
1700                         req = list_first_entry(&req_list, struct io_kiocb,
1701                                                 list);
1702                         list_del(&req->list);
1703                         continue;
1704                 }
1705                 if (list_empty(&async_list->list))
1706                         break;
1707
1708                 req = NULL;
1709                 spin_lock(&async_list->lock);
1710                 if (list_empty(&async_list->list)) {
1711                         spin_unlock(&async_list->lock);
1712                         break;
1713                 }
1714                 list_splice_init(&async_list->list, &req_list);
1715                 spin_unlock(&async_list->lock);
1716
1717                 req = list_first_entry(&req_list, struct io_kiocb, list);
1718                 list_del(&req->list);
1719         } while (req);
1720
1721         /*
1722          * Rare case of racing with a submitter. If we find the count has
1723          * dropped to zero AND we have pending work items, then restart
1724          * the processing. This is a tiny race window.
1725          */
1726         if (async_list) {
1727                 ret = atomic_dec_return(&async_list->cnt);
1728                 while (!ret && !list_empty(&async_list->list)) {
1729                         spin_lock(&async_list->lock);
1730                         atomic_inc(&async_list->cnt);
1731                         list_splice_init(&async_list->list, &req_list);
1732                         spin_unlock(&async_list->lock);
1733
1734                         if (!list_empty(&req_list)) {
1735                                 req = list_first_entry(&req_list,
1736                                                         struct io_kiocb, list);
1737                                 list_del(&req->list);
1738                                 goto restart;
1739                         }
1740                         ret = atomic_dec_return(&async_list->cnt);
1741                 }
1742         }
1743
1744         if (cur_mm) {
1745                 set_fs(old_fs);
1746                 unuse_mm(cur_mm);
1747                 mmput(cur_mm);
1748         }
1749 }
1750
1751 /*
1752  * See if we can piggy back onto previously submitted work, that is still
1753  * running. We currently only allow this if the new request is sequential
1754  * to the previous one we punted.
1755  */
1756 static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
1757 {
1758         bool ret = false;
1759
1760         if (!list)
1761                 return false;
1762         if (!(req->flags & REQ_F_SEQ_PREV))
1763                 return false;
1764         if (!atomic_read(&list->cnt))
1765                 return false;
1766
1767         ret = true;
1768         spin_lock(&list->lock);
1769         list_add_tail(&req->list, &list->list);
1770         if (!atomic_read(&list->cnt)) {
1771                 list_del_init(&req->list);
1772                 ret = false;
1773         }
1774         spin_unlock(&list->lock);
1775         return ret;
1776 }
1777
1778 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
1779 {
1780         int op = READ_ONCE(sqe->opcode);
1781
1782         switch (op) {
1783         case IORING_OP_NOP:
1784         case IORING_OP_POLL_REMOVE:
1785                 return false;
1786         default:
1787                 return true;
1788         }
1789 }
1790
1791 static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
1792                            struct io_submit_state *state, struct io_kiocb *req)
1793 {
1794         unsigned flags;
1795         int fd;
1796
1797         flags = READ_ONCE(s->sqe->flags);
1798         fd = READ_ONCE(s->sqe->fd);
1799
1800         if (flags & IOSQE_IO_DRAIN) {
1801                 req->flags |= REQ_F_IO_DRAIN;
1802                 req->sequence = ctx->cached_sq_head - 1;
1803         }
1804
1805         if (!io_op_needs_file(s->sqe))
1806                 return 0;
1807
1808         if (flags & IOSQE_FIXED_FILE) {
1809                 if (unlikely(!ctx->user_files ||
1810                     (unsigned) fd >= ctx->nr_user_files))
1811                         return -EBADF;
1812                 req->file = ctx->user_files[fd];
1813                 req->flags |= REQ_F_FIXED_FILE;
1814         } else {
1815                 if (s->needs_fixed_file)
1816                         return -EBADF;
1817                 req->file = io_file_get(state, fd);
1818                 if (unlikely(!req->file))
1819                         return -EBADF;
1820         }
1821
1822         return 0;
1823 }
1824
1825 static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
1826                          struct io_submit_state *state)
1827 {
1828         struct io_kiocb *req;
1829         int ret;
1830
1831         /* enforce forwards compatibility on users */
1832         if (unlikely(s->sqe->flags & ~(IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)))
1833                 return -EINVAL;
1834
1835         req = io_get_req(ctx, state);
1836         if (unlikely(!req))
1837                 return -EAGAIN;
1838
1839         ret = io_req_set_file(ctx, s, state, req);
1840         if (unlikely(ret))
1841                 goto out;
1842
1843         ret = io_req_defer(ctx, req, s->sqe);
1844         if (ret) {
1845                 if (ret == -EIOCBQUEUED)
1846                         ret = 0;
1847                 return ret;
1848         }
1849
1850         ret = __io_submit_sqe(ctx, req, s, true);
1851         if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
1852                 struct io_uring_sqe *sqe_copy;
1853
1854                 sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
1855                 if (sqe_copy) {
1856                         struct async_list *list;
1857
1858                         memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
1859                         s->sqe = sqe_copy;
1860
1861                         memcpy(&req->submit, s, sizeof(*s));
1862                         list = io_async_list_from_sqe(ctx, s->sqe);
1863                         if (!io_add_to_prev_work(list, req)) {
1864                                 if (list)
1865                                         atomic_inc(&list->cnt);
1866                                 INIT_WORK(&req->work, io_sq_wq_submit_work);
1867                                 queue_work(ctx->sqo_wq, &req->work);
1868                         }
1869
1870                         /*
1871                          * Queued up for async execution, worker will release
1872                          * submit reference when the iocb is actually
1873                          * submitted.
1874                          */
1875                         return 0;
1876                 }
1877         }
1878
1879 out:
1880         /* drop submission reference */
1881         io_put_req(req);
1882
1883         /* and drop final reference, if we failed */
1884         if (ret)
1885                 io_put_req(req);
1886
1887         return ret;
1888 }
1889
1890 /*
1891  * Batched submission is done, ensure local IO is flushed out.
1892  */
1893 static void io_submit_state_end(struct io_submit_state *state)
1894 {
1895         blk_finish_plug(&state->plug);
1896         io_file_put(state);
1897         if (state->free_reqs)
1898                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
1899                                         &state->reqs[state->cur_req]);
1900 }
1901
1902 /*
1903  * Start submission side cache.
1904  */
1905 static void io_submit_state_start(struct io_submit_state *state,
1906                                   struct io_ring_ctx *ctx, unsigned max_ios)
1907 {
1908         blk_start_plug(&state->plug);
1909         state->free_reqs = 0;
1910         state->file = NULL;
1911         state->ios_left = max_ios;
1912 }
1913
1914 static void io_commit_sqring(struct io_ring_ctx *ctx)
1915 {
1916         struct io_sq_ring *ring = ctx->sq_ring;
1917
1918         if (ctx->cached_sq_head != READ_ONCE(ring->r.head)) {
1919                 /*
1920                  * Ensure any loads from the SQEs are done at this point,
1921                  * since once we write the new head, the application could
1922                  * write new data to them.
1923                  */
1924                 smp_store_release(&ring->r.head, ctx->cached_sq_head);
1925         }
1926 }
1927
1928 /*
1929  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
1930  * that is mapped by userspace. This means that care needs to be taken to
1931  * ensure that reads are stable, as we cannot rely on userspace always
1932  * being a good citizen. If members of the sqe are validated and then later
1933  * used, it's important that those reads are done through READ_ONCE() to
1934  * prevent a re-load down the line.
1935  */
1936 static bool io_get_sqring(struct io_ring_ctx *ctx, struct sqe_submit *s)
1937 {
1938         struct io_sq_ring *ring = ctx->sq_ring;
1939         unsigned head;
1940
1941         /*
1942          * The cached sq head (or cq tail) serves two purposes:
1943          *
1944          * 1) allows us to batch the cost of updating the user visible
1945          *    head updates.
1946          * 2) allows the kernel side to track the head on its own, even
1947          *    though the application is the one updating it.
1948          */
1949         head = ctx->cached_sq_head;
1950         /* make sure SQ entry isn't read before tail */
1951         if (head == smp_load_acquire(&ring->r.tail))
1952                 return false;
1953
1954         head = READ_ONCE(ring->array[head & ctx->sq_mask]);
1955         if (head < ctx->sq_entries) {
1956                 s->index = head;
1957                 s->sqe = &ctx->sq_sqes[head];
1958                 ctx->cached_sq_head++;
1959                 return true;
1960         }
1961
1962         /* drop invalid entries */
1963         ctx->cached_sq_head++;
1964         ring->dropped++;
1965         return false;
1966 }
1967
1968 static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
1969                           unsigned int nr, bool has_user, bool mm_fault)
1970 {
1971         struct io_submit_state state, *statep = NULL;
1972         int ret, i, submitted = 0;
1973
1974         if (nr > IO_PLUG_THRESHOLD) {
1975                 io_submit_state_start(&state, ctx, nr);
1976                 statep = &state;
1977         }
1978
1979         for (i = 0; i < nr; i++) {
1980                 if (unlikely(mm_fault)) {
1981                         ret = -EFAULT;
1982                 } else {
1983                         sqes[i].has_user = has_user;
1984                         sqes[i].needs_lock = true;
1985                         sqes[i].needs_fixed_file = true;
1986                         ret = io_submit_sqe(ctx, &sqes[i], statep);
1987                 }
1988                 if (!ret) {
1989                         submitted++;
1990                         continue;
1991                 }
1992
1993                 io_cqring_add_event(ctx, sqes[i].sqe->user_data, ret);
1994         }
1995
1996         if (statep)
1997                 io_submit_state_end(&state);
1998
1999         return submitted;
2000 }
2001
2002 static int io_sq_thread(void *data)
2003 {
2004         struct sqe_submit sqes[IO_IOPOLL_BATCH];
2005         struct io_ring_ctx *ctx = data;
2006         struct mm_struct *cur_mm = NULL;
2007         mm_segment_t old_fs;
2008         DEFINE_WAIT(wait);
2009         unsigned inflight;
2010         unsigned long timeout;
2011
2012         old_fs = get_fs();
2013         set_fs(USER_DS);
2014
2015         timeout = inflight = 0;
2016         while (!kthread_should_park()) {
2017                 bool all_fixed, mm_fault = false;
2018                 int i;
2019
2020                 if (inflight) {
2021                         unsigned nr_events = 0;
2022
2023                         if (ctx->flags & IORING_SETUP_IOPOLL) {
2024                                 /*
2025                                  * We disallow the app entering submit/complete
2026                                  * with polling, but we still need to lock the
2027                                  * ring to prevent racing with polled issue
2028                                  * that got punted to a workqueue.
2029                                  */
2030                                 mutex_lock(&ctx->uring_lock);
2031                                 io_iopoll_check(ctx, &nr_events, 0);
2032                                 mutex_unlock(&ctx->uring_lock);
2033                         } else {
2034                                 /*
2035                                  * Normal IO, just pretend everything completed.
2036                                  * We don't have to poll completions for that.
2037                                  */
2038                                 nr_events = inflight;
2039                         }
2040
2041                         inflight -= nr_events;
2042                         if (!inflight)
2043                                 timeout = jiffies + ctx->sq_thread_idle;
2044                 }
2045
2046                 if (!io_get_sqring(ctx, &sqes[0])) {
2047                         /*
2048                          * We're polling. If we're within the defined idle
2049                          * period, then let us spin without work before going
2050                          * to sleep.
2051                          */
2052                         if (inflight || !time_after(jiffies, timeout)) {
2053                                 cpu_relax();
2054                                 continue;
2055                         }
2056
2057                         /*
2058                          * Drop cur_mm before scheduling, we can't hold it for
2059                          * long periods (or over schedule()). Do this before
2060                          * adding ourselves to the waitqueue, as the unuse/drop
2061                          * may sleep.
2062                          */
2063                         if (cur_mm) {
2064                                 unuse_mm(cur_mm);
2065                                 mmput(cur_mm);
2066                                 cur_mm = NULL;
2067                         }
2068
2069                         prepare_to_wait(&ctx->sqo_wait, &wait,
2070                                                 TASK_INTERRUPTIBLE);
2071
2072                         /* Tell userspace we may need a wakeup call */
2073                         ctx->sq_ring->flags |= IORING_SQ_NEED_WAKEUP;
2074                         /* make sure to read SQ tail after writing flags */
2075                         smp_mb();
2076
2077                         if (!io_get_sqring(ctx, &sqes[0])) {
2078                                 if (kthread_should_park()) {
2079                                         finish_wait(&ctx->sqo_wait, &wait);
2080                                         break;
2081                                 }
2082                                 if (signal_pending(current))
2083                                         flush_signals(current);
2084                                 schedule();
2085                                 finish_wait(&ctx->sqo_wait, &wait);
2086
2087                                 ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2088                                 continue;
2089                         }
2090                         finish_wait(&ctx->sqo_wait, &wait);
2091
2092                         ctx->sq_ring->flags &= ~IORING_SQ_NEED_WAKEUP;
2093                 }
2094
2095                 i = 0;
2096                 all_fixed = true;
2097                 do {
2098                         if (all_fixed && io_sqe_needs_user(sqes[i].sqe))
2099                                 all_fixed = false;
2100
2101                         i++;
2102                         if (i == ARRAY_SIZE(sqes))
2103                                 break;
2104                 } while (io_get_sqring(ctx, &sqes[i]));
2105
2106                 /* Unless all new commands are FIXED regions, grab mm */
2107                 if (!all_fixed && !cur_mm) {
2108                         mm_fault = !mmget_not_zero(ctx->sqo_mm);
2109                         if (!mm_fault) {
2110                                 use_mm(ctx->sqo_mm);
2111                                 cur_mm = ctx->sqo_mm;
2112                         }
2113                 }
2114
2115                 inflight += io_submit_sqes(ctx, sqes, i, cur_mm != NULL,
2116                                                 mm_fault);
2117
2118                 /* Commit SQ ring head once we've consumed all SQEs */
2119                 io_commit_sqring(ctx);
2120         }
2121
2122         set_fs(old_fs);
2123         if (cur_mm) {
2124                 unuse_mm(cur_mm);
2125                 mmput(cur_mm);
2126         }
2127
2128         kthread_parkme();
2129
2130         return 0;
2131 }
2132
2133 static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
2134 {
2135         struct io_submit_state state, *statep = NULL;
2136         int i, submit = 0;
2137
2138         if (to_submit > IO_PLUG_THRESHOLD) {
2139                 io_submit_state_start(&state, ctx, to_submit);
2140                 statep = &state;
2141         }
2142
2143         for (i = 0; i < to_submit; i++) {
2144                 struct sqe_submit s;
2145                 int ret;
2146
2147                 if (!io_get_sqring(ctx, &s))
2148                         break;
2149
2150                 s.has_user = true;
2151                 s.needs_lock = false;
2152                 s.needs_fixed_file = false;
2153                 submit++;
2154
2155                 ret = io_submit_sqe(ctx, &s, statep);
2156                 if (ret)
2157                         io_cqring_add_event(ctx, s.sqe->user_data, ret);
2158         }
2159         io_commit_sqring(ctx);
2160
2161         if (statep)
2162                 io_submit_state_end(statep);
2163
2164         return submit;
2165 }
2166
2167 static unsigned io_cqring_events(struct io_cq_ring *ring)
2168 {
2169         /* See comment at the top of this file */
2170         smp_rmb();
2171         return READ_ONCE(ring->r.tail) - READ_ONCE(ring->r.head);
2172 }
2173
2174 /*
2175  * Wait until events become available, if we don't already have some. The
2176  * application must reap them itself, as they reside on the shared cq ring.
2177  */
2178 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
2179                           const sigset_t __user *sig, size_t sigsz)
2180 {
2181         struct io_cq_ring *ring = ctx->cq_ring;
2182         sigset_t ksigmask, sigsaved;
2183         int ret;
2184
2185         if (io_cqring_events(ring) >= min_events)
2186                 return 0;
2187
2188         if (sig) {
2189 #ifdef CONFIG_COMPAT
2190                 if (in_compat_syscall())
2191                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
2192                                                       &ksigmask, &sigsaved, sigsz);
2193                 else
2194 #endif
2195                         ret = set_user_sigmask(sig, &ksigmask,
2196                                                &sigsaved, sigsz);
2197
2198                 if (ret)
2199                         return ret;
2200         }
2201
2202         ret = wait_event_interruptible(ctx->wait, io_cqring_events(ring) >= min_events);
2203
2204         if (sig)
2205                 restore_user_sigmask(sig, &sigsaved, ret == -ERESTARTSYS);
2206
2207         if (ret == -ERESTARTSYS)
2208                 ret = -EINTR;
2209
2210         return READ_ONCE(ring->r.head) == READ_ONCE(ring->r.tail) ? ret : 0;
2211 }
2212
2213 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
2214 {
2215 #if defined(CONFIG_UNIX)
2216         if (ctx->ring_sock) {
2217                 struct sock *sock = ctx->ring_sock->sk;
2218                 struct sk_buff *skb;
2219
2220                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
2221                         kfree_skb(skb);
2222         }
2223 #else
2224         int i;
2225
2226         for (i = 0; i < ctx->nr_user_files; i++)
2227                 fput(ctx->user_files[i]);
2228 #endif
2229 }
2230
2231 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
2232 {
2233         if (!ctx->user_files)
2234                 return -ENXIO;
2235
2236         __io_sqe_files_unregister(ctx);
2237         kfree(ctx->user_files);
2238         ctx->user_files = NULL;
2239         ctx->nr_user_files = 0;
2240         return 0;
2241 }
2242
2243 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
2244 {
2245         if (ctx->sqo_thread) {
2246                 /*
2247                  * The park is a bit of a work-around, without it we get
2248                  * warning spews on shutdown with SQPOLL set and affinity
2249                  * set to a single CPU.
2250                  */
2251                 kthread_park(ctx->sqo_thread);
2252                 kthread_stop(ctx->sqo_thread);
2253                 ctx->sqo_thread = NULL;
2254         }
2255 }
2256
2257 static void io_finish_async(struct io_ring_ctx *ctx)
2258 {
2259         io_sq_thread_stop(ctx);
2260
2261         if (ctx->sqo_wq) {
2262                 destroy_workqueue(ctx->sqo_wq);
2263                 ctx->sqo_wq = NULL;
2264         }
2265 }
2266
2267 #if defined(CONFIG_UNIX)
2268 static void io_destruct_skb(struct sk_buff *skb)
2269 {
2270         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
2271
2272         io_finish_async(ctx);
2273         unix_destruct_scm(skb);
2274 }
2275
2276 /*
2277  * Ensure the UNIX gc is aware of our file set, so we are certain that
2278  * the io_uring can be safely unregistered on process exit, even if we have
2279  * loops in the file referencing.
2280  */
2281 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
2282 {
2283         struct sock *sk = ctx->ring_sock->sk;
2284         struct scm_fp_list *fpl;
2285         struct sk_buff *skb;
2286         int i;
2287
2288         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
2289                 unsigned long inflight = ctx->user->unix_inflight + nr;
2290
2291                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
2292                         return -EMFILE;
2293         }
2294
2295         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
2296         if (!fpl)
2297                 return -ENOMEM;
2298
2299         skb = alloc_skb(0, GFP_KERNEL);
2300         if (!skb) {
2301                 kfree(fpl);
2302                 return -ENOMEM;
2303         }
2304
2305         skb->sk = sk;
2306         skb->destructor = io_destruct_skb;
2307
2308         fpl->user = get_uid(ctx->user);
2309         for (i = 0; i < nr; i++) {
2310                 fpl->fp[i] = get_file(ctx->user_files[i + offset]);
2311                 unix_inflight(fpl->user, fpl->fp[i]);
2312         }
2313
2314         fpl->max = fpl->count = nr;
2315         UNIXCB(skb).fp = fpl;
2316         refcount_add(skb->truesize, &sk->sk_wmem_alloc);
2317         skb_queue_head(&sk->sk_receive_queue, skb);
2318
2319         for (i = 0; i < nr; i++)
2320                 fput(fpl->fp[i]);
2321
2322         return 0;
2323 }
2324
2325 /*
2326  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
2327  * causes regular reference counting to break down. We rely on the UNIX
2328  * garbage collection to take care of this problem for us.
2329  */
2330 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2331 {
2332         unsigned left, total;
2333         int ret = 0;
2334
2335         total = 0;
2336         left = ctx->nr_user_files;
2337         while (left) {
2338                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
2339
2340                 ret = __io_sqe_files_scm(ctx, this_files, total);
2341                 if (ret)
2342                         break;
2343                 left -= this_files;
2344                 total += this_files;
2345         }
2346
2347         if (!ret)
2348                 return 0;
2349
2350         while (total < ctx->nr_user_files) {
2351                 fput(ctx->user_files[total]);
2352                 total++;
2353         }
2354
2355         return ret;
2356 }
2357 #else
2358 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
2359 {
2360         return 0;
2361 }
2362 #endif
2363
2364 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
2365                                  unsigned nr_args)
2366 {
2367         __s32 __user *fds = (__s32 __user *) arg;
2368         int fd, ret = 0;
2369         unsigned i;
2370
2371         if (ctx->user_files)
2372                 return -EBUSY;
2373         if (!nr_args)
2374                 return -EINVAL;
2375         if (nr_args > IORING_MAX_FIXED_FILES)
2376                 return -EMFILE;
2377
2378         ctx->user_files = kcalloc(nr_args, sizeof(struct file *), GFP_KERNEL);
2379         if (!ctx->user_files)
2380                 return -ENOMEM;
2381
2382         for (i = 0; i < nr_args; i++) {
2383                 ret = -EFAULT;
2384                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
2385                         break;
2386
2387                 ctx->user_files[i] = fget(fd);
2388
2389                 ret = -EBADF;
2390                 if (!ctx->user_files[i])
2391                         break;
2392                 /*
2393                  * Don't allow io_uring instances to be registered. If UNIX
2394                  * isn't enabled, then this causes a reference cycle and this
2395                  * instance can never get freed. If UNIX is enabled we'll
2396                  * handle it just fine, but there's still no point in allowing
2397                  * a ring fd as it doesn't support regular read/write anyway.
2398                  */
2399                 if (ctx->user_files[i]->f_op == &io_uring_fops) {
2400                         fput(ctx->user_files[i]);
2401                         break;
2402                 }
2403                 ctx->nr_user_files++;
2404                 ret = 0;
2405         }
2406
2407         if (ret) {
2408                 for (i = 0; i < ctx->nr_user_files; i++)
2409                         fput(ctx->user_files[i]);
2410
2411                 kfree(ctx->user_files);
2412                 ctx->user_files = NULL;
2413                 ctx->nr_user_files = 0;
2414                 return ret;
2415         }
2416
2417         ret = io_sqe_files_scm(ctx);
2418         if (ret)
2419                 io_sqe_files_unregister(ctx);
2420
2421         return ret;
2422 }
2423
2424 static int io_sq_offload_start(struct io_ring_ctx *ctx,
2425                                struct io_uring_params *p)
2426 {
2427         int ret;
2428
2429         init_waitqueue_head(&ctx->sqo_wait);
2430         mmgrab(current->mm);
2431         ctx->sqo_mm = current->mm;
2432
2433         if (ctx->flags & IORING_SETUP_SQPOLL) {
2434                 ret = -EPERM;
2435                 if (!capable(CAP_SYS_ADMIN))
2436                         goto err;
2437
2438                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
2439                 if (!ctx->sq_thread_idle)
2440                         ctx->sq_thread_idle = HZ;
2441
2442                 if (p->flags & IORING_SETUP_SQ_AFF) {
2443                         int cpu = p->sq_thread_cpu;
2444
2445                         ret = -EINVAL;
2446                         if (cpu >= nr_cpu_ids)
2447                                 goto err;
2448                         if (!cpu_online(cpu))
2449                                 goto err;
2450
2451                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
2452                                                         ctx, cpu,
2453                                                         "io_uring-sq");
2454                 } else {
2455                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
2456                                                         "io_uring-sq");
2457                 }
2458                 if (IS_ERR(ctx->sqo_thread)) {
2459                         ret = PTR_ERR(ctx->sqo_thread);
2460                         ctx->sqo_thread = NULL;
2461                         goto err;
2462                 }
2463                 wake_up_process(ctx->sqo_thread);
2464         } else if (p->flags & IORING_SETUP_SQ_AFF) {
2465                 /* Can't have SQ_AFF without SQPOLL */
2466                 ret = -EINVAL;
2467                 goto err;
2468         }
2469
2470         /* Do QD, or 2 * CPUS, whatever is smallest */
2471         ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
2472                         min(ctx->sq_entries - 1, 2 * num_online_cpus()));
2473         if (!ctx->sqo_wq) {
2474                 ret = -ENOMEM;
2475                 goto err;
2476         }
2477
2478         return 0;
2479 err:
2480         io_sq_thread_stop(ctx);
2481         mmdrop(ctx->sqo_mm);
2482         ctx->sqo_mm = NULL;
2483         return ret;
2484 }
2485
2486 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
2487 {
2488         atomic_long_sub(nr_pages, &user->locked_vm);
2489 }
2490
2491 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
2492 {
2493         unsigned long page_limit, cur_pages, new_pages;
2494
2495         /* Don't allow more pages than we can safely lock */
2496         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
2497
2498         do {
2499                 cur_pages = atomic_long_read(&user->locked_vm);
2500                 new_pages = cur_pages + nr_pages;
2501                 if (new_pages > page_limit)
2502                         return -ENOMEM;
2503         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
2504                                         new_pages) != cur_pages);
2505
2506         return 0;
2507 }
2508
2509 static void io_mem_free(void *ptr)
2510 {
2511         struct page *page;
2512
2513         if (!ptr)
2514                 return;
2515
2516         page = virt_to_head_page(ptr);
2517         if (put_page_testzero(page))
2518                 free_compound_page(page);
2519 }
2520
2521 static void *io_mem_alloc(size_t size)
2522 {
2523         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
2524                                 __GFP_NORETRY;
2525
2526         return (void *) __get_free_pages(gfp_flags, get_order(size));
2527 }
2528
2529 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
2530 {
2531         struct io_sq_ring *sq_ring;
2532         struct io_cq_ring *cq_ring;
2533         size_t bytes;
2534
2535         bytes = struct_size(sq_ring, array, sq_entries);
2536         bytes += array_size(sizeof(struct io_uring_sqe), sq_entries);
2537         bytes += struct_size(cq_ring, cqes, cq_entries);
2538
2539         return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;
2540 }
2541
2542 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
2543 {
2544         int i, j;
2545
2546         if (!ctx->user_bufs)
2547                 return -ENXIO;
2548
2549         for (i = 0; i < ctx->nr_user_bufs; i++) {
2550                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2551
2552                 for (j = 0; j < imu->nr_bvecs; j++)
2553                         put_page(imu->bvec[j].bv_page);
2554
2555                 if (ctx->account_mem)
2556                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
2557                 kvfree(imu->bvec);
2558                 imu->nr_bvecs = 0;
2559         }
2560
2561         kfree(ctx->user_bufs);
2562         ctx->user_bufs = NULL;
2563         ctx->nr_user_bufs = 0;
2564         return 0;
2565 }
2566
2567 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
2568                        void __user *arg, unsigned index)
2569 {
2570         struct iovec __user *src;
2571
2572 #ifdef CONFIG_COMPAT
2573         if (ctx->compat) {
2574                 struct compat_iovec __user *ciovs;
2575                 struct compat_iovec ciov;
2576
2577                 ciovs = (struct compat_iovec __user *) arg;
2578                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
2579                         return -EFAULT;
2580
2581                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
2582                 dst->iov_len = ciov.iov_len;
2583                 return 0;
2584         }
2585 #endif
2586         src = (struct iovec __user *) arg;
2587         if (copy_from_user(dst, &src[index], sizeof(*dst)))
2588                 return -EFAULT;
2589         return 0;
2590 }
2591
2592 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
2593                                   unsigned nr_args)
2594 {
2595         struct vm_area_struct **vmas = NULL;
2596         struct page **pages = NULL;
2597         int i, j, got_pages = 0;
2598         int ret = -EINVAL;
2599
2600         if (ctx->user_bufs)
2601                 return -EBUSY;
2602         if (!nr_args || nr_args > UIO_MAXIOV)
2603                 return -EINVAL;
2604
2605         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
2606                                         GFP_KERNEL);
2607         if (!ctx->user_bufs)
2608                 return -ENOMEM;
2609
2610         for (i = 0; i < nr_args; i++) {
2611                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
2612                 unsigned long off, start, end, ubuf;
2613                 int pret, nr_pages;
2614                 struct iovec iov;
2615                 size_t size;
2616
2617                 ret = io_copy_iov(ctx, &iov, arg, i);
2618                 if (ret)
2619                         goto err;
2620
2621                 /*
2622                  * Don't impose further limits on the size and buffer
2623                  * constraints here, we'll -EINVAL later when IO is
2624                  * submitted if they are wrong.
2625                  */
2626                 ret = -EFAULT;
2627                 if (!iov.iov_base || !iov.iov_len)
2628                         goto err;
2629
2630                 /* arbitrary limit, but we need something */
2631                 if (iov.iov_len > SZ_1G)
2632                         goto err;
2633
2634                 ubuf = (unsigned long) iov.iov_base;
2635                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
2636                 start = ubuf >> PAGE_SHIFT;
2637                 nr_pages = end - start;
2638
2639                 if (ctx->account_mem) {
2640                         ret = io_account_mem(ctx->user, nr_pages);
2641                         if (ret)
2642                                 goto err;
2643                 }
2644
2645                 ret = 0;
2646                 if (!pages || nr_pages > got_pages) {
2647                         kfree(vmas);
2648                         kfree(pages);
2649                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
2650                                                 GFP_KERNEL);
2651                         vmas = kvmalloc_array(nr_pages,
2652                                         sizeof(struct vm_area_struct *),
2653                                         GFP_KERNEL);
2654                         if (!pages || !vmas) {
2655                                 ret = -ENOMEM;
2656                                 if (ctx->account_mem)
2657                                         io_unaccount_mem(ctx->user, nr_pages);
2658                                 goto err;
2659                         }
2660                         got_pages = nr_pages;
2661                 }
2662
2663                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
2664                                                 GFP_KERNEL);
2665                 ret = -ENOMEM;
2666                 if (!imu->bvec) {
2667                         if (ctx->account_mem)
2668                                 io_unaccount_mem(ctx->user, nr_pages);
2669                         goto err;
2670                 }
2671
2672                 ret = 0;
2673                 down_read(&current->mm->mmap_sem);
2674                 pret = get_user_pages(ubuf, nr_pages,
2675                                       FOLL_WRITE | FOLL_LONGTERM,
2676                                       pages, vmas);
2677                 if (pret == nr_pages) {
2678                         /* don't support file backed memory */
2679                         for (j = 0; j < nr_pages; j++) {
2680                                 struct vm_area_struct *vma = vmas[j];
2681
2682                                 if (vma->vm_file &&
2683                                     !is_file_hugepages(vma->vm_file)) {
2684                                         ret = -EOPNOTSUPP;
2685                                         break;
2686                                 }
2687                         }
2688                 } else {
2689                         ret = pret < 0 ? pret : -EFAULT;
2690                 }
2691                 up_read(&current->mm->mmap_sem);
2692                 if (ret) {
2693                         /*
2694                          * if we did partial map, or found file backed vmas,
2695                          * release any pages we did get
2696                          */
2697                         if (pret > 0) {
2698                                 for (j = 0; j < pret; j++)
2699                                         put_page(pages[j]);
2700                         }
2701                         if (ctx->account_mem)
2702                                 io_unaccount_mem(ctx->user, nr_pages);
2703                         kvfree(imu->bvec);
2704                         goto err;
2705                 }
2706
2707                 off = ubuf & ~PAGE_MASK;
2708                 size = iov.iov_len;
2709                 for (j = 0; j < nr_pages; j++) {
2710                         size_t vec_len;
2711
2712                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
2713                         imu->bvec[j].bv_page = pages[j];
2714                         imu->bvec[j].bv_len = vec_len;
2715                         imu->bvec[j].bv_offset = off;
2716                         off = 0;
2717                         size -= vec_len;
2718                 }
2719                 /* store original address for later verification */
2720                 imu->ubuf = ubuf;
2721                 imu->len = iov.iov_len;
2722                 imu->nr_bvecs = nr_pages;
2723
2724                 ctx->nr_user_bufs++;
2725         }
2726         kvfree(pages);
2727         kvfree(vmas);
2728         return 0;
2729 err:
2730         kvfree(pages);
2731         kvfree(vmas);
2732         io_sqe_buffer_unregister(ctx);
2733         return ret;
2734 }
2735
2736 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
2737 {
2738         __s32 __user *fds = arg;
2739         int fd;
2740
2741         if (ctx->cq_ev_fd)
2742                 return -EBUSY;
2743
2744         if (copy_from_user(&fd, fds, sizeof(*fds)))
2745                 return -EFAULT;
2746
2747         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
2748         if (IS_ERR(ctx->cq_ev_fd)) {
2749                 int ret = PTR_ERR(ctx->cq_ev_fd);
2750                 ctx->cq_ev_fd = NULL;
2751                 return ret;
2752         }
2753
2754         return 0;
2755 }
2756
2757 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
2758 {
2759         if (ctx->cq_ev_fd) {
2760                 eventfd_ctx_put(ctx->cq_ev_fd);
2761                 ctx->cq_ev_fd = NULL;
2762                 return 0;
2763         }
2764
2765         return -ENXIO;
2766 }
2767
2768 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
2769 {
2770         io_finish_async(ctx);
2771         if (ctx->sqo_mm)
2772                 mmdrop(ctx->sqo_mm);
2773
2774         io_iopoll_reap_events(ctx);
2775         io_sqe_buffer_unregister(ctx);
2776         io_sqe_files_unregister(ctx);
2777         io_eventfd_unregister(ctx);
2778
2779 #if defined(CONFIG_UNIX)
2780         if (ctx->ring_sock) {
2781                 ctx->ring_sock->file = NULL; /* so that iput() is called */
2782                 sock_release(ctx->ring_sock);
2783         }
2784 #endif
2785
2786         io_mem_free(ctx->sq_ring);
2787         io_mem_free(ctx->sq_sqes);
2788         io_mem_free(ctx->cq_ring);
2789
2790         percpu_ref_exit(&ctx->refs);
2791         if (ctx->account_mem)
2792                 io_unaccount_mem(ctx->user,
2793                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
2794         free_uid(ctx->user);
2795         kfree(ctx);
2796 }
2797
2798 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
2799 {
2800         struct io_ring_ctx *ctx = file->private_data;
2801         __poll_t mask = 0;
2802
2803         poll_wait(file, &ctx->cq_wait, wait);
2804         /*
2805          * synchronizes with barrier from wq_has_sleeper call in
2806          * io_commit_cqring
2807          */
2808         smp_rmb();
2809         if (READ_ONCE(ctx->sq_ring->r.tail) - ctx->cached_sq_head !=
2810             ctx->sq_ring->ring_entries)
2811                 mask |= EPOLLOUT | EPOLLWRNORM;
2812         if (READ_ONCE(ctx->cq_ring->r.head) != ctx->cached_cq_tail)
2813                 mask |= EPOLLIN | EPOLLRDNORM;
2814
2815         return mask;
2816 }
2817
2818 static int io_uring_fasync(int fd, struct file *file, int on)
2819 {
2820         struct io_ring_ctx *ctx = file->private_data;
2821
2822         return fasync_helper(fd, file, on, &ctx->cq_fasync);
2823 }
2824
2825 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
2826 {
2827         mutex_lock(&ctx->uring_lock);
2828         percpu_ref_kill(&ctx->refs);
2829         mutex_unlock(&ctx->uring_lock);
2830
2831         io_poll_remove_all(ctx);
2832         io_iopoll_reap_events(ctx);
2833         wait_for_completion(&ctx->ctx_done);
2834         io_ring_ctx_free(ctx);
2835 }
2836
2837 static int io_uring_release(struct inode *inode, struct file *file)
2838 {
2839         struct io_ring_ctx *ctx = file->private_data;
2840
2841         file->private_data = NULL;
2842         io_ring_ctx_wait_and_kill(ctx);
2843         return 0;
2844 }
2845
2846 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
2847 {
2848         loff_t offset = (loff_t) vma->vm_pgoff << PAGE_SHIFT;
2849         unsigned long sz = vma->vm_end - vma->vm_start;
2850         struct io_ring_ctx *ctx = file->private_data;
2851         unsigned long pfn;
2852         struct page *page;
2853         void *ptr;
2854
2855         switch (offset) {
2856         case IORING_OFF_SQ_RING:
2857                 ptr = ctx->sq_ring;
2858                 break;
2859         case IORING_OFF_SQES:
2860                 ptr = ctx->sq_sqes;
2861                 break;
2862         case IORING_OFF_CQ_RING:
2863                 ptr = ctx->cq_ring;
2864                 break;
2865         default:
2866                 return -EINVAL;
2867         }
2868
2869         page = virt_to_head_page(ptr);
2870         if (sz > (PAGE_SIZE << compound_order(page)))
2871                 return -EINVAL;
2872
2873         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
2874         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
2875 }
2876
2877 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
2878                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
2879                 size_t, sigsz)
2880 {
2881         struct io_ring_ctx *ctx;
2882         long ret = -EBADF;
2883         int submitted = 0;
2884         struct fd f;
2885
2886         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
2887                 return -EINVAL;
2888
2889         f = fdget(fd);
2890         if (!f.file)
2891                 return -EBADF;
2892
2893         ret = -EOPNOTSUPP;
2894         if (f.file->f_op != &io_uring_fops)
2895                 goto out_fput;
2896
2897         ret = -ENXIO;
2898         ctx = f.file->private_data;
2899         if (!percpu_ref_tryget(&ctx->refs))
2900                 goto out_fput;
2901
2902         /*
2903          * For SQ polling, the thread will do all submissions and completions.
2904          * Just return the requested submit count, and wake the thread if
2905          * we were asked to.
2906          */
2907         if (ctx->flags & IORING_SETUP_SQPOLL) {
2908                 if (flags & IORING_ENTER_SQ_WAKEUP)
2909                         wake_up(&ctx->sqo_wait);
2910                 submitted = to_submit;
2911                 goto out_ctx;
2912         }
2913
2914         ret = 0;
2915         if (to_submit) {
2916                 to_submit = min(to_submit, ctx->sq_entries);
2917
2918                 mutex_lock(&ctx->uring_lock);
2919                 submitted = io_ring_submit(ctx, to_submit);
2920                 mutex_unlock(&ctx->uring_lock);
2921         }
2922         if (flags & IORING_ENTER_GETEVENTS) {
2923                 unsigned nr_events = 0;
2924
2925                 min_complete = min(min_complete, ctx->cq_entries);
2926
2927                 if (ctx->flags & IORING_SETUP_IOPOLL) {
2928                         mutex_lock(&ctx->uring_lock);
2929                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
2930                         mutex_unlock(&ctx->uring_lock);
2931                 } else {
2932                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
2933                 }
2934         }
2935
2936 out_ctx:
2937         io_ring_drop_ctx_refs(ctx, 1);
2938 out_fput:
2939         fdput(f);
2940         return submitted ? submitted : ret;
2941 }
2942
2943 static const struct file_operations io_uring_fops = {
2944         .release        = io_uring_release,
2945         .mmap           = io_uring_mmap,
2946         .poll           = io_uring_poll,
2947         .fasync         = io_uring_fasync,
2948 };
2949
2950 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
2951                                   struct io_uring_params *p)
2952 {
2953         struct io_sq_ring *sq_ring;
2954         struct io_cq_ring *cq_ring;
2955         size_t size;
2956
2957         sq_ring = io_mem_alloc(struct_size(sq_ring, array, p->sq_entries));
2958         if (!sq_ring)
2959                 return -ENOMEM;
2960
2961         ctx->sq_ring = sq_ring;
2962         sq_ring->ring_mask = p->sq_entries - 1;
2963         sq_ring->ring_entries = p->sq_entries;
2964         ctx->sq_mask = sq_ring->ring_mask;
2965         ctx->sq_entries = sq_ring->ring_entries;
2966
2967         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
2968         if (size == SIZE_MAX)
2969                 return -EOVERFLOW;
2970
2971         ctx->sq_sqes = io_mem_alloc(size);
2972         if (!ctx->sq_sqes)
2973                 return -ENOMEM;
2974
2975         cq_ring = io_mem_alloc(struct_size(cq_ring, cqes, p->cq_entries));
2976         if (!cq_ring)
2977                 return -ENOMEM;
2978
2979         ctx->cq_ring = cq_ring;
2980         cq_ring->ring_mask = p->cq_entries - 1;
2981         cq_ring->ring_entries = p->cq_entries;
2982         ctx->cq_mask = cq_ring->ring_mask;
2983         ctx->cq_entries = cq_ring->ring_entries;
2984         return 0;
2985 }
2986
2987 /*
2988  * Allocate an anonymous fd, this is what constitutes the application
2989  * visible backing of an io_uring instance. The application mmaps this
2990  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
2991  * we have to tie this fd to a socket for file garbage collection purposes.
2992  */
2993 static int io_uring_get_fd(struct io_ring_ctx *ctx)
2994 {
2995         struct file *file;
2996         int ret;
2997
2998 #if defined(CONFIG_UNIX)
2999         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
3000                                 &ctx->ring_sock);
3001         if (ret)
3002                 return ret;
3003 #endif
3004
3005         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
3006         if (ret < 0)
3007                 goto err;
3008
3009         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
3010                                         O_RDWR | O_CLOEXEC);
3011         if (IS_ERR(file)) {
3012                 put_unused_fd(ret);
3013                 ret = PTR_ERR(file);
3014                 goto err;
3015         }
3016
3017 #if defined(CONFIG_UNIX)
3018         ctx->ring_sock->file = file;
3019         ctx->ring_sock->sk->sk_user_data = ctx;
3020 #endif
3021         fd_install(ret, file);
3022         return ret;
3023 err:
3024 #if defined(CONFIG_UNIX)
3025         sock_release(ctx->ring_sock);
3026         ctx->ring_sock = NULL;
3027 #endif
3028         return ret;
3029 }
3030
3031 static int io_uring_create(unsigned entries, struct io_uring_params *p)
3032 {
3033         struct user_struct *user = NULL;
3034         struct io_ring_ctx *ctx;
3035         bool account_mem;
3036         int ret;
3037
3038         if (!entries || entries > IORING_MAX_ENTRIES)
3039                 return -EINVAL;
3040
3041         /*
3042          * Use twice as many entries for the CQ ring. It's possible for the
3043          * application to drive a higher depth than the size of the SQ ring,
3044          * since the sqes are only used at submission time. This allows for
3045          * some flexibility in overcommitting a bit.
3046          */
3047         p->sq_entries = roundup_pow_of_two(entries);
3048         p->cq_entries = 2 * p->sq_entries;
3049
3050         user = get_uid(current_user());
3051         account_mem = !capable(CAP_IPC_LOCK);
3052
3053         if (account_mem) {
3054                 ret = io_account_mem(user,
3055                                 ring_pages(p->sq_entries, p->cq_entries));
3056                 if (ret) {
3057                         free_uid(user);
3058                         return ret;
3059                 }
3060         }
3061
3062         ctx = io_ring_ctx_alloc(p);
3063         if (!ctx) {
3064                 if (account_mem)
3065                         io_unaccount_mem(user, ring_pages(p->sq_entries,
3066                                                                 p->cq_entries));
3067                 free_uid(user);
3068                 return -ENOMEM;
3069         }
3070         ctx->compat = in_compat_syscall();
3071         ctx->account_mem = account_mem;
3072         ctx->user = user;
3073
3074         ret = io_allocate_scq_urings(ctx, p);
3075         if (ret)
3076                 goto err;
3077
3078         ret = io_sq_offload_start(ctx, p);
3079         if (ret)
3080                 goto err;
3081
3082         ret = io_uring_get_fd(ctx);
3083         if (ret < 0)
3084                 goto err;
3085
3086         memset(&p->sq_off, 0, sizeof(p->sq_off));
3087         p->sq_off.head = offsetof(struct io_sq_ring, r.head);
3088         p->sq_off.tail = offsetof(struct io_sq_ring, r.tail);
3089         p->sq_off.ring_mask = offsetof(struct io_sq_ring, ring_mask);
3090         p->sq_off.ring_entries = offsetof(struct io_sq_ring, ring_entries);
3091         p->sq_off.flags = offsetof(struct io_sq_ring, flags);
3092         p->sq_off.dropped = offsetof(struct io_sq_ring, dropped);
3093         p->sq_off.array = offsetof(struct io_sq_ring, array);
3094
3095         memset(&p->cq_off, 0, sizeof(p->cq_off));
3096         p->cq_off.head = offsetof(struct io_cq_ring, r.head);
3097         p->cq_off.tail = offsetof(struct io_cq_ring, r.tail);
3098         p->cq_off.ring_mask = offsetof(struct io_cq_ring, ring_mask);
3099         p->cq_off.ring_entries = offsetof(struct io_cq_ring, ring_entries);
3100         p->cq_off.overflow = offsetof(struct io_cq_ring, overflow);
3101         p->cq_off.cqes = offsetof(struct io_cq_ring, cqes);
3102         return ret;
3103 err:
3104         io_ring_ctx_wait_and_kill(ctx);
3105         return ret;
3106 }
3107
3108 /*
3109  * Sets up an aio uring context, and returns the fd. Applications asks for a
3110  * ring size, we return the actual sq/cq ring sizes (among other things) in the
3111  * params structure passed in.
3112  */
3113 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
3114 {
3115         struct io_uring_params p;
3116         long ret;
3117         int i;
3118
3119         if (copy_from_user(&p, params, sizeof(p)))
3120                 return -EFAULT;
3121         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
3122                 if (p.resv[i])
3123                         return -EINVAL;
3124         }
3125
3126         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
3127                         IORING_SETUP_SQ_AFF))
3128                 return -EINVAL;
3129
3130         ret = io_uring_create(entries, &p);
3131         if (ret < 0)
3132                 return ret;
3133
3134         if (copy_to_user(params, &p, sizeof(p)))
3135                 return -EFAULT;
3136
3137         return ret;
3138 }
3139
3140 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
3141                 struct io_uring_params __user *, params)
3142 {
3143         return io_uring_setup(entries, params);
3144 }
3145
3146 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
3147                                void __user *arg, unsigned nr_args)
3148         __releases(ctx->uring_lock)
3149         __acquires(ctx->uring_lock)
3150 {
3151         int ret;
3152
3153         /*
3154          * We're inside the ring mutex, if the ref is already dying, then
3155          * someone else killed the ctx or is already going through
3156          * io_uring_register().
3157          */
3158         if (percpu_ref_is_dying(&ctx->refs))
3159                 return -ENXIO;
3160
3161         percpu_ref_kill(&ctx->refs);
3162
3163         /*
3164          * Drop uring mutex before waiting for references to exit. If another
3165          * thread is currently inside io_uring_enter() it might need to grab
3166          * the uring_lock to make progress. If we hold it here across the drain
3167          * wait, then we can deadlock. It's safe to drop the mutex here, since
3168          * no new references will come in after we've killed the percpu ref.
3169          */
3170         mutex_unlock(&ctx->uring_lock);
3171         wait_for_completion(&ctx->ctx_done);
3172         mutex_lock(&ctx->uring_lock);
3173
3174         switch (opcode) {
3175         case IORING_REGISTER_BUFFERS:
3176                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
3177                 break;
3178         case IORING_UNREGISTER_BUFFERS:
3179                 ret = -EINVAL;
3180                 if (arg || nr_args)
3181                         break;
3182                 ret = io_sqe_buffer_unregister(ctx);
3183                 break;
3184         case IORING_REGISTER_FILES:
3185                 ret = io_sqe_files_register(ctx, arg, nr_args);
3186                 break;
3187         case IORING_UNREGISTER_FILES:
3188                 ret = -EINVAL;
3189                 if (arg || nr_args)
3190                         break;
3191                 ret = io_sqe_files_unregister(ctx);
3192                 break;
3193         case IORING_REGISTER_EVENTFD:
3194                 ret = -EINVAL;
3195                 if (nr_args != 1)
3196                         break;
3197                 ret = io_eventfd_register(ctx, arg);
3198                 break;
3199         case IORING_UNREGISTER_EVENTFD:
3200                 ret = -EINVAL;
3201                 if (arg || nr_args)
3202                         break;
3203                 ret = io_eventfd_unregister(ctx);
3204                 break;
3205         default:
3206                 ret = -EINVAL;
3207                 break;
3208         }
3209
3210         /* bring the ctx back to life */
3211         reinit_completion(&ctx->ctx_done);
3212         percpu_ref_reinit(&ctx->refs);
3213         return ret;
3214 }
3215
3216 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
3217                 void __user *, arg, unsigned int, nr_args)
3218 {
3219         struct io_ring_ctx *ctx;
3220         long ret = -EBADF;
3221         struct fd f;
3222
3223         f = fdget(fd);
3224         if (!f.file)
3225                 return -EBADF;
3226
3227         ret = -EOPNOTSUPP;
3228         if (f.file->f_op != &io_uring_fops)
3229                 goto out_fput;
3230
3231         ctx = f.file->private_data;
3232
3233         mutex_lock(&ctx->uring_lock);
3234         ret = __io_uring_register(ctx, opcode, arg, nr_args);
3235         mutex_unlock(&ctx->uring_lock);
3236 out_fput:
3237         fdput(f);
3238         return ret;
3239 }
3240
3241 static int __init io_uring_init(void)
3242 {
3243         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
3244         return 0;
3245 };
3246 __initcall(io_uring_init);