]> asedeno.scripts.mit.edu Git - linux.git/blob - fs/io_uring.c
io_uring: run next sqe inline if possible
[linux.git] / fs / io_uring.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Shared application/kernel submission and completion ring pairs, for
4  * supporting fast/efficient IO.
5  *
6  * A note on the read/write ordering memory barriers that are matched between
7  * the application and kernel side.
8  *
9  * After the application reads the CQ ring tail, it must use an
10  * appropriate smp_rmb() to pair with the smp_wmb() the kernel uses
11  * before writing the tail (using smp_load_acquire to read the tail will
12  * do). It also needs a smp_mb() before updating CQ head (ordering the
13  * entry load(s) with the head store), pairing with an implicit barrier
14  * through a control-dependency in io_get_cqring (smp_store_release to
15  * store head will do). Failure to do so could lead to reading invalid
16  * CQ entries.
17  *
18  * Likewise, the application must use an appropriate smp_wmb() before
19  * writing the SQ tail (ordering SQ entry stores with the tail store),
20  * which pairs with smp_load_acquire in io_get_sqring (smp_store_release
21  * to store the tail will do). And it needs a barrier ordering the SQ
22  * head load before writing new SQ entries (smp_load_acquire to read
23  * head will do).
24  *
25  * When using the SQ poll thread (IORING_SETUP_SQPOLL), the application
26  * needs to check the SQ flags for IORING_SQ_NEED_WAKEUP *after*
27  * updating the SQ tail; a full memory barrier smp_mb() is needed
28  * between.
29  *
30  * Also see the examples in the liburing library:
31  *
32  *      git://git.kernel.dk/liburing
33  *
34  * io_uring also uses READ/WRITE_ONCE() for _any_ store or load that happens
35  * from data shared between the kernel and application. This is done both
36  * for ordering purposes, but also to ensure that once a value is loaded from
37  * data that the application could potentially modify, it remains stable.
38  *
39  * Copyright (C) 2018-2019 Jens Axboe
40  * Copyright (c) 2018-2019 Christoph Hellwig
41  */
42 #include <linux/kernel.h>
43 #include <linux/init.h>
44 #include <linux/errno.h>
45 #include <linux/syscalls.h>
46 #include <linux/compat.h>
47 #include <linux/refcount.h>
48 #include <linux/uio.h>
49
50 #include <linux/sched/signal.h>
51 #include <linux/fs.h>
52 #include <linux/file.h>
53 #include <linux/fdtable.h>
54 #include <linux/mm.h>
55 #include <linux/mman.h>
56 #include <linux/mmu_context.h>
57 #include <linux/percpu.h>
58 #include <linux/slab.h>
59 #include <linux/kthread.h>
60 #include <linux/blkdev.h>
61 #include <linux/bvec.h>
62 #include <linux/net.h>
63 #include <net/sock.h>
64 #include <net/af_unix.h>
65 #include <net/scm.h>
66 #include <linux/anon_inodes.h>
67 #include <linux/sched/mm.h>
68 #include <linux/uaccess.h>
69 #include <linux/nospec.h>
70 #include <linux/sizes.h>
71 #include <linux/hugetlb.h>
72 #include <linux/highmem.h>
73
74 #define CREATE_TRACE_POINTS
75 #include <trace/events/io_uring.h>
76
77 #include <uapi/linux/io_uring.h>
78
79 #include "internal.h"
80 #include "io-wq.h"
81
82 #define IORING_MAX_ENTRIES      32768
83 #define IORING_MAX_CQ_ENTRIES   (2 * IORING_MAX_ENTRIES)
84
85 /*
86  * Shift of 9 is 512 entries, or exactly one page on 64-bit archs
87  */
88 #define IORING_FILE_TABLE_SHIFT 9
89 #define IORING_MAX_FILES_TABLE  (1U << IORING_FILE_TABLE_SHIFT)
90 #define IORING_FILE_TABLE_MASK  (IORING_MAX_FILES_TABLE - 1)
91 #define IORING_MAX_FIXED_FILES  (64 * IORING_MAX_FILES_TABLE)
92
93 struct io_uring {
94         u32 head ____cacheline_aligned_in_smp;
95         u32 tail ____cacheline_aligned_in_smp;
96 };
97
98 /*
99  * This data is shared with the application through the mmap at offsets
100  * IORING_OFF_SQ_RING and IORING_OFF_CQ_RING.
101  *
102  * The offsets to the member fields are published through struct
103  * io_sqring_offsets when calling io_uring_setup.
104  */
105 struct io_rings {
106         /*
107          * Head and tail offsets into the ring; the offsets need to be
108          * masked to get valid indices.
109          *
110          * The kernel controls head of the sq ring and the tail of the cq ring,
111          * and the application controls tail of the sq ring and the head of the
112          * cq ring.
113          */
114         struct io_uring         sq, cq;
115         /*
116          * Bitmasks to apply to head and tail offsets (constant, equals
117          * ring_entries - 1)
118          */
119         u32                     sq_ring_mask, cq_ring_mask;
120         /* Ring sizes (constant, power of 2) */
121         u32                     sq_ring_entries, cq_ring_entries;
122         /*
123          * Number of invalid entries dropped by the kernel due to
124          * invalid index stored in array
125          *
126          * Written by the kernel, shouldn't be modified by the
127          * application (i.e. get number of "new events" by comparing to
128          * cached value).
129          *
130          * After a new SQ head value was read by the application this
131          * counter includes all submissions that were dropped reaching
132          * the new SQ head (and possibly more).
133          */
134         u32                     sq_dropped;
135         /*
136          * Runtime flags
137          *
138          * Written by the kernel, shouldn't be modified by the
139          * application.
140          *
141          * The application needs a full memory barrier before checking
142          * for IORING_SQ_NEED_WAKEUP after updating the sq tail.
143          */
144         u32                     sq_flags;
145         /*
146          * Number of completion events lost because the queue was full;
147          * this should be avoided by the application by making sure
148          * there are not more requests pending than there is space in
149          * the completion queue.
150          *
151          * Written by the kernel, shouldn't be modified by the
152          * application (i.e. get number of "new events" by comparing to
153          * cached value).
154          *
155          * As completion events come in out of order this counter is not
156          * ordered with any other data.
157          */
158         u32                     cq_overflow;
159         /*
160          * Ring buffer of completion events.
161          *
162          * The kernel writes completion events fresh every time they are
163          * produced, so the application is allowed to modify pending
164          * entries.
165          */
166         struct io_uring_cqe     cqes[] ____cacheline_aligned_in_smp;
167 };
168
169 struct io_mapped_ubuf {
170         u64             ubuf;
171         size_t          len;
172         struct          bio_vec *bvec;
173         unsigned int    nr_bvecs;
174 };
175
176 struct fixed_file_table {
177         struct file             **files;
178 };
179
180 struct io_ring_ctx {
181         struct {
182                 struct percpu_ref       refs;
183         } ____cacheline_aligned_in_smp;
184
185         struct {
186                 unsigned int            flags;
187                 bool                    compat;
188                 bool                    account_mem;
189                 bool                    cq_overflow_flushed;
190                 bool                    drain_next;
191
192                 /*
193                  * Ring buffer of indices into array of io_uring_sqe, which is
194                  * mmapped by the application using the IORING_OFF_SQES offset.
195                  *
196                  * This indirection could e.g. be used to assign fixed
197                  * io_uring_sqe entries to operations and only submit them to
198                  * the queue when needed.
199                  *
200                  * The kernel modifies neither the indices array nor the entries
201                  * array.
202                  */
203                 u32                     *sq_array;
204                 unsigned                cached_sq_head;
205                 unsigned                sq_entries;
206                 unsigned                sq_mask;
207                 unsigned                sq_thread_idle;
208                 unsigned                cached_sq_dropped;
209                 atomic_t                cached_cq_overflow;
210                 struct io_uring_sqe     *sq_sqes;
211
212                 struct list_head        defer_list;
213                 struct list_head        timeout_list;
214                 struct list_head        cq_overflow_list;
215
216                 wait_queue_head_t       inflight_wait;
217         } ____cacheline_aligned_in_smp;
218
219         struct io_rings *rings;
220
221         /* IO offload */
222         struct io_wq            *io_wq;
223         struct task_struct      *sqo_thread;    /* if using sq thread polling */
224         struct mm_struct        *sqo_mm;
225         wait_queue_head_t       sqo_wait;
226
227         /*
228          * If used, fixed file set. Writers must ensure that ->refs is dead,
229          * readers must ensure that ->refs is alive as long as the file* is
230          * used. Only updated through io_uring_register(2).
231          */
232         struct fixed_file_table *file_table;
233         unsigned                nr_user_files;
234
235         /* if used, fixed mapped user buffers */
236         unsigned                nr_user_bufs;
237         struct io_mapped_ubuf   *user_bufs;
238
239         struct user_struct      *user;
240
241         const struct cred       *creds;
242
243         /* 0 is for ctx quiesce/reinit/free, 1 is for sqo_thread started */
244         struct completion       *completions;
245
246         /* if all else fails... */
247         struct io_kiocb         *fallback_req;
248
249 #if defined(CONFIG_UNIX)
250         struct socket           *ring_sock;
251 #endif
252
253         struct {
254                 unsigned                cached_cq_tail;
255                 unsigned                cq_entries;
256                 unsigned                cq_mask;
257                 atomic_t                cq_timeouts;
258                 struct wait_queue_head  cq_wait;
259                 struct fasync_struct    *cq_fasync;
260                 struct eventfd_ctx      *cq_ev_fd;
261         } ____cacheline_aligned_in_smp;
262
263         struct {
264                 struct mutex            uring_lock;
265                 wait_queue_head_t       wait;
266         } ____cacheline_aligned_in_smp;
267
268         struct {
269                 spinlock_t              completion_lock;
270                 bool                    poll_multi_file;
271                 /*
272                  * ->poll_list is protected by the ctx->uring_lock for
273                  * io_uring instances that don't use IORING_SETUP_SQPOLL.
274                  * For SQPOLL, only the single threaded io_sq_thread() will
275                  * manipulate the list, hence no extra locking is needed there.
276                  */
277                 struct list_head        poll_list;
278                 struct hlist_head       *cancel_hash;
279                 unsigned                cancel_hash_bits;
280
281                 spinlock_t              inflight_lock;
282                 struct list_head        inflight_list;
283         } ____cacheline_aligned_in_smp;
284 };
285
286 /*
287  * First field must be the file pointer in all the
288  * iocb unions! See also 'struct kiocb' in <linux/fs.h>
289  */
290 struct io_poll_iocb {
291         struct file                     *file;
292         struct wait_queue_head          *head;
293         __poll_t                        events;
294         bool                            done;
295         bool                            canceled;
296         struct wait_queue_entry         wait;
297 };
298
299 struct io_timeout_data {
300         struct io_kiocb                 *req;
301         struct hrtimer                  timer;
302         struct timespec64               ts;
303         enum hrtimer_mode               mode;
304         u32                             seq_offset;
305 };
306
307 struct io_async_connect {
308         struct sockaddr_storage         address;
309 };
310
311 struct io_async_msghdr {
312         struct iovec                    fast_iov[UIO_FASTIOV];
313         struct iovec                    *iov;
314         struct sockaddr __user          *uaddr;
315         struct msghdr                   msg;
316 };
317
318 struct io_async_rw {
319         struct iovec                    fast_iov[UIO_FASTIOV];
320         struct iovec                    *iov;
321         ssize_t                         nr_segs;
322         ssize_t                         size;
323 };
324
325 struct io_async_ctx {
326         struct io_uring_sqe             sqe;
327         union {
328                 struct io_async_rw      rw;
329                 struct io_async_msghdr  msg;
330                 struct io_async_connect connect;
331                 struct io_timeout_data  timeout;
332         };
333 };
334
335 /*
336  * NOTE! Each of the iocb union members has the file pointer
337  * as the first entry in their struct definition. So you can
338  * access the file pointer through any of the sub-structs,
339  * or directly as just 'ki_filp' in this struct.
340  */
341 struct io_kiocb {
342         union {
343                 struct file             *file;
344                 struct kiocb            rw;
345                 struct io_poll_iocb     poll;
346         };
347
348         const struct io_uring_sqe       *sqe;
349         struct io_async_ctx             *io;
350         struct file                     *ring_file;
351         int                             ring_fd;
352         bool                            has_user;
353         bool                            in_async;
354         bool                            needs_fixed_file;
355
356         struct io_ring_ctx      *ctx;
357         union {
358                 struct list_head        list;
359                 struct hlist_node       hash_node;
360         };
361         struct list_head        link_list;
362         unsigned int            flags;
363         refcount_t              refs;
364 #define REQ_F_NOWAIT            1       /* must not punt to workers */
365 #define REQ_F_IOPOLL_COMPLETED  2       /* polled IO has completed */
366 #define REQ_F_FIXED_FILE        4       /* ctx owns file */
367 #define REQ_F_LINK_NEXT         8       /* already grabbed next link */
368 #define REQ_F_IO_DRAIN          16      /* drain existing IO first */
369 #define REQ_F_IO_DRAINED        32      /* drain done */
370 #define REQ_F_LINK              64      /* linked sqes */
371 #define REQ_F_LINK_TIMEOUT      128     /* has linked timeout */
372 #define REQ_F_FAIL_LINK         256     /* fail rest of links */
373 #define REQ_F_DRAIN_LINK        512     /* link should be fully drained */
374 #define REQ_F_TIMEOUT           1024    /* timeout request */
375 #define REQ_F_ISREG             2048    /* regular file */
376 #define REQ_F_MUST_PUNT         4096    /* must be punted even for NONBLOCK */
377 #define REQ_F_TIMEOUT_NOSEQ     8192    /* no timeout sequence */
378 #define REQ_F_INFLIGHT          16384   /* on inflight list */
379 #define REQ_F_COMP_LOCKED       32768   /* completion under lock */
380 #define REQ_F_HARDLINK          65536   /* doesn't sever on completion < 0 */
381         u64                     user_data;
382         u32                     result;
383         u32                     sequence;
384
385         struct list_head        inflight_entry;
386
387         struct io_wq_work       work;
388 };
389
390 #define IO_PLUG_THRESHOLD               2
391 #define IO_IOPOLL_BATCH                 8
392
393 struct io_submit_state {
394         struct blk_plug         plug;
395
396         /*
397          * io_kiocb alloc cache
398          */
399         void                    *reqs[IO_IOPOLL_BATCH];
400         unsigned                int free_reqs;
401         unsigned                int cur_req;
402
403         /*
404          * File reference cache
405          */
406         struct file             *file;
407         unsigned int            fd;
408         unsigned int            has_refs;
409         unsigned int            used_refs;
410         unsigned int            ios_left;
411 };
412
413 static void io_wq_submit_work(struct io_wq_work **workptr);
414 static void io_cqring_fill_event(struct io_kiocb *req, long res);
415 static void __io_free_req(struct io_kiocb *req);
416 static void io_put_req(struct io_kiocb *req);
417 static void io_double_put_req(struct io_kiocb *req);
418 static void __io_double_put_req(struct io_kiocb *req);
419 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req);
420 static void io_queue_linked_timeout(struct io_kiocb *req);
421
422 static struct kmem_cache *req_cachep;
423
424 static const struct file_operations io_uring_fops;
425
426 struct sock *io_uring_get_socket(struct file *file)
427 {
428 #if defined(CONFIG_UNIX)
429         if (file->f_op == &io_uring_fops) {
430                 struct io_ring_ctx *ctx = file->private_data;
431
432                 return ctx->ring_sock->sk;
433         }
434 #endif
435         return NULL;
436 }
437 EXPORT_SYMBOL(io_uring_get_socket);
438
439 static void io_ring_ctx_ref_free(struct percpu_ref *ref)
440 {
441         struct io_ring_ctx *ctx = container_of(ref, struct io_ring_ctx, refs);
442
443         complete(&ctx->completions[0]);
444 }
445
446 static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
447 {
448         struct io_ring_ctx *ctx;
449         int hash_bits;
450
451         ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
452         if (!ctx)
453                 return NULL;
454
455         ctx->fallback_req = kmem_cache_alloc(req_cachep, GFP_KERNEL);
456         if (!ctx->fallback_req)
457                 goto err;
458
459         ctx->completions = kmalloc(2 * sizeof(struct completion), GFP_KERNEL);
460         if (!ctx->completions)
461                 goto err;
462
463         /*
464          * Use 5 bits less than the max cq entries, that should give us around
465          * 32 entries per hash list if totally full and uniformly spread.
466          */
467         hash_bits = ilog2(p->cq_entries);
468         hash_bits -= 5;
469         if (hash_bits <= 0)
470                 hash_bits = 1;
471         ctx->cancel_hash_bits = hash_bits;
472         ctx->cancel_hash = kmalloc((1U << hash_bits) * sizeof(struct hlist_head),
473                                         GFP_KERNEL);
474         if (!ctx->cancel_hash)
475                 goto err;
476         __hash_init(ctx->cancel_hash, 1U << hash_bits);
477
478         if (percpu_ref_init(&ctx->refs, io_ring_ctx_ref_free,
479                             PERCPU_REF_ALLOW_REINIT, GFP_KERNEL))
480                 goto err;
481
482         ctx->flags = p->flags;
483         init_waitqueue_head(&ctx->cq_wait);
484         INIT_LIST_HEAD(&ctx->cq_overflow_list);
485         init_completion(&ctx->completions[0]);
486         init_completion(&ctx->completions[1]);
487         mutex_init(&ctx->uring_lock);
488         init_waitqueue_head(&ctx->wait);
489         spin_lock_init(&ctx->completion_lock);
490         INIT_LIST_HEAD(&ctx->poll_list);
491         INIT_LIST_HEAD(&ctx->defer_list);
492         INIT_LIST_HEAD(&ctx->timeout_list);
493         init_waitqueue_head(&ctx->inflight_wait);
494         spin_lock_init(&ctx->inflight_lock);
495         INIT_LIST_HEAD(&ctx->inflight_list);
496         return ctx;
497 err:
498         if (ctx->fallback_req)
499                 kmem_cache_free(req_cachep, ctx->fallback_req);
500         kfree(ctx->completions);
501         kfree(ctx->cancel_hash);
502         kfree(ctx);
503         return NULL;
504 }
505
506 static inline bool __req_need_defer(struct io_kiocb *req)
507 {
508         struct io_ring_ctx *ctx = req->ctx;
509
510         return req->sequence != ctx->cached_cq_tail + ctx->cached_sq_dropped
511                                         + atomic_read(&ctx->cached_cq_overflow);
512 }
513
514 static inline bool req_need_defer(struct io_kiocb *req)
515 {
516         if ((req->flags & (REQ_F_IO_DRAIN|REQ_F_IO_DRAINED)) == REQ_F_IO_DRAIN)
517                 return __req_need_defer(req);
518
519         return false;
520 }
521
522 static struct io_kiocb *io_get_deferred_req(struct io_ring_ctx *ctx)
523 {
524         struct io_kiocb *req;
525
526         req = list_first_entry_or_null(&ctx->defer_list, struct io_kiocb, list);
527         if (req && !req_need_defer(req)) {
528                 list_del_init(&req->list);
529                 return req;
530         }
531
532         return NULL;
533 }
534
535 static struct io_kiocb *io_get_timeout_req(struct io_ring_ctx *ctx)
536 {
537         struct io_kiocb *req;
538
539         req = list_first_entry_or_null(&ctx->timeout_list, struct io_kiocb, list);
540         if (req) {
541                 if (req->flags & REQ_F_TIMEOUT_NOSEQ)
542                         return NULL;
543                 if (!__req_need_defer(req)) {
544                         list_del_init(&req->list);
545                         return req;
546                 }
547         }
548
549         return NULL;
550 }
551
552 static void __io_commit_cqring(struct io_ring_ctx *ctx)
553 {
554         struct io_rings *rings = ctx->rings;
555
556         if (ctx->cached_cq_tail != READ_ONCE(rings->cq.tail)) {
557                 /* order cqe stores with ring update */
558                 smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
559
560                 if (wq_has_sleeper(&ctx->cq_wait)) {
561                         wake_up_interruptible(&ctx->cq_wait);
562                         kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
563                 }
564         }
565 }
566
567 static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
568 {
569         u8 opcode = READ_ONCE(sqe->opcode);
570
571         return !(opcode == IORING_OP_READ_FIXED ||
572                  opcode == IORING_OP_WRITE_FIXED);
573 }
574
575 static inline bool io_prep_async_work(struct io_kiocb *req,
576                                       struct io_kiocb **link)
577 {
578         bool do_hashed = false;
579
580         if (req->sqe) {
581                 switch (req->sqe->opcode) {
582                 case IORING_OP_WRITEV:
583                 case IORING_OP_WRITE_FIXED:
584                         do_hashed = true;
585                         /* fall-through */
586                 case IORING_OP_READV:
587                 case IORING_OP_READ_FIXED:
588                 case IORING_OP_SENDMSG:
589                 case IORING_OP_RECVMSG:
590                 case IORING_OP_ACCEPT:
591                 case IORING_OP_POLL_ADD:
592                 case IORING_OP_CONNECT:
593                         /*
594                          * We know REQ_F_ISREG is not set on some of these
595                          * opcodes, but this enables us to keep the check in
596                          * just one place.
597                          */
598                         if (!(req->flags & REQ_F_ISREG))
599                                 req->work.flags |= IO_WQ_WORK_UNBOUND;
600                         break;
601                 }
602                 if (io_sqe_needs_user(req->sqe))
603                         req->work.flags |= IO_WQ_WORK_NEEDS_USER;
604         }
605
606         *link = io_prep_linked_timeout(req);
607         return do_hashed;
608 }
609
610 static inline void io_queue_async_work(struct io_kiocb *req)
611 {
612         struct io_ring_ctx *ctx = req->ctx;
613         struct io_kiocb *link;
614         bool do_hashed;
615
616         do_hashed = io_prep_async_work(req, &link);
617
618         trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
619                                         req->flags);
620         if (!do_hashed) {
621                 io_wq_enqueue(ctx->io_wq, &req->work);
622         } else {
623                 io_wq_enqueue_hashed(ctx->io_wq, &req->work,
624                                         file_inode(req->file));
625         }
626
627         if (link)
628                 io_queue_linked_timeout(link);
629 }
630
631 static void io_kill_timeout(struct io_kiocb *req)
632 {
633         int ret;
634
635         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
636         if (ret != -1) {
637                 atomic_inc(&req->ctx->cq_timeouts);
638                 list_del_init(&req->list);
639                 io_cqring_fill_event(req, 0);
640                 io_put_req(req);
641         }
642 }
643
644 static void io_kill_timeouts(struct io_ring_ctx *ctx)
645 {
646         struct io_kiocb *req, *tmp;
647
648         spin_lock_irq(&ctx->completion_lock);
649         list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
650                 io_kill_timeout(req);
651         spin_unlock_irq(&ctx->completion_lock);
652 }
653
654 static void io_commit_cqring(struct io_ring_ctx *ctx)
655 {
656         struct io_kiocb *req;
657
658         while ((req = io_get_timeout_req(ctx)) != NULL)
659                 io_kill_timeout(req);
660
661         __io_commit_cqring(ctx);
662
663         while ((req = io_get_deferred_req(ctx)) != NULL) {
664                 req->flags |= REQ_F_IO_DRAINED;
665                 io_queue_async_work(req);
666         }
667 }
668
669 static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
670 {
671         struct io_rings *rings = ctx->rings;
672         unsigned tail;
673
674         tail = ctx->cached_cq_tail;
675         /*
676          * writes to the cq entry need to come after reading head; the
677          * control dependency is enough as we're using WRITE_ONCE to
678          * fill the cq entry
679          */
680         if (tail - READ_ONCE(rings->cq.head) == rings->cq_ring_entries)
681                 return NULL;
682
683         ctx->cached_cq_tail++;
684         return &rings->cqes[tail & ctx->cq_mask];
685 }
686
687 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
688 {
689         if (waitqueue_active(&ctx->wait))
690                 wake_up(&ctx->wait);
691         if (waitqueue_active(&ctx->sqo_wait))
692                 wake_up(&ctx->sqo_wait);
693         if (ctx->cq_ev_fd)
694                 eventfd_signal(ctx->cq_ev_fd, 1);
695 }
696
697 /* Returns true if there are no backlogged entries after the flush */
698 static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
699 {
700         struct io_rings *rings = ctx->rings;
701         struct io_uring_cqe *cqe;
702         struct io_kiocb *req;
703         unsigned long flags;
704         LIST_HEAD(list);
705
706         if (!force) {
707                 if (list_empty_careful(&ctx->cq_overflow_list))
708                         return true;
709                 if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
710                     rings->cq_ring_entries))
711                         return false;
712         }
713
714         spin_lock_irqsave(&ctx->completion_lock, flags);
715
716         /* if force is set, the ring is going away. always drop after that */
717         if (force)
718                 ctx->cq_overflow_flushed = true;
719
720         cqe = NULL;
721         while (!list_empty(&ctx->cq_overflow_list)) {
722                 cqe = io_get_cqring(ctx);
723                 if (!cqe && !force)
724                         break;
725
726                 req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
727                                                 list);
728                 list_move(&req->list, &list);
729                 if (cqe) {
730                         WRITE_ONCE(cqe->user_data, req->user_data);
731                         WRITE_ONCE(cqe->res, req->result);
732                         WRITE_ONCE(cqe->flags, 0);
733                 } else {
734                         WRITE_ONCE(ctx->rings->cq_overflow,
735                                 atomic_inc_return(&ctx->cached_cq_overflow));
736                 }
737         }
738
739         io_commit_cqring(ctx);
740         spin_unlock_irqrestore(&ctx->completion_lock, flags);
741         io_cqring_ev_posted(ctx);
742
743         while (!list_empty(&list)) {
744                 req = list_first_entry(&list, struct io_kiocb, list);
745                 list_del(&req->list);
746                 io_put_req(req);
747         }
748
749         return cqe != NULL;
750 }
751
752 static void io_cqring_fill_event(struct io_kiocb *req, long res)
753 {
754         struct io_ring_ctx *ctx = req->ctx;
755         struct io_uring_cqe *cqe;
756
757         trace_io_uring_complete(ctx, req->user_data, res);
758
759         /*
760          * If we can't get a cq entry, userspace overflowed the
761          * submission (by quite a lot). Increment the overflow count in
762          * the ring.
763          */
764         cqe = io_get_cqring(ctx);
765         if (likely(cqe)) {
766                 WRITE_ONCE(cqe->user_data, req->user_data);
767                 WRITE_ONCE(cqe->res, res);
768                 WRITE_ONCE(cqe->flags, 0);
769         } else if (ctx->cq_overflow_flushed) {
770                 WRITE_ONCE(ctx->rings->cq_overflow,
771                                 atomic_inc_return(&ctx->cached_cq_overflow));
772         } else {
773                 refcount_inc(&req->refs);
774                 req->result = res;
775                 list_add_tail(&req->list, &ctx->cq_overflow_list);
776         }
777 }
778
779 static void io_cqring_add_event(struct io_kiocb *req, long res)
780 {
781         struct io_ring_ctx *ctx = req->ctx;
782         unsigned long flags;
783
784         spin_lock_irqsave(&ctx->completion_lock, flags);
785         io_cqring_fill_event(req, res);
786         io_commit_cqring(ctx);
787         spin_unlock_irqrestore(&ctx->completion_lock, flags);
788
789         io_cqring_ev_posted(ctx);
790 }
791
792 static inline bool io_is_fallback_req(struct io_kiocb *req)
793 {
794         return req == (struct io_kiocb *)
795                         ((unsigned long) req->ctx->fallback_req & ~1UL);
796 }
797
798 static struct io_kiocb *io_get_fallback_req(struct io_ring_ctx *ctx)
799 {
800         struct io_kiocb *req;
801
802         req = ctx->fallback_req;
803         if (!test_and_set_bit_lock(0, (unsigned long *) ctx->fallback_req))
804                 return req;
805
806         return NULL;
807 }
808
809 static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
810                                    struct io_submit_state *state)
811 {
812         gfp_t gfp = GFP_KERNEL | __GFP_NOWARN;
813         struct io_kiocb *req;
814
815         if (!percpu_ref_tryget(&ctx->refs))
816                 return NULL;
817
818         if (!state) {
819                 req = kmem_cache_alloc(req_cachep, gfp);
820                 if (unlikely(!req))
821                         goto fallback;
822         } else if (!state->free_reqs) {
823                 size_t sz;
824                 int ret;
825
826                 sz = min_t(size_t, state->ios_left, ARRAY_SIZE(state->reqs));
827                 ret = kmem_cache_alloc_bulk(req_cachep, gfp, sz, state->reqs);
828
829                 /*
830                  * Bulk alloc is all-or-nothing. If we fail to get a batch,
831                  * retry single alloc to be on the safe side.
832                  */
833                 if (unlikely(ret <= 0)) {
834                         state->reqs[0] = kmem_cache_alloc(req_cachep, gfp);
835                         if (!state->reqs[0])
836                                 goto fallback;
837                         ret = 1;
838                 }
839                 state->free_reqs = ret - 1;
840                 state->cur_req = 1;
841                 req = state->reqs[0];
842         } else {
843                 req = state->reqs[state->cur_req];
844                 state->free_reqs--;
845                 state->cur_req++;
846         }
847
848 got_it:
849         req->io = NULL;
850         req->ring_file = NULL;
851         req->file = NULL;
852         req->ctx = ctx;
853         req->flags = 0;
854         /* one is dropped after submission, the other at completion */
855         refcount_set(&req->refs, 2);
856         req->result = 0;
857         INIT_IO_WORK(&req->work, io_wq_submit_work);
858         return req;
859 fallback:
860         req = io_get_fallback_req(ctx);
861         if (req)
862                 goto got_it;
863         percpu_ref_put(&ctx->refs);
864         return NULL;
865 }
866
867 static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
868 {
869         if (*nr) {
870                 kmem_cache_free_bulk(req_cachep, *nr, reqs);
871                 percpu_ref_put_many(&ctx->refs, *nr);
872                 *nr = 0;
873         }
874 }
875
876 static void __io_free_req(struct io_kiocb *req)
877 {
878         struct io_ring_ctx *ctx = req->ctx;
879
880         if (req->io)
881                 kfree(req->io);
882         if (req->file && !(req->flags & REQ_F_FIXED_FILE))
883                 fput(req->file);
884         if (req->flags & REQ_F_INFLIGHT) {
885                 unsigned long flags;
886
887                 spin_lock_irqsave(&ctx->inflight_lock, flags);
888                 list_del(&req->inflight_entry);
889                 if (waitqueue_active(&ctx->inflight_wait))
890                         wake_up(&ctx->inflight_wait);
891                 spin_unlock_irqrestore(&ctx->inflight_lock, flags);
892         }
893         percpu_ref_put(&ctx->refs);
894         if (likely(!io_is_fallback_req(req)))
895                 kmem_cache_free(req_cachep, req);
896         else
897                 clear_bit_unlock(0, (unsigned long *) ctx->fallback_req);
898 }
899
900 static bool io_link_cancel_timeout(struct io_kiocb *req)
901 {
902         struct io_ring_ctx *ctx = req->ctx;
903         int ret;
904
905         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
906         if (ret != -1) {
907                 io_cqring_fill_event(req, -ECANCELED);
908                 io_commit_cqring(ctx);
909                 req->flags &= ~REQ_F_LINK;
910                 io_put_req(req);
911                 return true;
912         }
913
914         return false;
915 }
916
917 static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
918 {
919         struct io_ring_ctx *ctx = req->ctx;
920         bool wake_ev = false;
921
922         /* Already got next link */
923         if (req->flags & REQ_F_LINK_NEXT)
924                 return;
925
926         /*
927          * The list should never be empty when we are called here. But could
928          * potentially happen if the chain is messed up, check to be on the
929          * safe side.
930          */
931         while (!list_empty(&req->link_list)) {
932                 struct io_kiocb *nxt = list_first_entry(&req->link_list,
933                                                 struct io_kiocb, link_list);
934
935                 if (unlikely((req->flags & REQ_F_LINK_TIMEOUT) &&
936                              (nxt->flags & REQ_F_TIMEOUT))) {
937                         list_del_init(&nxt->link_list);
938                         wake_ev |= io_link_cancel_timeout(nxt);
939                         req->flags &= ~REQ_F_LINK_TIMEOUT;
940                         continue;
941                 }
942
943                 list_del_init(&req->link_list);
944                 if (!list_empty(&nxt->link_list))
945                         nxt->flags |= REQ_F_LINK;
946                 *nxtptr = nxt;
947                 break;
948         }
949
950         req->flags |= REQ_F_LINK_NEXT;
951         if (wake_ev)
952                 io_cqring_ev_posted(ctx);
953 }
954
955 /*
956  * Called if REQ_F_LINK is set, and we fail the head request
957  */
958 static void io_fail_links(struct io_kiocb *req)
959 {
960         struct io_ring_ctx *ctx = req->ctx;
961         unsigned long flags;
962
963         spin_lock_irqsave(&ctx->completion_lock, flags);
964
965         while (!list_empty(&req->link_list)) {
966                 struct io_kiocb *link = list_first_entry(&req->link_list,
967                                                 struct io_kiocb, link_list);
968
969                 list_del_init(&link->link_list);
970                 trace_io_uring_fail_link(req, link);
971
972                 if ((req->flags & REQ_F_LINK_TIMEOUT) &&
973                     link->sqe->opcode == IORING_OP_LINK_TIMEOUT) {
974                         io_link_cancel_timeout(link);
975                 } else {
976                         io_cqring_fill_event(link, -ECANCELED);
977                         __io_double_put_req(link);
978                 }
979                 req->flags &= ~REQ_F_LINK_TIMEOUT;
980         }
981
982         io_commit_cqring(ctx);
983         spin_unlock_irqrestore(&ctx->completion_lock, flags);
984         io_cqring_ev_posted(ctx);
985 }
986
987 static void io_req_find_next(struct io_kiocb *req, struct io_kiocb **nxt)
988 {
989         if (likely(!(req->flags & REQ_F_LINK)))
990                 return;
991
992         /*
993          * If LINK is set, we have dependent requests in this chain. If we
994          * didn't fail this request, queue the first one up, moving any other
995          * dependencies to the next request. In case of failure, fail the rest
996          * of the chain.
997          */
998         if (req->flags & REQ_F_FAIL_LINK) {
999                 io_fail_links(req);
1000         } else if ((req->flags & (REQ_F_LINK_TIMEOUT | REQ_F_COMP_LOCKED)) ==
1001                         REQ_F_LINK_TIMEOUT) {
1002                 struct io_ring_ctx *ctx = req->ctx;
1003                 unsigned long flags;
1004
1005                 /*
1006                  * If this is a timeout link, we could be racing with the
1007                  * timeout timer. Grab the completion lock for this case to
1008                  * protect against that.
1009                  */
1010                 spin_lock_irqsave(&ctx->completion_lock, flags);
1011                 io_req_link_next(req, nxt);
1012                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
1013         } else {
1014                 io_req_link_next(req, nxt);
1015         }
1016 }
1017
1018 static void io_free_req(struct io_kiocb *req)
1019 {
1020         struct io_kiocb *nxt = NULL;
1021
1022         io_req_find_next(req, &nxt);
1023         __io_free_req(req);
1024
1025         if (nxt)
1026                 io_queue_async_work(nxt);
1027 }
1028
1029 /*
1030  * Drop reference to request, return next in chain (if there is one) if this
1031  * was the last reference to this request.
1032  */
1033 __attribute__((nonnull))
1034 static void io_put_req_find_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
1035 {
1036         io_req_find_next(req, nxtptr);
1037
1038         if (refcount_dec_and_test(&req->refs))
1039                 __io_free_req(req);
1040 }
1041
1042 static void io_put_req(struct io_kiocb *req)
1043 {
1044         if (refcount_dec_and_test(&req->refs))
1045                 io_free_req(req);
1046 }
1047
1048 /*
1049  * Must only be used if we don't need to care about links, usually from
1050  * within the completion handling itself.
1051  */
1052 static void __io_double_put_req(struct io_kiocb *req)
1053 {
1054         /* drop both submit and complete references */
1055         if (refcount_sub_and_test(2, &req->refs))
1056                 __io_free_req(req);
1057 }
1058
1059 static void io_double_put_req(struct io_kiocb *req)
1060 {
1061         /* drop both submit and complete references */
1062         if (refcount_sub_and_test(2, &req->refs))
1063                 io_free_req(req);
1064 }
1065
1066 static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
1067 {
1068         struct io_rings *rings = ctx->rings;
1069
1070         /*
1071          * noflush == true is from the waitqueue handler, just ensure we wake
1072          * up the task, and the next invocation will flush the entries. We
1073          * cannot safely to it from here.
1074          */
1075         if (noflush && !list_empty(&ctx->cq_overflow_list))
1076                 return -1U;
1077
1078         io_cqring_overflow_flush(ctx, false);
1079
1080         /* See comment at the top of this file */
1081         smp_rmb();
1082         return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
1083 }
1084
1085 static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
1086 {
1087         struct io_rings *rings = ctx->rings;
1088
1089         /* make sure SQ entry isn't read before tail */
1090         return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
1091 }
1092
1093 /*
1094  * Find and free completed poll iocbs
1095  */
1096 static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
1097                                struct list_head *done)
1098 {
1099         void *reqs[IO_IOPOLL_BATCH];
1100         struct io_kiocb *req;
1101         int to_free;
1102
1103         to_free = 0;
1104         while (!list_empty(done)) {
1105                 req = list_first_entry(done, struct io_kiocb, list);
1106                 list_del(&req->list);
1107
1108                 io_cqring_fill_event(req, req->result);
1109                 (*nr_events)++;
1110
1111                 if (refcount_dec_and_test(&req->refs)) {
1112                         /* If we're not using fixed files, we have to pair the
1113                          * completion part with the file put. Use regular
1114                          * completions for those, only batch free for fixed
1115                          * file and non-linked commands.
1116                          */
1117                         if (((req->flags & (REQ_F_FIXED_FILE|REQ_F_LINK)) ==
1118                             REQ_F_FIXED_FILE) && !io_is_fallback_req(req) &&
1119                             !req->io) {
1120                                 reqs[to_free++] = req;
1121                                 if (to_free == ARRAY_SIZE(reqs))
1122                                         io_free_req_many(ctx, reqs, &to_free);
1123                         } else {
1124                                 io_free_req(req);
1125                         }
1126                 }
1127         }
1128
1129         io_commit_cqring(ctx);
1130         io_free_req_many(ctx, reqs, &to_free);
1131 }
1132
1133 static int io_do_iopoll(struct io_ring_ctx *ctx, unsigned int *nr_events,
1134                         long min)
1135 {
1136         struct io_kiocb *req, *tmp;
1137         LIST_HEAD(done);
1138         bool spin;
1139         int ret;
1140
1141         /*
1142          * Only spin for completions if we don't have multiple devices hanging
1143          * off our complete list, and we're under the requested amount.
1144          */
1145         spin = !ctx->poll_multi_file && *nr_events < min;
1146
1147         ret = 0;
1148         list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
1149                 struct kiocb *kiocb = &req->rw;
1150
1151                 /*
1152                  * Move completed entries to our local list. If we find a
1153                  * request that requires polling, break out and complete
1154                  * the done list first, if we have entries there.
1155                  */
1156                 if (req->flags & REQ_F_IOPOLL_COMPLETED) {
1157                         list_move_tail(&req->list, &done);
1158                         continue;
1159                 }
1160                 if (!list_empty(&done))
1161                         break;
1162
1163                 ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
1164                 if (ret < 0)
1165                         break;
1166
1167                 if (ret && spin)
1168                         spin = false;
1169                 ret = 0;
1170         }
1171
1172         if (!list_empty(&done))
1173                 io_iopoll_complete(ctx, nr_events, &done);
1174
1175         return ret;
1176 }
1177
1178 /*
1179  * Poll for a mininum of 'min' events. Note that if min == 0 we consider that a
1180  * non-spinning poll check - we'll still enter the driver poll loop, but only
1181  * as a non-spinning completion check.
1182  */
1183 static int io_iopoll_getevents(struct io_ring_ctx *ctx, unsigned int *nr_events,
1184                                 long min)
1185 {
1186         while (!list_empty(&ctx->poll_list) && !need_resched()) {
1187                 int ret;
1188
1189                 ret = io_do_iopoll(ctx, nr_events, min);
1190                 if (ret < 0)
1191                         return ret;
1192                 if (!min || *nr_events >= min)
1193                         return 0;
1194         }
1195
1196         return 1;
1197 }
1198
1199 /*
1200  * We can't just wait for polled events to come to us, we have to actively
1201  * find and complete them.
1202  */
1203 static void io_iopoll_reap_events(struct io_ring_ctx *ctx)
1204 {
1205         if (!(ctx->flags & IORING_SETUP_IOPOLL))
1206                 return;
1207
1208         mutex_lock(&ctx->uring_lock);
1209         while (!list_empty(&ctx->poll_list)) {
1210                 unsigned int nr_events = 0;
1211
1212                 io_iopoll_getevents(ctx, &nr_events, 1);
1213
1214                 /*
1215                  * Ensure we allow local-to-the-cpu processing to take place,
1216                  * in this case we need to ensure that we reap all events.
1217                  */
1218                 cond_resched();
1219         }
1220         mutex_unlock(&ctx->uring_lock);
1221 }
1222
1223 static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1224                             long min)
1225 {
1226         int iters = 0, ret = 0;
1227
1228         do {
1229                 int tmin = 0;
1230
1231                 /*
1232                  * Don't enter poll loop if we already have events pending.
1233                  * If we do, we can potentially be spinning for commands that
1234                  * already triggered a CQE (eg in error).
1235                  */
1236                 if (io_cqring_events(ctx, false))
1237                         break;
1238
1239                 /*
1240                  * If a submit got punted to a workqueue, we can have the
1241                  * application entering polling for a command before it gets
1242                  * issued. That app will hold the uring_lock for the duration
1243                  * of the poll right here, so we need to take a breather every
1244                  * now and then to ensure that the issue has a chance to add
1245                  * the poll to the issued list. Otherwise we can spin here
1246                  * forever, while the workqueue is stuck trying to acquire the
1247                  * very same mutex.
1248                  */
1249                 if (!(++iters & 7)) {
1250                         mutex_unlock(&ctx->uring_lock);
1251                         mutex_lock(&ctx->uring_lock);
1252                 }
1253
1254                 if (*nr_events < min)
1255                         tmin = min - *nr_events;
1256
1257                 ret = io_iopoll_getevents(ctx, nr_events, tmin);
1258                 if (ret <= 0)
1259                         break;
1260                 ret = 0;
1261         } while (min && !*nr_events && !need_resched());
1262
1263         return ret;
1264 }
1265
1266 static int io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
1267                            long min)
1268 {
1269         int ret;
1270
1271         /*
1272          * We disallow the app entering submit/complete with polling, but we
1273          * still need to lock the ring to prevent racing with polled issue
1274          * that got punted to a workqueue.
1275          */
1276         mutex_lock(&ctx->uring_lock);
1277         ret = __io_iopoll_check(ctx, nr_events, min);
1278         mutex_unlock(&ctx->uring_lock);
1279         return ret;
1280 }
1281
1282 static void kiocb_end_write(struct io_kiocb *req)
1283 {
1284         /*
1285          * Tell lockdep we inherited freeze protection from submission
1286          * thread.
1287          */
1288         if (req->flags & REQ_F_ISREG) {
1289                 struct inode *inode = file_inode(req->file);
1290
1291                 __sb_writers_acquired(inode->i_sb, SB_FREEZE_WRITE);
1292         }
1293         file_end_write(req->file);
1294 }
1295
1296 static inline void req_set_fail_links(struct io_kiocb *req)
1297 {
1298         if ((req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) == REQ_F_LINK)
1299                 req->flags |= REQ_F_FAIL_LINK;
1300 }
1301
1302 static void io_complete_rw_common(struct kiocb *kiocb, long res)
1303 {
1304         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1305
1306         if (kiocb->ki_flags & IOCB_WRITE)
1307                 kiocb_end_write(req);
1308
1309         if (res != req->result)
1310                 req_set_fail_links(req);
1311         io_cqring_add_event(req, res);
1312 }
1313
1314 static void io_complete_rw(struct kiocb *kiocb, long res, long res2)
1315 {
1316         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1317
1318         io_complete_rw_common(kiocb, res);
1319         io_put_req(req);
1320 }
1321
1322 static struct io_kiocb *__io_complete_rw(struct kiocb *kiocb, long res)
1323 {
1324         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1325         struct io_kiocb *nxt = NULL;
1326
1327         io_complete_rw_common(kiocb, res);
1328         io_put_req_find_next(req, &nxt);
1329
1330         return nxt;
1331 }
1332
1333 static void io_complete_rw_iopoll(struct kiocb *kiocb, long res, long res2)
1334 {
1335         struct io_kiocb *req = container_of(kiocb, struct io_kiocb, rw);
1336
1337         if (kiocb->ki_flags & IOCB_WRITE)
1338                 kiocb_end_write(req);
1339
1340         if (res != req->result)
1341                 req_set_fail_links(req);
1342         req->result = res;
1343         if (res != -EAGAIN)
1344                 req->flags |= REQ_F_IOPOLL_COMPLETED;
1345 }
1346
1347 /*
1348  * After the iocb has been issued, it's safe to be found on the poll list.
1349  * Adding the kiocb to the list AFTER submission ensures that we don't
1350  * find it from a io_iopoll_getevents() thread before the issuer is done
1351  * accessing the kiocb cookie.
1352  */
1353 static void io_iopoll_req_issued(struct io_kiocb *req)
1354 {
1355         struct io_ring_ctx *ctx = req->ctx;
1356
1357         /*
1358          * Track whether we have multiple files in our lists. This will impact
1359          * how we do polling eventually, not spinning if we're on potentially
1360          * different devices.
1361          */
1362         if (list_empty(&ctx->poll_list)) {
1363                 ctx->poll_multi_file = false;
1364         } else if (!ctx->poll_multi_file) {
1365                 struct io_kiocb *list_req;
1366
1367                 list_req = list_first_entry(&ctx->poll_list, struct io_kiocb,
1368                                                 list);
1369                 if (list_req->rw.ki_filp != req->rw.ki_filp)
1370                         ctx->poll_multi_file = true;
1371         }
1372
1373         /*
1374          * For fast devices, IO may have already completed. If it has, add
1375          * it to the front so we find it first.
1376          */
1377         if (req->flags & REQ_F_IOPOLL_COMPLETED)
1378                 list_add(&req->list, &ctx->poll_list);
1379         else
1380                 list_add_tail(&req->list, &ctx->poll_list);
1381 }
1382
1383 static void io_file_put(struct io_submit_state *state)
1384 {
1385         if (state->file) {
1386                 int diff = state->has_refs - state->used_refs;
1387
1388                 if (diff)
1389                         fput_many(state->file, diff);
1390                 state->file = NULL;
1391         }
1392 }
1393
1394 /*
1395  * Get as many references to a file as we have IOs left in this submission,
1396  * assuming most submissions are for one file, or at least that each file
1397  * has more than one submission.
1398  */
1399 static struct file *io_file_get(struct io_submit_state *state, int fd)
1400 {
1401         if (!state)
1402                 return fget(fd);
1403
1404         if (state->file) {
1405                 if (state->fd == fd) {
1406                         state->used_refs++;
1407                         state->ios_left--;
1408                         return state->file;
1409                 }
1410                 io_file_put(state);
1411         }
1412         state->file = fget_many(fd, state->ios_left);
1413         if (!state->file)
1414                 return NULL;
1415
1416         state->fd = fd;
1417         state->has_refs = state->ios_left;
1418         state->used_refs = 1;
1419         state->ios_left--;
1420         return state->file;
1421 }
1422
1423 /*
1424  * If we tracked the file through the SCM inflight mechanism, we could support
1425  * any file. For now, just ensure that anything potentially problematic is done
1426  * inline.
1427  */
1428 static bool io_file_supports_async(struct file *file)
1429 {
1430         umode_t mode = file_inode(file)->i_mode;
1431
1432         if (S_ISBLK(mode) || S_ISCHR(mode))
1433                 return true;
1434         if (S_ISREG(mode) && file->f_op != &io_uring_fops)
1435                 return true;
1436
1437         return false;
1438 }
1439
1440 static int io_prep_rw(struct io_kiocb *req, bool force_nonblock)
1441 {
1442         const struct io_uring_sqe *sqe = req->sqe;
1443         struct io_ring_ctx *ctx = req->ctx;
1444         struct kiocb *kiocb = &req->rw;
1445         unsigned ioprio;
1446         int ret;
1447
1448         if (!req->file)
1449                 return -EBADF;
1450
1451         if (S_ISREG(file_inode(req->file)->i_mode))
1452                 req->flags |= REQ_F_ISREG;
1453
1454         kiocb->ki_pos = READ_ONCE(sqe->off);
1455         kiocb->ki_flags = iocb_flags(kiocb->ki_filp);
1456         kiocb->ki_hint = ki_hint_validate(file_write_hint(kiocb->ki_filp));
1457
1458         ioprio = READ_ONCE(sqe->ioprio);
1459         if (ioprio) {
1460                 ret = ioprio_check_cap(ioprio);
1461                 if (ret)
1462                         return ret;
1463
1464                 kiocb->ki_ioprio = ioprio;
1465         } else
1466                 kiocb->ki_ioprio = get_current_ioprio();
1467
1468         ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
1469         if (unlikely(ret))
1470                 return ret;
1471
1472         /* don't allow async punt if RWF_NOWAIT was requested */
1473         if ((kiocb->ki_flags & IOCB_NOWAIT) ||
1474             (req->file->f_flags & O_NONBLOCK))
1475                 req->flags |= REQ_F_NOWAIT;
1476
1477         if (force_nonblock)
1478                 kiocb->ki_flags |= IOCB_NOWAIT;
1479
1480         if (ctx->flags & IORING_SETUP_IOPOLL) {
1481                 if (!(kiocb->ki_flags & IOCB_DIRECT) ||
1482                     !kiocb->ki_filp->f_op->iopoll)
1483                         return -EOPNOTSUPP;
1484
1485                 kiocb->ki_flags |= IOCB_HIPRI;
1486                 kiocb->ki_complete = io_complete_rw_iopoll;
1487                 req->result = 0;
1488         } else {
1489                 if (kiocb->ki_flags & IOCB_HIPRI)
1490                         return -EINVAL;
1491                 kiocb->ki_complete = io_complete_rw;
1492         }
1493         return 0;
1494 }
1495
1496 static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
1497 {
1498         switch (ret) {
1499         case -EIOCBQUEUED:
1500                 break;
1501         case -ERESTARTSYS:
1502         case -ERESTARTNOINTR:
1503         case -ERESTARTNOHAND:
1504         case -ERESTART_RESTARTBLOCK:
1505                 /*
1506                  * We can't just restart the syscall, since previously
1507                  * submitted sqes may already be in progress. Just fail this
1508                  * IO with EINTR.
1509                  */
1510                 ret = -EINTR;
1511                 /* fall through */
1512         default:
1513                 kiocb->ki_complete(kiocb, ret, 0);
1514         }
1515 }
1516
1517 static void kiocb_done(struct kiocb *kiocb, ssize_t ret, struct io_kiocb **nxt,
1518                        bool in_async)
1519 {
1520         if (in_async && ret >= 0 && kiocb->ki_complete == io_complete_rw)
1521                 *nxt = __io_complete_rw(kiocb, ret);
1522         else
1523                 io_rw_done(kiocb, ret);
1524 }
1525
1526 static ssize_t io_import_fixed(struct io_ring_ctx *ctx, int rw,
1527                                const struct io_uring_sqe *sqe,
1528                                struct iov_iter *iter)
1529 {
1530         size_t len = READ_ONCE(sqe->len);
1531         struct io_mapped_ubuf *imu;
1532         unsigned index, buf_index;
1533         size_t offset;
1534         u64 buf_addr;
1535
1536         /* attempt to use fixed buffers without having provided iovecs */
1537         if (unlikely(!ctx->user_bufs))
1538                 return -EFAULT;
1539
1540         buf_index = READ_ONCE(sqe->buf_index);
1541         if (unlikely(buf_index >= ctx->nr_user_bufs))
1542                 return -EFAULT;
1543
1544         index = array_index_nospec(buf_index, ctx->nr_user_bufs);
1545         imu = &ctx->user_bufs[index];
1546         buf_addr = READ_ONCE(sqe->addr);
1547
1548         /* overflow */
1549         if (buf_addr + len < buf_addr)
1550                 return -EFAULT;
1551         /* not inside the mapped region */
1552         if (buf_addr < imu->ubuf || buf_addr + len > imu->ubuf + imu->len)
1553                 return -EFAULT;
1554
1555         /*
1556          * May not be a start of buffer, set size appropriately
1557          * and advance us to the beginning.
1558          */
1559         offset = buf_addr - imu->ubuf;
1560         iov_iter_bvec(iter, rw, imu->bvec, imu->nr_bvecs, offset + len);
1561
1562         if (offset) {
1563                 /*
1564                  * Don't use iov_iter_advance() here, as it's really slow for
1565                  * using the latter parts of a big fixed buffer - it iterates
1566                  * over each segment manually. We can cheat a bit here, because
1567                  * we know that:
1568                  *
1569                  * 1) it's a BVEC iter, we set it up
1570                  * 2) all bvecs are PAGE_SIZE in size, except potentially the
1571                  *    first and last bvec
1572                  *
1573                  * So just find our index, and adjust the iterator afterwards.
1574                  * If the offset is within the first bvec (or the whole first
1575                  * bvec, just use iov_iter_advance(). This makes it easier
1576                  * since we can just skip the first segment, which may not
1577                  * be PAGE_SIZE aligned.
1578                  */
1579                 const struct bio_vec *bvec = imu->bvec;
1580
1581                 if (offset <= bvec->bv_len) {
1582                         iov_iter_advance(iter, offset);
1583                 } else {
1584                         unsigned long seg_skip;
1585
1586                         /* skip first vec */
1587                         offset -= bvec->bv_len;
1588                         seg_skip = 1 + (offset >> PAGE_SHIFT);
1589
1590                         iter->bvec = bvec + seg_skip;
1591                         iter->nr_segs -= seg_skip;
1592                         iter->count -= bvec->bv_len + offset;
1593                         iter->iov_offset = offset & ~PAGE_MASK;
1594                 }
1595         }
1596
1597         return len;
1598 }
1599
1600 static ssize_t io_import_iovec(int rw, struct io_kiocb *req,
1601                                struct iovec **iovec, struct iov_iter *iter)
1602 {
1603         const struct io_uring_sqe *sqe = req->sqe;
1604         void __user *buf = u64_to_user_ptr(READ_ONCE(sqe->addr));
1605         size_t sqe_len = READ_ONCE(sqe->len);
1606         u8 opcode;
1607
1608         /*
1609          * We're reading ->opcode for the second time, but the first read
1610          * doesn't care whether it's _FIXED or not, so it doesn't matter
1611          * whether ->opcode changes concurrently. The first read does care
1612          * about whether it is a READ or a WRITE, so we don't trust this read
1613          * for that purpose and instead let the caller pass in the read/write
1614          * flag.
1615          */
1616         opcode = READ_ONCE(sqe->opcode);
1617         if (opcode == IORING_OP_READ_FIXED || opcode == IORING_OP_WRITE_FIXED) {
1618                 *iovec = NULL;
1619                 return io_import_fixed(req->ctx, rw, sqe, iter);
1620         }
1621
1622         if (req->io) {
1623                 struct io_async_rw *iorw = &req->io->rw;
1624
1625                 *iovec = iorw->iov;
1626                 iov_iter_init(iter, rw, *iovec, iorw->nr_segs, iorw->size);
1627                 if (iorw->iov == iorw->fast_iov)
1628                         *iovec = NULL;
1629                 return iorw->size;
1630         }
1631
1632         if (!req->has_user)
1633                 return -EFAULT;
1634
1635 #ifdef CONFIG_COMPAT
1636         if (req->ctx->compat)
1637                 return compat_import_iovec(rw, buf, sqe_len, UIO_FASTIOV,
1638                                                 iovec, iter);
1639 #endif
1640
1641         return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
1642 }
1643
1644 /*
1645  * For files that don't have ->read_iter() and ->write_iter(), handle them
1646  * by looping over ->read() or ->write() manually.
1647  */
1648 static ssize_t loop_rw_iter(int rw, struct file *file, struct kiocb *kiocb,
1649                            struct iov_iter *iter)
1650 {
1651         ssize_t ret = 0;
1652
1653         /*
1654          * Don't support polled IO through this interface, and we can't
1655          * support non-blocking either. For the latter, this just causes
1656          * the kiocb to be handled from an async context.
1657          */
1658         if (kiocb->ki_flags & IOCB_HIPRI)
1659                 return -EOPNOTSUPP;
1660         if (kiocb->ki_flags & IOCB_NOWAIT)
1661                 return -EAGAIN;
1662
1663         while (iov_iter_count(iter)) {
1664                 struct iovec iovec;
1665                 ssize_t nr;
1666
1667                 if (!iov_iter_is_bvec(iter)) {
1668                         iovec = iov_iter_iovec(iter);
1669                 } else {
1670                         /* fixed buffers import bvec */
1671                         iovec.iov_base = kmap(iter->bvec->bv_page)
1672                                                 + iter->iov_offset;
1673                         iovec.iov_len = min(iter->count,
1674                                         iter->bvec->bv_len - iter->iov_offset);
1675                 }
1676
1677                 if (rw == READ) {
1678                         nr = file->f_op->read(file, iovec.iov_base,
1679                                               iovec.iov_len, &kiocb->ki_pos);
1680                 } else {
1681                         nr = file->f_op->write(file, iovec.iov_base,
1682                                                iovec.iov_len, &kiocb->ki_pos);
1683                 }
1684
1685                 if (iov_iter_is_bvec(iter))
1686                         kunmap(iter->bvec->bv_page);
1687
1688                 if (nr < 0) {
1689                         if (!ret)
1690                                 ret = nr;
1691                         break;
1692                 }
1693                 ret += nr;
1694                 if (nr != iovec.iov_len)
1695                         break;
1696                 iov_iter_advance(iter, nr);
1697         }
1698
1699         return ret;
1700 }
1701
1702 static void io_req_map_io(struct io_kiocb *req, ssize_t io_size,
1703                           struct iovec *iovec, struct iovec *fast_iov,
1704                           struct iov_iter *iter)
1705 {
1706         req->io->rw.nr_segs = iter->nr_segs;
1707         req->io->rw.size = io_size;
1708         req->io->rw.iov = iovec;
1709         if (!req->io->rw.iov) {
1710                 req->io->rw.iov = req->io->rw.fast_iov;
1711                 memcpy(req->io->rw.iov, fast_iov,
1712                         sizeof(struct iovec) * iter->nr_segs);
1713         }
1714 }
1715
1716 static int io_setup_async_io(struct io_kiocb *req, ssize_t io_size,
1717                              struct iovec *iovec, struct iovec *fast_iov,
1718                              struct iov_iter *iter)
1719 {
1720         req->io = kmalloc(sizeof(*req->io), GFP_KERNEL);
1721         if (req->io) {
1722                 io_req_map_io(req, io_size, iovec, fast_iov, iter);
1723                 memcpy(&req->io->sqe, req->sqe, sizeof(req->io->sqe));
1724                 req->sqe = &req->io->sqe;
1725                 return 0;
1726         }
1727
1728         return -ENOMEM;
1729 }
1730
1731 static int io_read_prep(struct io_kiocb *req, struct iovec **iovec,
1732                         struct iov_iter *iter, bool force_nonblock)
1733 {
1734         ssize_t ret;
1735
1736         ret = io_prep_rw(req, force_nonblock);
1737         if (ret)
1738                 return ret;
1739
1740         if (unlikely(!(req->file->f_mode & FMODE_READ)))
1741                 return -EBADF;
1742
1743         return io_import_iovec(READ, req, iovec, iter);
1744 }
1745
1746 static int io_read(struct io_kiocb *req, struct io_kiocb **nxt,
1747                    bool force_nonblock)
1748 {
1749         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1750         struct kiocb *kiocb = &req->rw;
1751         struct iov_iter iter;
1752         struct file *file;
1753         size_t iov_count;
1754         ssize_t io_size, ret;
1755
1756         if (!req->io) {
1757                 ret = io_read_prep(req, &iovec, &iter, force_nonblock);
1758                 if (ret < 0)
1759                         return ret;
1760         } else {
1761                 ret = io_import_iovec(READ, req, &iovec, &iter);
1762                 if (ret < 0)
1763                         return ret;
1764         }
1765
1766         file = req->file;
1767         io_size = ret;
1768         if (req->flags & REQ_F_LINK)
1769                 req->result = io_size;
1770
1771         /*
1772          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1773          * we know to async punt it even if it was opened O_NONBLOCK
1774          */
1775         if (force_nonblock && !io_file_supports_async(file)) {
1776                 req->flags |= REQ_F_MUST_PUNT;
1777                 goto copy_iov;
1778         }
1779
1780         iov_count = iov_iter_count(&iter);
1781         ret = rw_verify_area(READ, file, &kiocb->ki_pos, iov_count);
1782         if (!ret) {
1783                 ssize_t ret2;
1784
1785                 if (file->f_op->read_iter)
1786                         ret2 = call_read_iter(file, kiocb, &iter);
1787                 else
1788                         ret2 = loop_rw_iter(READ, file, kiocb, &iter);
1789
1790                 /*
1791                  * In case of a short read, punt to async. This can happen
1792                  * if we have data partially cached. Alternatively we can
1793                  * return the short read, in which case the application will
1794                  * need to issue another SQE and wait for it. That SQE will
1795                  * need async punt anyway, so it's more efficient to do it
1796                  * here.
1797                  */
1798                 if (force_nonblock && !(req->flags & REQ_F_NOWAIT) &&
1799                     (req->flags & REQ_F_ISREG) &&
1800                     ret2 > 0 && ret2 < io_size)
1801                         ret2 = -EAGAIN;
1802                 /* Catch -EAGAIN return for forced non-blocking submission */
1803                 if (!force_nonblock || ret2 != -EAGAIN) {
1804                         kiocb_done(kiocb, ret2, nxt, req->in_async);
1805                 } else {
1806 copy_iov:
1807                         ret = io_setup_async_io(req, io_size, iovec,
1808                                                 inline_vecs, &iter);
1809                         if (ret)
1810                                 goto out_free;
1811                         return -EAGAIN;
1812                 }
1813         }
1814 out_free:
1815         kfree(iovec);
1816         return ret;
1817 }
1818
1819 static int io_write_prep(struct io_kiocb *req, struct iovec **iovec,
1820                          struct iov_iter *iter, bool force_nonblock)
1821 {
1822         ssize_t ret;
1823
1824         ret = io_prep_rw(req, force_nonblock);
1825         if (ret)
1826                 return ret;
1827
1828         if (unlikely(!(req->file->f_mode & FMODE_WRITE)))
1829                 return -EBADF;
1830
1831         return io_import_iovec(WRITE, req, iovec, iter);
1832 }
1833
1834 static int io_write(struct io_kiocb *req, struct io_kiocb **nxt,
1835                     bool force_nonblock)
1836 {
1837         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
1838         struct kiocb *kiocb = &req->rw;
1839         struct iov_iter iter;
1840         struct file *file;
1841         size_t iov_count;
1842         ssize_t ret, io_size;
1843
1844         if (!req->io) {
1845                 ret = io_write_prep(req, &iovec, &iter, force_nonblock);
1846                 if (ret < 0)
1847                         return ret;
1848         } else {
1849                 ret = io_import_iovec(WRITE, req, &iovec, &iter);
1850                 if (ret < 0)
1851                         return ret;
1852         }
1853
1854         file = kiocb->ki_filp;
1855         io_size = ret;
1856         if (req->flags & REQ_F_LINK)
1857                 req->result = io_size;
1858
1859         /*
1860          * If the file doesn't support async, mark it as REQ_F_MUST_PUNT so
1861          * we know to async punt it even if it was opened O_NONBLOCK
1862          */
1863         if (force_nonblock && !io_file_supports_async(req->file)) {
1864                 req->flags |= REQ_F_MUST_PUNT;
1865                 goto copy_iov;
1866         }
1867
1868         if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
1869                 goto copy_iov;
1870
1871         iov_count = iov_iter_count(&iter);
1872         ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
1873         if (!ret) {
1874                 ssize_t ret2;
1875
1876                 /*
1877                  * Open-code file_start_write here to grab freeze protection,
1878                  * which will be released by another thread in
1879                  * io_complete_rw().  Fool lockdep by telling it the lock got
1880                  * released so that it doesn't complain about the held lock when
1881                  * we return to userspace.
1882                  */
1883                 if (req->flags & REQ_F_ISREG) {
1884                         __sb_start_write(file_inode(file)->i_sb,
1885                                                 SB_FREEZE_WRITE, true);
1886                         __sb_writers_release(file_inode(file)->i_sb,
1887                                                 SB_FREEZE_WRITE);
1888                 }
1889                 kiocb->ki_flags |= IOCB_WRITE;
1890
1891                 if (file->f_op->write_iter)
1892                         ret2 = call_write_iter(file, kiocb, &iter);
1893                 else
1894                         ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
1895                 if (!force_nonblock || ret2 != -EAGAIN) {
1896                         kiocb_done(kiocb, ret2, nxt, req->in_async);
1897                 } else {
1898 copy_iov:
1899                         ret = io_setup_async_io(req, io_size, iovec,
1900                                                 inline_vecs, &iter);
1901                         if (ret)
1902                                 goto out_free;
1903                         return -EAGAIN;
1904                 }
1905         }
1906 out_free:
1907         kfree(iovec);
1908         return ret;
1909 }
1910
1911 /*
1912  * IORING_OP_NOP just posts a completion event, nothing else.
1913  */
1914 static int io_nop(struct io_kiocb *req)
1915 {
1916         struct io_ring_ctx *ctx = req->ctx;
1917
1918         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1919                 return -EINVAL;
1920
1921         io_cqring_add_event(req, 0);
1922         io_put_req(req);
1923         return 0;
1924 }
1925
1926 static int io_prep_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1927 {
1928         struct io_ring_ctx *ctx = req->ctx;
1929
1930         if (!req->file)
1931                 return -EBADF;
1932
1933         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1934                 return -EINVAL;
1935         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1936                 return -EINVAL;
1937
1938         return 0;
1939 }
1940
1941 static int io_fsync(struct io_kiocb *req, const struct io_uring_sqe *sqe,
1942                     struct io_kiocb **nxt, bool force_nonblock)
1943 {
1944         loff_t sqe_off = READ_ONCE(sqe->off);
1945         loff_t sqe_len = READ_ONCE(sqe->len);
1946         loff_t end = sqe_off + sqe_len;
1947         unsigned fsync_flags;
1948         int ret;
1949
1950         fsync_flags = READ_ONCE(sqe->fsync_flags);
1951         if (unlikely(fsync_flags & ~IORING_FSYNC_DATASYNC))
1952                 return -EINVAL;
1953
1954         ret = io_prep_fsync(req, sqe);
1955         if (ret)
1956                 return ret;
1957
1958         /* fsync always requires a blocking context */
1959         if (force_nonblock)
1960                 return -EAGAIN;
1961
1962         ret = vfs_fsync_range(req->rw.ki_filp, sqe_off,
1963                                 end > 0 ? end : LLONG_MAX,
1964                                 fsync_flags & IORING_FSYNC_DATASYNC);
1965
1966         if (ret < 0)
1967                 req_set_fail_links(req);
1968         io_cqring_add_event(req, ret);
1969         io_put_req_find_next(req, nxt);
1970         return 0;
1971 }
1972
1973 static int io_prep_sfr(struct io_kiocb *req, const struct io_uring_sqe *sqe)
1974 {
1975         struct io_ring_ctx *ctx = req->ctx;
1976         int ret = 0;
1977
1978         if (!req->file)
1979                 return -EBADF;
1980
1981         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
1982                 return -EINVAL;
1983         if (unlikely(sqe->addr || sqe->ioprio || sqe->buf_index))
1984                 return -EINVAL;
1985
1986         return ret;
1987 }
1988
1989 static int io_sync_file_range(struct io_kiocb *req,
1990                               const struct io_uring_sqe *sqe,
1991                               struct io_kiocb **nxt,
1992                               bool force_nonblock)
1993 {
1994         loff_t sqe_off;
1995         loff_t sqe_len;
1996         unsigned flags;
1997         int ret;
1998
1999         ret = io_prep_sfr(req, sqe);
2000         if (ret)
2001                 return ret;
2002
2003         /* sync_file_range always requires a blocking context */
2004         if (force_nonblock)
2005                 return -EAGAIN;
2006
2007         sqe_off = READ_ONCE(sqe->off);
2008         sqe_len = READ_ONCE(sqe->len);
2009         flags = READ_ONCE(sqe->sync_range_flags);
2010
2011         ret = sync_file_range(req->rw.ki_filp, sqe_off, sqe_len, flags);
2012
2013         if (ret < 0)
2014                 req_set_fail_links(req);
2015         io_cqring_add_event(req, ret);
2016         io_put_req_find_next(req, nxt);
2017         return 0;
2018 }
2019
2020 static int io_sendmsg_prep(struct io_kiocb *req, struct io_async_ctx *io)
2021 {
2022 #if defined(CONFIG_NET)
2023         const struct io_uring_sqe *sqe = req->sqe;
2024         struct user_msghdr __user *msg;
2025         unsigned flags;
2026
2027         flags = READ_ONCE(sqe->msg_flags);
2028         msg = (struct user_msghdr __user *)(unsigned long) READ_ONCE(sqe->addr);
2029         io->msg.iov = io->msg.fast_iov;
2030         return sendmsg_copy_msghdr(&io->msg.msg, msg, flags, &io->msg.iov);
2031 #else
2032         return 0;
2033 #endif
2034 }
2035
2036 static int io_sendmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2037                       struct io_kiocb **nxt, bool force_nonblock)
2038 {
2039 #if defined(CONFIG_NET)
2040         struct socket *sock;
2041         int ret;
2042
2043         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2044                 return -EINVAL;
2045
2046         sock = sock_from_file(req->file, &ret);
2047         if (sock) {
2048                 struct io_async_ctx io, *copy;
2049                 struct sockaddr_storage addr;
2050                 struct msghdr *kmsg;
2051                 unsigned flags;
2052
2053                 flags = READ_ONCE(sqe->msg_flags);
2054                 if (flags & MSG_DONTWAIT)
2055                         req->flags |= REQ_F_NOWAIT;
2056                 else if (force_nonblock)
2057                         flags |= MSG_DONTWAIT;
2058
2059                 if (req->io) {
2060                         kmsg = &req->io->msg.msg;
2061                         kmsg->msg_name = &addr;
2062                 } else {
2063                         kmsg = &io.msg.msg;
2064                         kmsg->msg_name = &addr;
2065                         ret = io_sendmsg_prep(req, &io);
2066                         if (ret)
2067                                 goto out;
2068                 }
2069
2070                 ret = __sys_sendmsg_sock(sock, kmsg, flags);
2071                 if (force_nonblock && ret == -EAGAIN) {
2072                         copy = kmalloc(sizeof(*copy), GFP_KERNEL);
2073                         if (!copy) {
2074                                 ret = -ENOMEM;
2075                                 goto out;
2076                         }
2077                         memcpy(&copy->msg, &io.msg, sizeof(copy->msg));
2078                         req->io = copy;
2079                         memcpy(&req->io->sqe, req->sqe, sizeof(*req->sqe));
2080                         req->sqe = &req->io->sqe;
2081                         return ret;
2082                 }
2083                 if (ret == -ERESTARTSYS)
2084                         ret = -EINTR;
2085         }
2086
2087 out:
2088         io_cqring_add_event(req, ret);
2089         if (ret < 0)
2090                 req_set_fail_links(req);
2091         io_put_req_find_next(req, nxt);
2092         return 0;
2093 #else
2094         return -EOPNOTSUPP;
2095 #endif
2096 }
2097
2098 static int io_recvmsg_prep(struct io_kiocb *req, struct io_async_ctx *io)
2099 {
2100 #if defined(CONFIG_NET)
2101         const struct io_uring_sqe *sqe = req->sqe;
2102         struct user_msghdr __user *msg;
2103         unsigned flags;
2104
2105         flags = READ_ONCE(sqe->msg_flags);
2106         msg = (struct user_msghdr __user *)(unsigned long) READ_ONCE(sqe->addr);
2107         io->msg.iov = io->msg.fast_iov;
2108         return recvmsg_copy_msghdr(&io->msg.msg, msg, flags, &io->msg.uaddr,
2109                                         &io->msg.iov);
2110 #else
2111         return 0;
2112 #endif
2113 }
2114
2115 static int io_recvmsg(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2116                       struct io_kiocb **nxt, bool force_nonblock)
2117 {
2118 #if defined(CONFIG_NET)
2119         struct socket *sock;
2120         int ret;
2121
2122         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2123                 return -EINVAL;
2124
2125         sock = sock_from_file(req->file, &ret);
2126         if (sock) {
2127                 struct user_msghdr __user *msg;
2128                 struct io_async_ctx io, *copy;
2129                 struct sockaddr_storage addr;
2130                 struct msghdr *kmsg;
2131                 unsigned flags;
2132
2133                 flags = READ_ONCE(sqe->msg_flags);
2134                 if (flags & MSG_DONTWAIT)
2135                         req->flags |= REQ_F_NOWAIT;
2136                 else if (force_nonblock)
2137                         flags |= MSG_DONTWAIT;
2138
2139                 msg = (struct user_msghdr __user *) (unsigned long)
2140                         READ_ONCE(sqe->addr);
2141                 if (req->io) {
2142                         kmsg = &req->io->msg.msg;
2143                         kmsg->msg_name = &addr;
2144                 } else {
2145                         kmsg = &io.msg.msg;
2146                         kmsg->msg_name = &addr;
2147                         ret = io_recvmsg_prep(req, &io);
2148                         if (ret)
2149                                 goto out;
2150                 }
2151
2152                 ret = __sys_recvmsg_sock(sock, kmsg, msg, io.msg.uaddr, flags);
2153                 if (force_nonblock && ret == -EAGAIN) {
2154                         copy = kmalloc(sizeof(*copy), GFP_KERNEL);
2155                         if (!copy) {
2156                                 ret = -ENOMEM;
2157                                 goto out;
2158                         }
2159                         memcpy(copy, &io, sizeof(*copy));
2160                         req->io = copy;
2161                         memcpy(&req->io->sqe, req->sqe, sizeof(*req->sqe));
2162                         req->sqe = &req->io->sqe;
2163                         return ret;
2164                 }
2165                 if (ret == -ERESTARTSYS)
2166                         ret = -EINTR;
2167         }
2168
2169 out:
2170         io_cqring_add_event(req, ret);
2171         if (ret < 0)
2172                 req_set_fail_links(req);
2173         io_put_req_find_next(req, nxt);
2174         return 0;
2175 #else
2176         return -EOPNOTSUPP;
2177 #endif
2178 }
2179
2180 static int io_accept(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2181                      struct io_kiocb **nxt, bool force_nonblock)
2182 {
2183 #if defined(CONFIG_NET)
2184         struct sockaddr __user *addr;
2185         int __user *addr_len;
2186         unsigned file_flags;
2187         int flags, ret;
2188
2189         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2190                 return -EINVAL;
2191         if (sqe->ioprio || sqe->len || sqe->buf_index)
2192                 return -EINVAL;
2193
2194         addr = (struct sockaddr __user *) (unsigned long) READ_ONCE(sqe->addr);
2195         addr_len = (int __user *) (unsigned long) READ_ONCE(sqe->addr2);
2196         flags = READ_ONCE(sqe->accept_flags);
2197         file_flags = force_nonblock ? O_NONBLOCK : 0;
2198
2199         ret = __sys_accept4_file(req->file, file_flags, addr, addr_len, flags);
2200         if (ret == -EAGAIN && force_nonblock) {
2201                 req->work.flags |= IO_WQ_WORK_NEEDS_FILES;
2202                 return -EAGAIN;
2203         }
2204         if (ret == -ERESTARTSYS)
2205                 ret = -EINTR;
2206         if (ret < 0)
2207                 req_set_fail_links(req);
2208         io_cqring_add_event(req, ret);
2209         io_put_req_find_next(req, nxt);
2210         return 0;
2211 #else
2212         return -EOPNOTSUPP;
2213 #endif
2214 }
2215
2216 static int io_connect_prep(struct io_kiocb *req, struct io_async_ctx *io)
2217 {
2218 #if defined(CONFIG_NET)
2219         const struct io_uring_sqe *sqe = req->sqe;
2220         struct sockaddr __user *addr;
2221         int addr_len;
2222
2223         addr = (struct sockaddr __user *) (unsigned long) READ_ONCE(sqe->addr);
2224         addr_len = READ_ONCE(sqe->addr2);
2225         return move_addr_to_kernel(addr, addr_len, &io->connect.address);
2226 #else
2227         return 0;
2228 #endif
2229 }
2230
2231 static int io_connect(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2232                       struct io_kiocb **nxt, bool force_nonblock)
2233 {
2234 #if defined(CONFIG_NET)
2235         struct io_async_ctx __io, *io;
2236         unsigned file_flags;
2237         int addr_len, ret;
2238
2239         if (unlikely(req->ctx->flags & (IORING_SETUP_IOPOLL|IORING_SETUP_SQPOLL)))
2240                 return -EINVAL;
2241         if (sqe->ioprio || sqe->len || sqe->buf_index || sqe->rw_flags)
2242                 return -EINVAL;
2243
2244         addr_len = READ_ONCE(sqe->addr2);
2245         file_flags = force_nonblock ? O_NONBLOCK : 0;
2246
2247         if (req->io) {
2248                 io = req->io;
2249         } else {
2250                 ret = io_connect_prep(req, &__io);
2251                 if (ret)
2252                         goto out;
2253                 io = &__io;
2254         }
2255
2256         ret = __sys_connect_file(req->file, &io->connect.address, addr_len,
2257                                         file_flags);
2258         if ((ret == -EAGAIN || ret == -EINPROGRESS) && force_nonblock) {
2259                 io = kmalloc(sizeof(*io), GFP_KERNEL);
2260                 if (!io) {
2261                         ret = -ENOMEM;
2262                         goto out;
2263                 }
2264                 memcpy(&io->connect, &__io.connect, sizeof(io->connect));
2265                 req->io = io;
2266                 memcpy(&io->sqe, req->sqe, sizeof(*req->sqe));
2267                 req->sqe = &io->sqe;
2268                 return -EAGAIN;
2269         }
2270         if (ret == -ERESTARTSYS)
2271                 ret = -EINTR;
2272 out:
2273         if (ret < 0)
2274                 req_set_fail_links(req);
2275         io_cqring_add_event(req, ret);
2276         io_put_req_find_next(req, nxt);
2277         return 0;
2278 #else
2279         return -EOPNOTSUPP;
2280 #endif
2281 }
2282
2283 static void io_poll_remove_one(struct io_kiocb *req)
2284 {
2285         struct io_poll_iocb *poll = &req->poll;
2286
2287         spin_lock(&poll->head->lock);
2288         WRITE_ONCE(poll->canceled, true);
2289         if (!list_empty(&poll->wait.entry)) {
2290                 list_del_init(&poll->wait.entry);
2291                 io_queue_async_work(req);
2292         }
2293         spin_unlock(&poll->head->lock);
2294         hash_del(&req->hash_node);
2295 }
2296
2297 static void io_poll_remove_all(struct io_ring_ctx *ctx)
2298 {
2299         struct hlist_node *tmp;
2300         struct io_kiocb *req;
2301         int i;
2302
2303         spin_lock_irq(&ctx->completion_lock);
2304         for (i = 0; i < (1U << ctx->cancel_hash_bits); i++) {
2305                 struct hlist_head *list;
2306
2307                 list = &ctx->cancel_hash[i];
2308                 hlist_for_each_entry_safe(req, tmp, list, hash_node)
2309                         io_poll_remove_one(req);
2310         }
2311         spin_unlock_irq(&ctx->completion_lock);
2312 }
2313
2314 static int io_poll_cancel(struct io_ring_ctx *ctx, __u64 sqe_addr)
2315 {
2316         struct hlist_head *list;
2317         struct io_kiocb *req;
2318
2319         list = &ctx->cancel_hash[hash_long(sqe_addr, ctx->cancel_hash_bits)];
2320         hlist_for_each_entry(req, list, hash_node) {
2321                 if (sqe_addr == req->user_data) {
2322                         io_poll_remove_one(req);
2323                         return 0;
2324                 }
2325         }
2326
2327         return -ENOENT;
2328 }
2329
2330 /*
2331  * Find a running poll command that matches one specified in sqe->addr,
2332  * and remove it if found.
2333  */
2334 static int io_poll_remove(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2335 {
2336         struct io_ring_ctx *ctx = req->ctx;
2337         int ret;
2338
2339         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2340                 return -EINVAL;
2341         if (sqe->ioprio || sqe->off || sqe->len || sqe->buf_index ||
2342             sqe->poll_events)
2343                 return -EINVAL;
2344
2345         spin_lock_irq(&ctx->completion_lock);
2346         ret = io_poll_cancel(ctx, READ_ONCE(sqe->addr));
2347         spin_unlock_irq(&ctx->completion_lock);
2348
2349         io_cqring_add_event(req, ret);
2350         if (ret < 0)
2351                 req_set_fail_links(req);
2352         io_put_req(req);
2353         return 0;
2354 }
2355
2356 static void io_poll_complete(struct io_kiocb *req, __poll_t mask, int error)
2357 {
2358         struct io_ring_ctx *ctx = req->ctx;
2359
2360         req->poll.done = true;
2361         if (error)
2362                 io_cqring_fill_event(req, error);
2363         else
2364                 io_cqring_fill_event(req, mangle_poll(mask));
2365         io_commit_cqring(ctx);
2366 }
2367
2368 static void io_poll_complete_work(struct io_wq_work **workptr)
2369 {
2370         struct io_wq_work *work = *workptr;
2371         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2372         struct io_poll_iocb *poll = &req->poll;
2373         struct poll_table_struct pt = { ._key = poll->events };
2374         struct io_ring_ctx *ctx = req->ctx;
2375         struct io_kiocb *nxt = NULL;
2376         __poll_t mask = 0;
2377         int ret = 0;
2378
2379         if (work->flags & IO_WQ_WORK_CANCEL) {
2380                 WRITE_ONCE(poll->canceled, true);
2381                 ret = -ECANCELED;
2382         } else if (READ_ONCE(poll->canceled)) {
2383                 ret = -ECANCELED;
2384         }
2385
2386         if (ret != -ECANCELED)
2387                 mask = vfs_poll(poll->file, &pt) & poll->events;
2388
2389         /*
2390          * Note that ->ki_cancel callers also delete iocb from active_reqs after
2391          * calling ->ki_cancel.  We need the ctx_lock roundtrip here to
2392          * synchronize with them.  In the cancellation case the list_del_init
2393          * itself is not actually needed, but harmless so we keep it in to
2394          * avoid further branches in the fast path.
2395          */
2396         spin_lock_irq(&ctx->completion_lock);
2397         if (!mask && ret != -ECANCELED) {
2398                 add_wait_queue(poll->head, &poll->wait);
2399                 spin_unlock_irq(&ctx->completion_lock);
2400                 return;
2401         }
2402         hash_del(&req->hash_node);
2403         io_poll_complete(req, mask, ret);
2404         spin_unlock_irq(&ctx->completion_lock);
2405
2406         io_cqring_ev_posted(ctx);
2407
2408         if (ret < 0)
2409                 req_set_fail_links(req);
2410         io_put_req_find_next(req, &nxt);
2411         if (nxt)
2412                 *workptr = &nxt->work;
2413 }
2414
2415 static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
2416                         void *key)
2417 {
2418         struct io_poll_iocb *poll = wait->private;
2419         struct io_kiocb *req = container_of(poll, struct io_kiocb, poll);
2420         struct io_ring_ctx *ctx = req->ctx;
2421         __poll_t mask = key_to_poll(key);
2422         unsigned long flags;
2423
2424         /* for instances that support it check for an event match first: */
2425         if (mask && !(mask & poll->events))
2426                 return 0;
2427
2428         list_del_init(&poll->wait.entry);
2429
2430         /*
2431          * Run completion inline if we can. We're using trylock here because
2432          * we are violating the completion_lock -> poll wq lock ordering.
2433          * If we have a link timeout we're going to need the completion_lock
2434          * for finalizing the request, mark us as having grabbed that already.
2435          */
2436         if (mask && spin_trylock_irqsave(&ctx->completion_lock, flags)) {
2437                 hash_del(&req->hash_node);
2438                 io_poll_complete(req, mask, 0);
2439                 req->flags |= REQ_F_COMP_LOCKED;
2440                 io_put_req(req);
2441                 spin_unlock_irqrestore(&ctx->completion_lock, flags);
2442
2443                 io_cqring_ev_posted(ctx);
2444         } else {
2445                 io_queue_async_work(req);
2446         }
2447
2448         return 1;
2449 }
2450
2451 struct io_poll_table {
2452         struct poll_table_struct pt;
2453         struct io_kiocb *req;
2454         int error;
2455 };
2456
2457 static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
2458                                struct poll_table_struct *p)
2459 {
2460         struct io_poll_table *pt = container_of(p, struct io_poll_table, pt);
2461
2462         if (unlikely(pt->req->poll.head)) {
2463                 pt->error = -EINVAL;
2464                 return;
2465         }
2466
2467         pt->error = 0;
2468         pt->req->poll.head = head;
2469         add_wait_queue(head, &pt->req->poll.wait);
2470 }
2471
2472 static void io_poll_req_insert(struct io_kiocb *req)
2473 {
2474         struct io_ring_ctx *ctx = req->ctx;
2475         struct hlist_head *list;
2476
2477         list = &ctx->cancel_hash[hash_long(req->user_data, ctx->cancel_hash_bits)];
2478         hlist_add_head(&req->hash_node, list);
2479 }
2480
2481 static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2482                        struct io_kiocb **nxt)
2483 {
2484         struct io_poll_iocb *poll = &req->poll;
2485         struct io_ring_ctx *ctx = req->ctx;
2486         struct io_poll_table ipt;
2487         bool cancel = false;
2488         __poll_t mask;
2489         u16 events;
2490
2491         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2492                 return -EINVAL;
2493         if (sqe->addr || sqe->ioprio || sqe->off || sqe->len || sqe->buf_index)
2494                 return -EINVAL;
2495         if (!poll->file)
2496                 return -EBADF;
2497
2498         req->io = NULL;
2499         INIT_IO_WORK(&req->work, io_poll_complete_work);
2500         events = READ_ONCE(sqe->poll_events);
2501         poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
2502         INIT_HLIST_NODE(&req->hash_node);
2503
2504         poll->head = NULL;
2505         poll->done = false;
2506         poll->canceled = false;
2507
2508         ipt.pt._qproc = io_poll_queue_proc;
2509         ipt.pt._key = poll->events;
2510         ipt.req = req;
2511         ipt.error = -EINVAL; /* same as no support for IOCB_CMD_POLL */
2512
2513         /* initialized the list so that we can do list_empty checks */
2514         INIT_LIST_HEAD(&poll->wait.entry);
2515         init_waitqueue_func_entry(&poll->wait, io_poll_wake);
2516         poll->wait.private = poll;
2517
2518         INIT_LIST_HEAD(&req->list);
2519
2520         mask = vfs_poll(poll->file, &ipt.pt) & poll->events;
2521
2522         spin_lock_irq(&ctx->completion_lock);
2523         if (likely(poll->head)) {
2524                 spin_lock(&poll->head->lock);
2525                 if (unlikely(list_empty(&poll->wait.entry))) {
2526                         if (ipt.error)
2527                                 cancel = true;
2528                         ipt.error = 0;
2529                         mask = 0;
2530                 }
2531                 if (mask || ipt.error)
2532                         list_del_init(&poll->wait.entry);
2533                 else if (cancel)
2534                         WRITE_ONCE(poll->canceled, true);
2535                 else if (!poll->done) /* actually waiting for an event */
2536                         io_poll_req_insert(req);
2537                 spin_unlock(&poll->head->lock);
2538         }
2539         if (mask) { /* no async, we'd stolen it */
2540                 ipt.error = 0;
2541                 io_poll_complete(req, mask, 0);
2542         }
2543         spin_unlock_irq(&ctx->completion_lock);
2544
2545         if (mask) {
2546                 io_cqring_ev_posted(ctx);
2547                 io_put_req_find_next(req, nxt);
2548         }
2549         return ipt.error;
2550 }
2551
2552 static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
2553 {
2554         struct io_timeout_data *data = container_of(timer,
2555                                                 struct io_timeout_data, timer);
2556         struct io_kiocb *req = data->req;
2557         struct io_ring_ctx *ctx = req->ctx;
2558         unsigned long flags;
2559
2560         atomic_inc(&ctx->cq_timeouts);
2561
2562         spin_lock_irqsave(&ctx->completion_lock, flags);
2563         /*
2564          * We could be racing with timeout deletion. If the list is empty,
2565          * then timeout lookup already found it and will be handling it.
2566          */
2567         if (!list_empty(&req->list)) {
2568                 struct io_kiocb *prev;
2569
2570                 /*
2571                  * Adjust the reqs sequence before the current one because it
2572                  * will consume a slot in the cq_ring and the the cq_tail
2573                  * pointer will be increased, otherwise other timeout reqs may
2574                  * return in advance without waiting for enough wait_nr.
2575                  */
2576                 prev = req;
2577                 list_for_each_entry_continue_reverse(prev, &ctx->timeout_list, list)
2578                         prev->sequence++;
2579                 list_del_init(&req->list);
2580         }
2581
2582         io_cqring_fill_event(req, -ETIME);
2583         io_commit_cqring(ctx);
2584         spin_unlock_irqrestore(&ctx->completion_lock, flags);
2585
2586         io_cqring_ev_posted(ctx);
2587         req_set_fail_links(req);
2588         io_put_req(req);
2589         return HRTIMER_NORESTART;
2590 }
2591
2592 static int io_timeout_cancel(struct io_ring_ctx *ctx, __u64 user_data)
2593 {
2594         struct io_kiocb *req;
2595         int ret = -ENOENT;
2596
2597         list_for_each_entry(req, &ctx->timeout_list, list) {
2598                 if (user_data == req->user_data) {
2599                         list_del_init(&req->list);
2600                         ret = 0;
2601                         break;
2602                 }
2603         }
2604
2605         if (ret == -ENOENT)
2606                 return ret;
2607
2608         ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
2609         if (ret == -1)
2610                 return -EALREADY;
2611
2612         req_set_fail_links(req);
2613         io_cqring_fill_event(req, -ECANCELED);
2614         io_put_req(req);
2615         return 0;
2616 }
2617
2618 /*
2619  * Remove or update an existing timeout command
2620  */
2621 static int io_timeout_remove(struct io_kiocb *req,
2622                              const struct io_uring_sqe *sqe)
2623 {
2624         struct io_ring_ctx *ctx = req->ctx;
2625         unsigned flags;
2626         int ret;
2627
2628         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2629                 return -EINVAL;
2630         if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len)
2631                 return -EINVAL;
2632         flags = READ_ONCE(sqe->timeout_flags);
2633         if (flags)
2634                 return -EINVAL;
2635
2636         spin_lock_irq(&ctx->completion_lock);
2637         ret = io_timeout_cancel(ctx, READ_ONCE(sqe->addr));
2638
2639         io_cqring_fill_event(req, ret);
2640         io_commit_cqring(ctx);
2641         spin_unlock_irq(&ctx->completion_lock);
2642         io_cqring_ev_posted(ctx);
2643         if (ret < 0)
2644                 req_set_fail_links(req);
2645         io_put_req(req);
2646         return 0;
2647 }
2648
2649 static int io_timeout_prep(struct io_kiocb *req, struct io_async_ctx *io,
2650                            bool is_timeout_link)
2651 {
2652         const struct io_uring_sqe *sqe = req->sqe;
2653         struct io_timeout_data *data;
2654         unsigned flags;
2655
2656         if (unlikely(req->ctx->flags & IORING_SETUP_IOPOLL))
2657                 return -EINVAL;
2658         if (sqe->ioprio || sqe->buf_index || sqe->len != 1)
2659                 return -EINVAL;
2660         if (sqe->off && is_timeout_link)
2661                 return -EINVAL;
2662         flags = READ_ONCE(sqe->timeout_flags);
2663         if (flags & ~IORING_TIMEOUT_ABS)
2664                 return -EINVAL;
2665
2666         data = &io->timeout;
2667         data->req = req;
2668         req->flags |= REQ_F_TIMEOUT;
2669
2670         if (get_timespec64(&data->ts, u64_to_user_ptr(sqe->addr)))
2671                 return -EFAULT;
2672
2673         if (flags & IORING_TIMEOUT_ABS)
2674                 data->mode = HRTIMER_MODE_ABS;
2675         else
2676                 data->mode = HRTIMER_MODE_REL;
2677
2678         hrtimer_init(&data->timer, CLOCK_MONOTONIC, data->mode);
2679         req->io = io;
2680         return 0;
2681 }
2682
2683 static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
2684 {
2685         unsigned count;
2686         struct io_ring_ctx *ctx = req->ctx;
2687         struct io_timeout_data *data;
2688         struct io_async_ctx *io;
2689         struct list_head *entry;
2690         unsigned span = 0;
2691
2692         io = req->io;
2693         if (!io) {
2694                 int ret;
2695
2696                 io = kmalloc(sizeof(*io), GFP_KERNEL);
2697                 if (!io)
2698                         return -ENOMEM;
2699                 ret = io_timeout_prep(req, io, false);
2700                 if (ret) {
2701                         kfree(io);
2702                         return ret;
2703                 }
2704         }
2705         data = &req->io->timeout;
2706
2707         /*
2708          * sqe->off holds how many events that need to occur for this
2709          * timeout event to be satisfied. If it isn't set, then this is
2710          * a pure timeout request, sequence isn't used.
2711          */
2712         count = READ_ONCE(sqe->off);
2713         if (!count) {
2714                 req->flags |= REQ_F_TIMEOUT_NOSEQ;
2715                 spin_lock_irq(&ctx->completion_lock);
2716                 entry = ctx->timeout_list.prev;
2717                 goto add;
2718         }
2719
2720         req->sequence = ctx->cached_sq_head + count - 1;
2721         data->seq_offset = count;
2722
2723         /*
2724          * Insertion sort, ensuring the first entry in the list is always
2725          * the one we need first.
2726          */
2727         spin_lock_irq(&ctx->completion_lock);
2728         list_for_each_prev(entry, &ctx->timeout_list) {
2729                 struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
2730                 unsigned nxt_sq_head;
2731                 long long tmp, tmp_nxt;
2732                 u32 nxt_offset = nxt->io->timeout.seq_offset;
2733
2734                 if (nxt->flags & REQ_F_TIMEOUT_NOSEQ)
2735                         continue;
2736
2737                 /*
2738                  * Since cached_sq_head + count - 1 can overflow, use type long
2739                  * long to store it.
2740                  */
2741                 tmp = (long long)ctx->cached_sq_head + count - 1;
2742                 nxt_sq_head = nxt->sequence - nxt_offset + 1;
2743                 tmp_nxt = (long long)nxt_sq_head + nxt_offset - 1;
2744
2745                 /*
2746                  * cached_sq_head may overflow, and it will never overflow twice
2747                  * once there is some timeout req still be valid.
2748                  */
2749                 if (ctx->cached_sq_head < nxt_sq_head)
2750                         tmp += UINT_MAX;
2751
2752                 if (tmp > tmp_nxt)
2753                         break;
2754
2755                 /*
2756                  * Sequence of reqs after the insert one and itself should
2757                  * be adjusted because each timeout req consumes a slot.
2758                  */
2759                 span++;
2760                 nxt->sequence++;
2761         }
2762         req->sequence -= span;
2763 add:
2764         list_add(&req->list, entry);
2765         data->timer.function = io_timeout_fn;
2766         hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
2767         spin_unlock_irq(&ctx->completion_lock);
2768         return 0;
2769 }
2770
2771 static bool io_cancel_cb(struct io_wq_work *work, void *data)
2772 {
2773         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
2774
2775         return req->user_data == (unsigned long) data;
2776 }
2777
2778 static int io_async_cancel_one(struct io_ring_ctx *ctx, void *sqe_addr)
2779 {
2780         enum io_wq_cancel cancel_ret;
2781         int ret = 0;
2782
2783         cancel_ret = io_wq_cancel_cb(ctx->io_wq, io_cancel_cb, sqe_addr);
2784         switch (cancel_ret) {
2785         case IO_WQ_CANCEL_OK:
2786                 ret = 0;
2787                 break;
2788         case IO_WQ_CANCEL_RUNNING:
2789                 ret = -EALREADY;
2790                 break;
2791         case IO_WQ_CANCEL_NOTFOUND:
2792                 ret = -ENOENT;
2793                 break;
2794         }
2795
2796         return ret;
2797 }
2798
2799 static void io_async_find_and_cancel(struct io_ring_ctx *ctx,
2800                                      struct io_kiocb *req, __u64 sqe_addr,
2801                                      struct io_kiocb **nxt, int success_ret)
2802 {
2803         unsigned long flags;
2804         int ret;
2805
2806         ret = io_async_cancel_one(ctx, (void *) (unsigned long) sqe_addr);
2807         if (ret != -ENOENT) {
2808                 spin_lock_irqsave(&ctx->completion_lock, flags);
2809                 goto done;
2810         }
2811
2812         spin_lock_irqsave(&ctx->completion_lock, flags);
2813         ret = io_timeout_cancel(ctx, sqe_addr);
2814         if (ret != -ENOENT)
2815                 goto done;
2816         ret = io_poll_cancel(ctx, sqe_addr);
2817 done:
2818         if (!ret)
2819                 ret = success_ret;
2820         io_cqring_fill_event(req, ret);
2821         io_commit_cqring(ctx);
2822         spin_unlock_irqrestore(&ctx->completion_lock, flags);
2823         io_cqring_ev_posted(ctx);
2824
2825         if (ret < 0)
2826                 req_set_fail_links(req);
2827         io_put_req_find_next(req, nxt);
2828 }
2829
2830 static int io_async_cancel(struct io_kiocb *req, const struct io_uring_sqe *sqe,
2831                            struct io_kiocb **nxt)
2832 {
2833         struct io_ring_ctx *ctx = req->ctx;
2834
2835         if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
2836                 return -EINVAL;
2837         if (sqe->flags || sqe->ioprio || sqe->off || sqe->len ||
2838             sqe->cancel_flags)
2839                 return -EINVAL;
2840
2841         io_async_find_and_cancel(ctx, req, READ_ONCE(sqe->addr), nxt, 0);
2842         return 0;
2843 }
2844
2845 static int io_req_defer_prep(struct io_kiocb *req, struct io_async_ctx *io)
2846 {
2847         struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
2848         struct iov_iter iter;
2849         ssize_t ret;
2850
2851         memcpy(&io->sqe, req->sqe, sizeof(io->sqe));
2852         req->sqe = &io->sqe;
2853
2854         switch (io->sqe.opcode) {
2855         case IORING_OP_READV:
2856         case IORING_OP_READ_FIXED:
2857                 ret = io_read_prep(req, &iovec, &iter, true);
2858                 break;
2859         case IORING_OP_WRITEV:
2860         case IORING_OP_WRITE_FIXED:
2861                 ret = io_write_prep(req, &iovec, &iter, true);
2862                 break;
2863         case IORING_OP_SENDMSG:
2864                 ret = io_sendmsg_prep(req, io);
2865                 break;
2866         case IORING_OP_RECVMSG:
2867                 ret = io_recvmsg_prep(req, io);
2868                 break;
2869         case IORING_OP_CONNECT:
2870                 ret = io_connect_prep(req, io);
2871                 break;
2872         case IORING_OP_TIMEOUT:
2873                 return io_timeout_prep(req, io, false);
2874         case IORING_OP_LINK_TIMEOUT:
2875                 return io_timeout_prep(req, io, true);
2876         default:
2877                 req->io = io;
2878                 return 0;
2879         }
2880
2881         if (ret < 0)
2882                 return ret;
2883
2884         req->io = io;
2885         io_req_map_io(req, ret, iovec, inline_vecs, &iter);
2886         return 0;
2887 }
2888
2889 static int io_req_defer(struct io_kiocb *req)
2890 {
2891         struct io_ring_ctx *ctx = req->ctx;
2892         struct io_async_ctx *io;
2893         int ret;
2894
2895         /* Still need defer if there is pending req in defer list. */
2896         if (!req_need_defer(req) && list_empty(&ctx->defer_list))
2897                 return 0;
2898
2899         io = kmalloc(sizeof(*io), GFP_KERNEL);
2900         if (!io)
2901                 return -EAGAIN;
2902
2903         ret = io_req_defer_prep(req, io);
2904         if (ret < 0) {
2905                 kfree(io);
2906                 return ret;
2907         }
2908
2909         spin_lock_irq(&ctx->completion_lock);
2910         if (!req_need_defer(req) && list_empty(&ctx->defer_list)) {
2911                 spin_unlock_irq(&ctx->completion_lock);
2912                 return 0;
2913         }
2914
2915         trace_io_uring_defer(ctx, req, req->user_data);
2916         list_add_tail(&req->list, &ctx->defer_list);
2917         spin_unlock_irq(&ctx->completion_lock);
2918         return -EIOCBQUEUED;
2919 }
2920
2921 __attribute__((nonnull))
2922 static int io_issue_sqe(struct io_kiocb *req, struct io_kiocb **nxt,
2923                         bool force_nonblock)
2924 {
2925         int ret, opcode;
2926         struct io_ring_ctx *ctx = req->ctx;
2927
2928         opcode = READ_ONCE(req->sqe->opcode);
2929         switch (opcode) {
2930         case IORING_OP_NOP:
2931                 ret = io_nop(req);
2932                 break;
2933         case IORING_OP_READV:
2934                 if (unlikely(req->sqe->buf_index))
2935                         return -EINVAL;
2936                 ret = io_read(req, nxt, force_nonblock);
2937                 break;
2938         case IORING_OP_WRITEV:
2939                 if (unlikely(req->sqe->buf_index))
2940                         return -EINVAL;
2941                 ret = io_write(req, nxt, force_nonblock);
2942                 break;
2943         case IORING_OP_READ_FIXED:
2944                 ret = io_read(req, nxt, force_nonblock);
2945                 break;
2946         case IORING_OP_WRITE_FIXED:
2947                 ret = io_write(req, nxt, force_nonblock);
2948                 break;
2949         case IORING_OP_FSYNC:
2950                 ret = io_fsync(req, req->sqe, nxt, force_nonblock);
2951                 break;
2952         case IORING_OP_POLL_ADD:
2953                 ret = io_poll_add(req, req->sqe, nxt);
2954                 break;
2955         case IORING_OP_POLL_REMOVE:
2956                 ret = io_poll_remove(req, req->sqe);
2957                 break;
2958         case IORING_OP_SYNC_FILE_RANGE:
2959                 ret = io_sync_file_range(req, req->sqe, nxt, force_nonblock);
2960                 break;
2961         case IORING_OP_SENDMSG:
2962                 ret = io_sendmsg(req, req->sqe, nxt, force_nonblock);
2963                 break;
2964         case IORING_OP_RECVMSG:
2965                 ret = io_recvmsg(req, req->sqe, nxt, force_nonblock);
2966                 break;
2967         case IORING_OP_TIMEOUT:
2968                 ret = io_timeout(req, req->sqe);
2969                 break;
2970         case IORING_OP_TIMEOUT_REMOVE:
2971                 ret = io_timeout_remove(req, req->sqe);
2972                 break;
2973         case IORING_OP_ACCEPT:
2974                 ret = io_accept(req, req->sqe, nxt, force_nonblock);
2975                 break;
2976         case IORING_OP_CONNECT:
2977                 ret = io_connect(req, req->sqe, nxt, force_nonblock);
2978                 break;
2979         case IORING_OP_ASYNC_CANCEL:
2980                 ret = io_async_cancel(req, req->sqe, nxt);
2981                 break;
2982         default:
2983                 ret = -EINVAL;
2984                 break;
2985         }
2986
2987         if (ret)
2988                 return ret;
2989
2990         if (ctx->flags & IORING_SETUP_IOPOLL) {
2991                 if (req->result == -EAGAIN)
2992                         return -EAGAIN;
2993
2994                 io_iopoll_req_issued(req);
2995         }
2996
2997         return 0;
2998 }
2999
3000 static void io_link_work_cb(struct io_wq_work **workptr)
3001 {
3002         struct io_wq_work *work = *workptr;
3003         struct io_kiocb *link = work->data;
3004
3005         io_queue_linked_timeout(link);
3006         work->func = io_wq_submit_work;
3007 }
3008
3009 static void io_wq_submit_work(struct io_wq_work **workptr)
3010 {
3011         struct io_wq_work *work = *workptr;
3012         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
3013         struct io_kiocb *nxt = NULL;
3014         int ret = 0;
3015
3016         /* Ensure we clear previously set non-block flag */
3017         req->rw.ki_flags &= ~IOCB_NOWAIT;
3018
3019         if (work->flags & IO_WQ_WORK_CANCEL)
3020                 ret = -ECANCELED;
3021
3022         if (!ret) {
3023                 req->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
3024                 req->in_async = true;
3025                 do {
3026                         ret = io_issue_sqe(req, &nxt, false);
3027                         /*
3028                          * We can get EAGAIN for polled IO even though we're
3029                          * forcing a sync submission from here, since we can't
3030                          * wait for request slots on the block side.
3031                          */
3032                         if (ret != -EAGAIN)
3033                                 break;
3034                         cond_resched();
3035                 } while (1);
3036         }
3037
3038         /* drop submission reference */
3039         io_put_req(req);
3040
3041         if (ret) {
3042                 req_set_fail_links(req);
3043                 io_cqring_add_event(req, ret);
3044                 io_put_req(req);
3045         }
3046
3047         /* if a dependent link is ready, pass it back */
3048         if (!ret && nxt) {
3049                 struct io_kiocb *link;
3050
3051                 io_prep_async_work(nxt, &link);
3052                 *workptr = &nxt->work;
3053                 if (link) {
3054                         nxt->work.flags |= IO_WQ_WORK_CB;
3055                         nxt->work.func = io_link_work_cb;
3056                         nxt->work.data = link;
3057                 }
3058         }
3059 }
3060
3061 static bool io_op_needs_file(const struct io_uring_sqe *sqe)
3062 {
3063         int op = READ_ONCE(sqe->opcode);
3064
3065         switch (op) {
3066         case IORING_OP_NOP:
3067         case IORING_OP_POLL_REMOVE:
3068         case IORING_OP_TIMEOUT:
3069         case IORING_OP_TIMEOUT_REMOVE:
3070         case IORING_OP_ASYNC_CANCEL:
3071         case IORING_OP_LINK_TIMEOUT:
3072                 return false;
3073         default:
3074                 return true;
3075         }
3076 }
3077
3078 static inline struct file *io_file_from_index(struct io_ring_ctx *ctx,
3079                                               int index)
3080 {
3081         struct fixed_file_table *table;
3082
3083         table = &ctx->file_table[index >> IORING_FILE_TABLE_SHIFT];
3084         return table->files[index & IORING_FILE_TABLE_MASK];
3085 }
3086
3087 static int io_req_set_file(struct io_submit_state *state, struct io_kiocb *req)
3088 {
3089         struct io_ring_ctx *ctx = req->ctx;
3090         unsigned flags;
3091         int fd;
3092
3093         flags = READ_ONCE(req->sqe->flags);
3094         fd = READ_ONCE(req->sqe->fd);
3095
3096         if (flags & IOSQE_IO_DRAIN)
3097                 req->flags |= REQ_F_IO_DRAIN;
3098
3099         if (!io_op_needs_file(req->sqe))
3100                 return 0;
3101
3102         if (flags & IOSQE_FIXED_FILE) {
3103                 if (unlikely(!ctx->file_table ||
3104                     (unsigned) fd >= ctx->nr_user_files))
3105                         return -EBADF;
3106                 fd = array_index_nospec(fd, ctx->nr_user_files);
3107                 req->file = io_file_from_index(ctx, fd);
3108                 if (!req->file)
3109                         return -EBADF;
3110                 req->flags |= REQ_F_FIXED_FILE;
3111         } else {
3112                 if (req->needs_fixed_file)
3113                         return -EBADF;
3114                 trace_io_uring_file_get(ctx, fd);
3115                 req->file = io_file_get(state, fd);
3116                 if (unlikely(!req->file))
3117                         return -EBADF;
3118         }
3119
3120         return 0;
3121 }
3122
3123 static int io_grab_files(struct io_kiocb *req)
3124 {
3125         int ret = -EBADF;
3126         struct io_ring_ctx *ctx = req->ctx;
3127
3128         rcu_read_lock();
3129         spin_lock_irq(&ctx->inflight_lock);
3130         /*
3131          * We use the f_ops->flush() handler to ensure that we can flush
3132          * out work accessing these files if the fd is closed. Check if
3133          * the fd has changed since we started down this path, and disallow
3134          * this operation if it has.
3135          */
3136         if (fcheck(req->ring_fd) == req->ring_file) {
3137                 list_add(&req->inflight_entry, &ctx->inflight_list);
3138                 req->flags |= REQ_F_INFLIGHT;
3139                 req->work.files = current->files;
3140                 ret = 0;
3141         }
3142         spin_unlock_irq(&ctx->inflight_lock);
3143         rcu_read_unlock();
3144
3145         return ret;
3146 }
3147
3148 static enum hrtimer_restart io_link_timeout_fn(struct hrtimer *timer)
3149 {
3150         struct io_timeout_data *data = container_of(timer,
3151                                                 struct io_timeout_data, timer);
3152         struct io_kiocb *req = data->req;
3153         struct io_ring_ctx *ctx = req->ctx;
3154         struct io_kiocb *prev = NULL;
3155         unsigned long flags;
3156
3157         spin_lock_irqsave(&ctx->completion_lock, flags);
3158
3159         /*
3160          * We don't expect the list to be empty, that will only happen if we
3161          * race with the completion of the linked work.
3162          */
3163         if (!list_empty(&req->link_list)) {
3164                 prev = list_entry(req->link_list.prev, struct io_kiocb,
3165                                   link_list);
3166                 if (refcount_inc_not_zero(&prev->refs)) {
3167                         list_del_init(&req->link_list);
3168                         prev->flags &= ~REQ_F_LINK_TIMEOUT;
3169                 } else
3170                         prev = NULL;
3171         }
3172
3173         spin_unlock_irqrestore(&ctx->completion_lock, flags);
3174
3175         if (prev) {
3176                 req_set_fail_links(prev);
3177                 io_async_find_and_cancel(ctx, req, prev->user_data, NULL,
3178                                                 -ETIME);
3179                 io_put_req(prev);
3180         } else {
3181                 io_cqring_add_event(req, -ETIME);
3182                 io_put_req(req);
3183         }
3184         return HRTIMER_NORESTART;
3185 }
3186
3187 static void io_queue_linked_timeout(struct io_kiocb *req)
3188 {
3189         struct io_ring_ctx *ctx = req->ctx;
3190
3191         /*
3192          * If the list is now empty, then our linked request finished before
3193          * we got a chance to setup the timer
3194          */
3195         spin_lock_irq(&ctx->completion_lock);
3196         if (!list_empty(&req->link_list)) {
3197                 struct io_timeout_data *data = &req->io->timeout;
3198
3199                 data->timer.function = io_link_timeout_fn;
3200                 hrtimer_start(&data->timer, timespec64_to_ktime(data->ts),
3201                                 data->mode);
3202         }
3203         spin_unlock_irq(&ctx->completion_lock);
3204
3205         /* drop submission reference */
3206         io_put_req(req);
3207 }
3208
3209 static struct io_kiocb *io_prep_linked_timeout(struct io_kiocb *req)
3210 {
3211         struct io_kiocb *nxt;
3212
3213         if (!(req->flags & REQ_F_LINK))
3214                 return NULL;
3215
3216         nxt = list_first_entry_or_null(&req->link_list, struct io_kiocb,
3217                                         link_list);
3218         if (!nxt || nxt->sqe->opcode != IORING_OP_LINK_TIMEOUT)
3219                 return NULL;
3220
3221         req->flags |= REQ_F_LINK_TIMEOUT;
3222         return nxt;
3223 }
3224
3225 static void __io_queue_sqe(struct io_kiocb *req)
3226 {
3227         struct io_kiocb *linked_timeout;
3228         struct io_kiocb *nxt = NULL;
3229         int ret;
3230
3231 again:
3232         linked_timeout = io_prep_linked_timeout(req);
3233
3234         ret = io_issue_sqe(req, &nxt, true);
3235
3236         /*
3237          * We async punt it if the file wasn't marked NOWAIT, or if the file
3238          * doesn't support non-blocking read/write attempts
3239          */
3240         if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
3241             (req->flags & REQ_F_MUST_PUNT))) {
3242                 if (req->work.flags & IO_WQ_WORK_NEEDS_FILES) {
3243                         ret = io_grab_files(req);
3244                         if (ret)
3245                                 goto err;
3246                 }
3247
3248                 /*
3249                  * Queued up for async execution, worker will release
3250                  * submit reference when the iocb is actually submitted.
3251                  */
3252                 io_queue_async_work(req);
3253                 goto done_req;
3254         }
3255
3256 err:
3257         /* drop submission reference */
3258         io_put_req(req);
3259
3260         if (linked_timeout) {
3261                 if (!ret)
3262                         io_queue_linked_timeout(linked_timeout);
3263                 else
3264                         io_put_req(linked_timeout);
3265         }
3266
3267         /* and drop final reference, if we failed */
3268         if (ret) {
3269                 io_cqring_add_event(req, ret);
3270                 req_set_fail_links(req);
3271                 io_put_req(req);
3272         }
3273 done_req:
3274         if (nxt) {
3275                 req = nxt;
3276                 nxt = NULL;
3277                 goto again;
3278         }
3279 }
3280
3281 static void io_queue_sqe(struct io_kiocb *req)
3282 {
3283         int ret;
3284
3285         if (unlikely(req->ctx->drain_next)) {
3286                 req->flags |= REQ_F_IO_DRAIN;
3287                 req->ctx->drain_next = false;
3288         }
3289         req->ctx->drain_next = (req->flags & REQ_F_DRAIN_LINK);
3290
3291         ret = io_req_defer(req);
3292         if (ret) {
3293                 if (ret != -EIOCBQUEUED) {
3294                         io_cqring_add_event(req, ret);
3295                         req_set_fail_links(req);
3296                         io_double_put_req(req);
3297                 }
3298         } else
3299                 __io_queue_sqe(req);
3300 }
3301
3302 static inline void io_queue_link_head(struct io_kiocb *req)
3303 {
3304         if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
3305                 io_cqring_add_event(req, -ECANCELED);
3306                 io_double_put_req(req);
3307         } else
3308                 io_queue_sqe(req);
3309 }
3310
3311
3312 #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK| \
3313                                 IOSQE_IO_HARDLINK)
3314
3315 static bool io_submit_sqe(struct io_kiocb *req, struct io_submit_state *state,
3316                           struct io_kiocb **link)
3317 {
3318         struct io_ring_ctx *ctx = req->ctx;
3319         int ret;
3320
3321         req->user_data = req->sqe->user_data;
3322
3323         /* enforce forwards compatibility on users */
3324         if (unlikely(req->sqe->flags & ~SQE_VALID_FLAGS)) {
3325                 ret = -EINVAL;
3326                 goto err_req;
3327         }
3328
3329         ret = io_req_set_file(state, req);
3330         if (unlikely(ret)) {
3331 err_req:
3332                 io_cqring_add_event(req, ret);
3333                 io_double_put_req(req);
3334                 return false;
3335         }
3336
3337         /*
3338          * If we already have a head request, queue this one for async
3339          * submittal once the head completes. If we don't have a head but
3340          * IOSQE_IO_LINK is set in the sqe, start a new head. This one will be
3341          * submitted sync once the chain is complete. If none of those
3342          * conditions are true (normal request), then just queue it.
3343          */
3344         if (*link) {
3345                 struct io_kiocb *prev = *link;
3346                 struct io_async_ctx *io;
3347
3348                 if (req->sqe->flags & IOSQE_IO_DRAIN)
3349                         (*link)->flags |= REQ_F_DRAIN_LINK | REQ_F_IO_DRAIN;
3350
3351                 if (req->sqe->flags & IOSQE_IO_HARDLINK)
3352                         req->flags |= REQ_F_HARDLINK;
3353
3354                 io = kmalloc(sizeof(*io), GFP_KERNEL);
3355                 if (!io) {
3356                         ret = -EAGAIN;
3357                         goto err_req;
3358                 }
3359
3360                 ret = io_req_defer_prep(req, io);
3361                 if (ret) {
3362                         kfree(io);
3363                         /* fail even hard links since we don't submit */
3364                         prev->flags |= REQ_F_FAIL_LINK;
3365                         goto err_req;
3366                 }
3367                 trace_io_uring_link(ctx, req, prev);
3368                 list_add_tail(&req->link_list, &prev->link_list);
3369         } else if (req->sqe->flags & (IOSQE_IO_LINK|IOSQE_IO_HARDLINK)) {
3370                 req->flags |= REQ_F_LINK;
3371                 if (req->sqe->flags & IOSQE_IO_HARDLINK)
3372                         req->flags |= REQ_F_HARDLINK;
3373
3374                 INIT_LIST_HEAD(&req->link_list);
3375                 *link = req;
3376         } else {
3377                 io_queue_sqe(req);
3378         }
3379
3380         return true;
3381 }
3382
3383 /*
3384  * Batched submission is done, ensure local IO is flushed out.
3385  */
3386 static void io_submit_state_end(struct io_submit_state *state)
3387 {
3388         blk_finish_plug(&state->plug);
3389         io_file_put(state);
3390         if (state->free_reqs)
3391                 kmem_cache_free_bulk(req_cachep, state->free_reqs,
3392                                         &state->reqs[state->cur_req]);
3393 }
3394
3395 /*
3396  * Start submission side cache.
3397  */
3398 static void io_submit_state_start(struct io_submit_state *state,
3399                                   unsigned int max_ios)
3400 {
3401         blk_start_plug(&state->plug);
3402         state->free_reqs = 0;
3403         state->file = NULL;
3404         state->ios_left = max_ios;
3405 }
3406
3407 static void io_commit_sqring(struct io_ring_ctx *ctx)
3408 {
3409         struct io_rings *rings = ctx->rings;
3410
3411         if (ctx->cached_sq_head != READ_ONCE(rings->sq.head)) {
3412                 /*
3413                  * Ensure any loads from the SQEs are done at this point,
3414                  * since once we write the new head, the application could
3415                  * write new data to them.
3416                  */
3417                 smp_store_release(&rings->sq.head, ctx->cached_sq_head);
3418         }
3419 }
3420
3421 /*
3422  * Fetch an sqe, if one is available. Note that s->sqe will point to memory
3423  * that is mapped by userspace. This means that care needs to be taken to
3424  * ensure that reads are stable, as we cannot rely on userspace always
3425  * being a good citizen. If members of the sqe are validated and then later
3426  * used, it's important that those reads are done through READ_ONCE() to
3427  * prevent a re-load down the line.
3428  */
3429 static bool io_get_sqring(struct io_ring_ctx *ctx, struct io_kiocb *req)
3430 {
3431         struct io_rings *rings = ctx->rings;
3432         u32 *sq_array = ctx->sq_array;
3433         unsigned head;
3434
3435         /*
3436          * The cached sq head (or cq tail) serves two purposes:
3437          *
3438          * 1) allows us to batch the cost of updating the user visible
3439          *    head updates.
3440          * 2) allows the kernel side to track the head on its own, even
3441          *    though the application is the one updating it.
3442          */
3443         head = ctx->cached_sq_head;
3444         /* make sure SQ entry isn't read before tail */
3445         if (unlikely(head == smp_load_acquire(&rings->sq.tail)))
3446                 return false;
3447
3448         head = READ_ONCE(sq_array[head & ctx->sq_mask]);
3449         if (likely(head < ctx->sq_entries)) {
3450                 /*
3451                  * All io need record the previous position, if LINK vs DARIN,
3452                  * it can be used to mark the position of the first IO in the
3453                  * link list.
3454                  */
3455                 req->sequence = ctx->cached_sq_head;
3456                 req->sqe = &ctx->sq_sqes[head];
3457                 ctx->cached_sq_head++;
3458                 return true;
3459         }
3460
3461         /* drop invalid entries */
3462         ctx->cached_sq_head++;
3463         ctx->cached_sq_dropped++;
3464         WRITE_ONCE(rings->sq_dropped, ctx->cached_sq_dropped);
3465         return false;
3466 }
3467
3468 static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
3469                           struct file *ring_file, int ring_fd,
3470                           struct mm_struct **mm, bool async)
3471 {
3472         struct io_submit_state state, *statep = NULL;
3473         struct io_kiocb *link = NULL;
3474         int i, submitted = 0;
3475         bool mm_fault = false;
3476
3477         /* if we have a backlog and couldn't flush it all, return BUSY */
3478         if (!list_empty(&ctx->cq_overflow_list) &&
3479             !io_cqring_overflow_flush(ctx, false))
3480                 return -EBUSY;
3481
3482         if (nr > IO_PLUG_THRESHOLD) {
3483                 io_submit_state_start(&state, nr);
3484                 statep = &state;
3485         }
3486
3487         for (i = 0; i < nr; i++) {
3488                 struct io_kiocb *req;
3489                 unsigned int sqe_flags;
3490
3491                 req = io_get_req(ctx, statep);
3492                 if (unlikely(!req)) {
3493                         if (!submitted)
3494                                 submitted = -EAGAIN;
3495                         break;
3496                 }
3497                 if (!io_get_sqring(ctx, req)) {
3498                         __io_free_req(req);
3499                         break;
3500                 }
3501
3502                 if (io_sqe_needs_user(req->sqe) && !*mm) {
3503                         mm_fault = mm_fault || !mmget_not_zero(ctx->sqo_mm);
3504                         if (!mm_fault) {
3505                                 use_mm(ctx->sqo_mm);
3506                                 *mm = ctx->sqo_mm;
3507                         }
3508                 }
3509
3510                 submitted++;
3511                 sqe_flags = req->sqe->flags;
3512
3513                 req->ring_file = ring_file;
3514                 req->ring_fd = ring_fd;
3515                 req->has_user = *mm != NULL;
3516                 req->in_async = async;
3517                 req->needs_fixed_file = async;
3518                 trace_io_uring_submit_sqe(ctx, req->sqe->user_data,
3519                                           true, async);
3520                 if (!io_submit_sqe(req, statep, &link))
3521                         break;
3522                 /*
3523                  * If previous wasn't linked and we have a linked command,
3524                  * that's the end of the chain. Submit the previous link.
3525                  */
3526                 if (!(sqe_flags & IOSQE_IO_LINK) && link) {
3527                         io_queue_link_head(link);
3528                         link = NULL;
3529                 }
3530         }
3531
3532         if (link)
3533                 io_queue_link_head(link);
3534         if (statep)
3535                 io_submit_state_end(&state);
3536
3537          /* Commit SQ ring head once we've consumed and submitted all SQEs */
3538         io_commit_sqring(ctx);
3539
3540         return submitted;
3541 }
3542
3543 static int io_sq_thread(void *data)
3544 {
3545         struct io_ring_ctx *ctx = data;
3546         struct mm_struct *cur_mm = NULL;
3547         const struct cred *old_cred;
3548         mm_segment_t old_fs;
3549         DEFINE_WAIT(wait);
3550         unsigned inflight;
3551         unsigned long timeout;
3552         int ret;
3553
3554         complete(&ctx->completions[1]);
3555
3556         old_fs = get_fs();
3557         set_fs(USER_DS);
3558         old_cred = override_creds(ctx->creds);
3559
3560         ret = timeout = inflight = 0;
3561         while (!kthread_should_park()) {
3562                 unsigned int to_submit;
3563
3564                 if (inflight) {
3565                         unsigned nr_events = 0;
3566
3567                         if (ctx->flags & IORING_SETUP_IOPOLL) {
3568                                 /*
3569                                  * inflight is the count of the maximum possible
3570                                  * entries we submitted, but it can be smaller
3571                                  * if we dropped some of them. If we don't have
3572                                  * poll entries available, then we know that we
3573                                  * have nothing left to poll for. Reset the
3574                                  * inflight count to zero in that case.
3575                                  */
3576                                 mutex_lock(&ctx->uring_lock);
3577                                 if (!list_empty(&ctx->poll_list))
3578                                         __io_iopoll_check(ctx, &nr_events, 0);
3579                                 else
3580                                         inflight = 0;
3581                                 mutex_unlock(&ctx->uring_lock);
3582                         } else {
3583                                 /*
3584                                  * Normal IO, just pretend everything completed.
3585                                  * We don't have to poll completions for that.
3586                                  */
3587                                 nr_events = inflight;
3588                         }
3589
3590                         inflight -= nr_events;
3591                         if (!inflight)
3592                                 timeout = jiffies + ctx->sq_thread_idle;
3593                 }
3594
3595                 to_submit = io_sqring_entries(ctx);
3596
3597                 /*
3598                  * If submit got -EBUSY, flag us as needing the application
3599                  * to enter the kernel to reap and flush events.
3600                  */
3601                 if (!to_submit || ret == -EBUSY) {
3602                         /*
3603                          * We're polling. If we're within the defined idle
3604                          * period, then let us spin without work before going
3605                          * to sleep. The exception is if we got EBUSY doing
3606                          * more IO, we should wait for the application to
3607                          * reap events and wake us up.
3608                          */
3609                         if (inflight ||
3610                             (!time_after(jiffies, timeout) && ret != -EBUSY)) {
3611                                 cond_resched();
3612                                 continue;
3613                         }
3614
3615                         /*
3616                          * Drop cur_mm before scheduling, we can't hold it for
3617                          * long periods (or over schedule()). Do this before
3618                          * adding ourselves to the waitqueue, as the unuse/drop
3619                          * may sleep.
3620                          */
3621                         if (cur_mm) {
3622                                 unuse_mm(cur_mm);
3623                                 mmput(cur_mm);
3624                                 cur_mm = NULL;
3625                         }
3626
3627                         prepare_to_wait(&ctx->sqo_wait, &wait,
3628                                                 TASK_INTERRUPTIBLE);
3629
3630                         /* Tell userspace we may need a wakeup call */
3631                         ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
3632                         /* make sure to read SQ tail after writing flags */
3633                         smp_mb();
3634
3635                         to_submit = io_sqring_entries(ctx);
3636                         if (!to_submit || ret == -EBUSY) {
3637                                 if (kthread_should_park()) {
3638                                         finish_wait(&ctx->sqo_wait, &wait);
3639                                         break;
3640                                 }
3641                                 if (signal_pending(current))
3642                                         flush_signals(current);
3643                                 schedule();
3644                                 finish_wait(&ctx->sqo_wait, &wait);
3645
3646                                 ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3647                                 continue;
3648                         }
3649                         finish_wait(&ctx->sqo_wait, &wait);
3650
3651                         ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
3652                 }
3653
3654                 to_submit = min(to_submit, ctx->sq_entries);
3655                 mutex_lock(&ctx->uring_lock);
3656                 ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
3657                 mutex_unlock(&ctx->uring_lock);
3658                 if (ret > 0)
3659                         inflight += ret;
3660         }
3661
3662         set_fs(old_fs);
3663         if (cur_mm) {
3664                 unuse_mm(cur_mm);
3665                 mmput(cur_mm);
3666         }
3667         revert_creds(old_cred);
3668
3669         kthread_parkme();
3670
3671         return 0;
3672 }
3673
3674 struct io_wait_queue {
3675         struct wait_queue_entry wq;
3676         struct io_ring_ctx *ctx;
3677         unsigned to_wait;
3678         unsigned nr_timeouts;
3679 };
3680
3681 static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
3682 {
3683         struct io_ring_ctx *ctx = iowq->ctx;
3684
3685         /*
3686          * Wake up if we have enough events, or if a timeout occured since we
3687          * started waiting. For timeouts, we always want to return to userspace,
3688          * regardless of event count.
3689          */
3690         return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
3691                         atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
3692 }
3693
3694 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
3695                             int wake_flags, void *key)
3696 {
3697         struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
3698                                                         wq);
3699
3700         /* use noflush == true, as we can't safely rely on locking context */
3701         if (!io_should_wake(iowq, true))
3702                 return -1;
3703
3704         return autoremove_wake_function(curr, mode, wake_flags, key);
3705 }
3706
3707 /*
3708  * Wait until events become available, if we don't already have some. The
3709  * application must reap them itself, as they reside on the shared cq ring.
3710  */
3711 static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
3712                           const sigset_t __user *sig, size_t sigsz)
3713 {
3714         struct io_wait_queue iowq = {
3715                 .wq = {
3716                         .private        = current,
3717                         .func           = io_wake_function,
3718                         .entry          = LIST_HEAD_INIT(iowq.wq.entry),
3719                 },
3720                 .ctx            = ctx,
3721                 .to_wait        = min_events,
3722         };
3723         struct io_rings *rings = ctx->rings;
3724         int ret = 0;
3725
3726         if (io_cqring_events(ctx, false) >= min_events)
3727                 return 0;
3728
3729         if (sig) {
3730 #ifdef CONFIG_COMPAT
3731                 if (in_compat_syscall())
3732                         ret = set_compat_user_sigmask((const compat_sigset_t __user *)sig,
3733                                                       sigsz);
3734                 else
3735 #endif
3736                         ret = set_user_sigmask(sig, sigsz);
3737
3738                 if (ret)
3739                         return ret;
3740         }
3741
3742         iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
3743         trace_io_uring_cqring_wait(ctx, min_events);
3744         do {
3745                 prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
3746                                                 TASK_INTERRUPTIBLE);
3747                 if (io_should_wake(&iowq, false))
3748                         break;
3749                 schedule();
3750                 if (signal_pending(current)) {
3751                         ret = -EINTR;
3752                         break;
3753                 }
3754         } while (1);
3755         finish_wait(&ctx->wait, &iowq.wq);
3756
3757         restore_saved_sigmask_unless(ret == -EINTR);
3758
3759         return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
3760 }
3761
3762 static void __io_sqe_files_unregister(struct io_ring_ctx *ctx)
3763 {
3764 #if defined(CONFIG_UNIX)
3765         if (ctx->ring_sock) {
3766                 struct sock *sock = ctx->ring_sock->sk;
3767                 struct sk_buff *skb;
3768
3769                 while ((skb = skb_dequeue(&sock->sk_receive_queue)) != NULL)
3770                         kfree_skb(skb);
3771         }
3772 #else
3773         int i;
3774
3775         for (i = 0; i < ctx->nr_user_files; i++) {
3776                 struct file *file;
3777
3778                 file = io_file_from_index(ctx, i);
3779                 if (file)
3780                         fput(file);
3781         }
3782 #endif
3783 }
3784
3785 static int io_sqe_files_unregister(struct io_ring_ctx *ctx)
3786 {
3787         unsigned nr_tables, i;
3788
3789         if (!ctx->file_table)
3790                 return -ENXIO;
3791
3792         __io_sqe_files_unregister(ctx);
3793         nr_tables = DIV_ROUND_UP(ctx->nr_user_files, IORING_MAX_FILES_TABLE);
3794         for (i = 0; i < nr_tables; i++)
3795                 kfree(ctx->file_table[i].files);
3796         kfree(ctx->file_table);
3797         ctx->file_table = NULL;
3798         ctx->nr_user_files = 0;
3799         return 0;
3800 }
3801
3802 static void io_sq_thread_stop(struct io_ring_ctx *ctx)
3803 {
3804         if (ctx->sqo_thread) {
3805                 wait_for_completion(&ctx->completions[1]);
3806                 /*
3807                  * The park is a bit of a work-around, without it we get
3808                  * warning spews on shutdown with SQPOLL set and affinity
3809                  * set to a single CPU.
3810                  */
3811                 kthread_park(ctx->sqo_thread);
3812                 kthread_stop(ctx->sqo_thread);
3813                 ctx->sqo_thread = NULL;
3814         }
3815 }
3816
3817 static void io_finish_async(struct io_ring_ctx *ctx)
3818 {
3819         io_sq_thread_stop(ctx);
3820
3821         if (ctx->io_wq) {
3822                 io_wq_destroy(ctx->io_wq);
3823                 ctx->io_wq = NULL;
3824         }
3825 }
3826
3827 #if defined(CONFIG_UNIX)
3828 static void io_destruct_skb(struct sk_buff *skb)
3829 {
3830         struct io_ring_ctx *ctx = skb->sk->sk_user_data;
3831
3832         if (ctx->io_wq)
3833                 io_wq_flush(ctx->io_wq);
3834
3835         unix_destruct_scm(skb);
3836 }
3837
3838 /*
3839  * Ensure the UNIX gc is aware of our file set, so we are certain that
3840  * the io_uring can be safely unregistered on process exit, even if we have
3841  * loops in the file referencing.
3842  */
3843 static int __io_sqe_files_scm(struct io_ring_ctx *ctx, int nr, int offset)
3844 {
3845         struct sock *sk = ctx->ring_sock->sk;
3846         struct scm_fp_list *fpl;
3847         struct sk_buff *skb;
3848         int i, nr_files;
3849
3850         if (!capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN)) {
3851                 unsigned long inflight = ctx->user->unix_inflight + nr;
3852
3853                 if (inflight > task_rlimit(current, RLIMIT_NOFILE))
3854                         return -EMFILE;
3855         }
3856
3857         fpl = kzalloc(sizeof(*fpl), GFP_KERNEL);
3858         if (!fpl)
3859                 return -ENOMEM;
3860
3861         skb = alloc_skb(0, GFP_KERNEL);
3862         if (!skb) {
3863                 kfree(fpl);
3864                 return -ENOMEM;
3865         }
3866
3867         skb->sk = sk;
3868
3869         nr_files = 0;
3870         fpl->user = get_uid(ctx->user);
3871         for (i = 0; i < nr; i++) {
3872                 struct file *file = io_file_from_index(ctx, i + offset);
3873
3874                 if (!file)
3875                         continue;
3876                 fpl->fp[nr_files] = get_file(file);
3877                 unix_inflight(fpl->user, fpl->fp[nr_files]);
3878                 nr_files++;
3879         }
3880
3881         if (nr_files) {
3882                 fpl->max = SCM_MAX_FD;
3883                 fpl->count = nr_files;
3884                 UNIXCB(skb).fp = fpl;
3885                 skb->destructor = io_destruct_skb;
3886                 refcount_add(skb->truesize, &sk->sk_wmem_alloc);
3887                 skb_queue_head(&sk->sk_receive_queue, skb);
3888
3889                 for (i = 0; i < nr_files; i++)
3890                         fput(fpl->fp[i]);
3891         } else {
3892                 kfree_skb(skb);
3893                 kfree(fpl);
3894         }
3895
3896         return 0;
3897 }
3898
3899 /*
3900  * If UNIX sockets are enabled, fd passing can cause a reference cycle which
3901  * causes regular reference counting to break down. We rely on the UNIX
3902  * garbage collection to take care of this problem for us.
3903  */
3904 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3905 {
3906         unsigned left, total;
3907         int ret = 0;
3908
3909         total = 0;
3910         left = ctx->nr_user_files;
3911         while (left) {
3912                 unsigned this_files = min_t(unsigned, left, SCM_MAX_FD);
3913
3914                 ret = __io_sqe_files_scm(ctx, this_files, total);
3915                 if (ret)
3916                         break;
3917                 left -= this_files;
3918                 total += this_files;
3919         }
3920
3921         if (!ret)
3922                 return 0;
3923
3924         while (total < ctx->nr_user_files) {
3925                 struct file *file = io_file_from_index(ctx, total);
3926
3927                 if (file)
3928                         fput(file);
3929                 total++;
3930         }
3931
3932         return ret;
3933 }
3934 #else
3935 static int io_sqe_files_scm(struct io_ring_ctx *ctx)
3936 {
3937         return 0;
3938 }
3939 #endif
3940
3941 static int io_sqe_alloc_file_tables(struct io_ring_ctx *ctx, unsigned nr_tables,
3942                                     unsigned nr_files)
3943 {
3944         int i;
3945
3946         for (i = 0; i < nr_tables; i++) {
3947                 struct fixed_file_table *table = &ctx->file_table[i];
3948                 unsigned this_files;
3949
3950                 this_files = min(nr_files, IORING_MAX_FILES_TABLE);
3951                 table->files = kcalloc(this_files, sizeof(struct file *),
3952                                         GFP_KERNEL);
3953                 if (!table->files)
3954                         break;
3955                 nr_files -= this_files;
3956         }
3957
3958         if (i == nr_tables)
3959                 return 0;
3960
3961         for (i = 0; i < nr_tables; i++) {
3962                 struct fixed_file_table *table = &ctx->file_table[i];
3963                 kfree(table->files);
3964         }
3965         return 1;
3966 }
3967
3968 static int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
3969                                  unsigned nr_args)
3970 {
3971         __s32 __user *fds = (__s32 __user *) arg;
3972         unsigned nr_tables;
3973         int fd, ret = 0;
3974         unsigned i;
3975
3976         if (ctx->file_table)
3977                 return -EBUSY;
3978         if (!nr_args)
3979                 return -EINVAL;
3980         if (nr_args > IORING_MAX_FIXED_FILES)
3981                 return -EMFILE;
3982
3983         nr_tables = DIV_ROUND_UP(nr_args, IORING_MAX_FILES_TABLE);
3984         ctx->file_table = kcalloc(nr_tables, sizeof(struct fixed_file_table),
3985                                         GFP_KERNEL);
3986         if (!ctx->file_table)
3987                 return -ENOMEM;
3988
3989         if (io_sqe_alloc_file_tables(ctx, nr_tables, nr_args)) {
3990                 kfree(ctx->file_table);
3991                 ctx->file_table = NULL;
3992                 return -ENOMEM;
3993         }
3994
3995         for (i = 0; i < nr_args; i++, ctx->nr_user_files++) {
3996                 struct fixed_file_table *table;
3997                 unsigned index;
3998
3999                 ret = -EFAULT;
4000                 if (copy_from_user(&fd, &fds[i], sizeof(fd)))
4001                         break;
4002                 /* allow sparse sets */
4003                 if (fd == -1) {
4004                         ret = 0;
4005                         continue;
4006                 }
4007
4008                 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
4009                 index = i & IORING_FILE_TABLE_MASK;
4010                 table->files[index] = fget(fd);
4011
4012                 ret = -EBADF;
4013                 if (!table->files[index])
4014                         break;
4015                 /*
4016                  * Don't allow io_uring instances to be registered. If UNIX
4017                  * isn't enabled, then this causes a reference cycle and this
4018                  * instance can never get freed. If UNIX is enabled we'll
4019                  * handle it just fine, but there's still no point in allowing
4020                  * a ring fd as it doesn't support regular read/write anyway.
4021                  */
4022                 if (table->files[index]->f_op == &io_uring_fops) {
4023                         fput(table->files[index]);
4024                         break;
4025                 }
4026                 ret = 0;
4027         }
4028
4029         if (ret) {
4030                 for (i = 0; i < ctx->nr_user_files; i++) {
4031                         struct file *file;
4032
4033                         file = io_file_from_index(ctx, i);
4034                         if (file)
4035                                 fput(file);
4036                 }
4037                 for (i = 0; i < nr_tables; i++)
4038                         kfree(ctx->file_table[i].files);
4039
4040                 kfree(ctx->file_table);
4041                 ctx->file_table = NULL;
4042                 ctx->nr_user_files = 0;
4043                 return ret;
4044         }
4045
4046         ret = io_sqe_files_scm(ctx);
4047         if (ret)
4048                 io_sqe_files_unregister(ctx);
4049
4050         return ret;
4051 }
4052
4053 static void io_sqe_file_unregister(struct io_ring_ctx *ctx, int index)
4054 {
4055 #if defined(CONFIG_UNIX)
4056         struct file *file = io_file_from_index(ctx, index);
4057         struct sock *sock = ctx->ring_sock->sk;
4058         struct sk_buff_head list, *head = &sock->sk_receive_queue;
4059         struct sk_buff *skb;
4060         int i;
4061
4062         __skb_queue_head_init(&list);
4063
4064         /*
4065          * Find the skb that holds this file in its SCM_RIGHTS. When found,
4066          * remove this entry and rearrange the file array.
4067          */
4068         skb = skb_dequeue(head);
4069         while (skb) {
4070                 struct scm_fp_list *fp;
4071
4072                 fp = UNIXCB(skb).fp;
4073                 for (i = 0; i < fp->count; i++) {
4074                         int left;
4075
4076                         if (fp->fp[i] != file)
4077                                 continue;
4078
4079                         unix_notinflight(fp->user, fp->fp[i]);
4080                         left = fp->count - 1 - i;
4081                         if (left) {
4082                                 memmove(&fp->fp[i], &fp->fp[i + 1],
4083                                                 left * sizeof(struct file *));
4084                         }
4085                         fp->count--;
4086                         if (!fp->count) {
4087                                 kfree_skb(skb);
4088                                 skb = NULL;
4089                         } else {
4090                                 __skb_queue_tail(&list, skb);
4091                         }
4092                         fput(file);
4093                         file = NULL;
4094                         break;
4095                 }
4096
4097                 if (!file)
4098                         break;
4099
4100                 __skb_queue_tail(&list, skb);
4101
4102                 skb = skb_dequeue(head);
4103         }
4104
4105         if (skb_peek(&list)) {
4106                 spin_lock_irq(&head->lock);
4107                 while ((skb = __skb_dequeue(&list)) != NULL)
4108                         __skb_queue_tail(head, skb);
4109                 spin_unlock_irq(&head->lock);
4110         }
4111 #else
4112         fput(io_file_from_index(ctx, index));
4113 #endif
4114 }
4115
4116 static int io_sqe_file_register(struct io_ring_ctx *ctx, struct file *file,
4117                                 int index)
4118 {
4119 #if defined(CONFIG_UNIX)
4120         struct sock *sock = ctx->ring_sock->sk;
4121         struct sk_buff_head *head = &sock->sk_receive_queue;
4122         struct sk_buff *skb;
4123
4124         /*
4125          * See if we can merge this file into an existing skb SCM_RIGHTS
4126          * file set. If there's no room, fall back to allocating a new skb
4127          * and filling it in.
4128          */
4129         spin_lock_irq(&head->lock);
4130         skb = skb_peek(head);
4131         if (skb) {
4132                 struct scm_fp_list *fpl = UNIXCB(skb).fp;
4133
4134                 if (fpl->count < SCM_MAX_FD) {
4135                         __skb_unlink(skb, head);
4136                         spin_unlock_irq(&head->lock);
4137                         fpl->fp[fpl->count] = get_file(file);
4138                         unix_inflight(fpl->user, fpl->fp[fpl->count]);
4139                         fpl->count++;
4140                         spin_lock_irq(&head->lock);
4141                         __skb_queue_head(head, skb);
4142                 } else {
4143                         skb = NULL;
4144                 }
4145         }
4146         spin_unlock_irq(&head->lock);
4147
4148         if (skb) {
4149                 fput(file);
4150                 return 0;
4151         }
4152
4153         return __io_sqe_files_scm(ctx, 1, index);
4154 #else
4155         return 0;
4156 #endif
4157 }
4158
4159 static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
4160                                unsigned nr_args)
4161 {
4162         struct io_uring_files_update up;
4163         __s32 __user *fds;
4164         int fd, i, err;
4165         __u32 done;
4166
4167         if (!ctx->file_table)
4168                 return -ENXIO;
4169         if (!nr_args)
4170                 return -EINVAL;
4171         if (copy_from_user(&up, arg, sizeof(up)))
4172                 return -EFAULT;
4173         if (check_add_overflow(up.offset, nr_args, &done))
4174                 return -EOVERFLOW;
4175         if (done > ctx->nr_user_files)
4176                 return -EINVAL;
4177
4178         done = 0;
4179         fds = (__s32 __user *) up.fds;
4180         while (nr_args) {
4181                 struct fixed_file_table *table;
4182                 unsigned index;
4183
4184                 err = 0;
4185                 if (copy_from_user(&fd, &fds[done], sizeof(fd))) {
4186                         err = -EFAULT;
4187                         break;
4188                 }
4189                 i = array_index_nospec(up.offset, ctx->nr_user_files);
4190                 table = &ctx->file_table[i >> IORING_FILE_TABLE_SHIFT];
4191                 index = i & IORING_FILE_TABLE_MASK;
4192                 if (table->files[index]) {
4193                         io_sqe_file_unregister(ctx, i);
4194                         table->files[index] = NULL;
4195                 }
4196                 if (fd != -1) {
4197                         struct file *file;
4198
4199                         file = fget(fd);
4200                         if (!file) {
4201                                 err = -EBADF;
4202                                 break;
4203                         }
4204                         /*
4205                          * Don't allow io_uring instances to be registered. If
4206                          * UNIX isn't enabled, then this causes a reference
4207                          * cycle and this instance can never get freed. If UNIX
4208                          * is enabled we'll handle it just fine, but there's
4209                          * still no point in allowing a ring fd as it doesn't
4210                          * support regular read/write anyway.
4211                          */
4212                         if (file->f_op == &io_uring_fops) {
4213                                 fput(file);
4214                                 err = -EBADF;
4215                                 break;
4216                         }
4217                         table->files[index] = file;
4218                         err = io_sqe_file_register(ctx, file, i);
4219                         if (err)
4220                                 break;
4221                 }
4222                 nr_args--;
4223                 done++;
4224                 up.offset++;
4225         }
4226
4227         return done ? done : err;
4228 }
4229
4230 static void io_put_work(struct io_wq_work *work)
4231 {
4232         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4233
4234         io_put_req(req);
4235 }
4236
4237 static void io_get_work(struct io_wq_work *work)
4238 {
4239         struct io_kiocb *req = container_of(work, struct io_kiocb, work);
4240
4241         refcount_inc(&req->refs);
4242 }
4243
4244 static int io_sq_offload_start(struct io_ring_ctx *ctx,
4245                                struct io_uring_params *p)
4246 {
4247         struct io_wq_data data;
4248         unsigned concurrency;
4249         int ret;
4250
4251         init_waitqueue_head(&ctx->sqo_wait);
4252         mmgrab(current->mm);
4253         ctx->sqo_mm = current->mm;
4254
4255         if (ctx->flags & IORING_SETUP_SQPOLL) {
4256                 ret = -EPERM;
4257                 if (!capable(CAP_SYS_ADMIN))
4258                         goto err;
4259
4260                 ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
4261                 if (!ctx->sq_thread_idle)
4262                         ctx->sq_thread_idle = HZ;
4263
4264                 if (p->flags & IORING_SETUP_SQ_AFF) {
4265                         int cpu = p->sq_thread_cpu;
4266
4267                         ret = -EINVAL;
4268                         if (cpu >= nr_cpu_ids)
4269                                 goto err;
4270                         if (!cpu_online(cpu))
4271                                 goto err;
4272
4273                         ctx->sqo_thread = kthread_create_on_cpu(io_sq_thread,
4274                                                         ctx, cpu,
4275                                                         "io_uring-sq");
4276                 } else {
4277                         ctx->sqo_thread = kthread_create(io_sq_thread, ctx,
4278                                                         "io_uring-sq");
4279                 }
4280                 if (IS_ERR(ctx->sqo_thread)) {
4281                         ret = PTR_ERR(ctx->sqo_thread);
4282                         ctx->sqo_thread = NULL;
4283                         goto err;
4284                 }
4285                 wake_up_process(ctx->sqo_thread);
4286         } else if (p->flags & IORING_SETUP_SQ_AFF) {
4287                 /* Can't have SQ_AFF without SQPOLL */
4288                 ret = -EINVAL;
4289                 goto err;
4290         }
4291
4292         data.mm = ctx->sqo_mm;
4293         data.user = ctx->user;
4294         data.creds = ctx->creds;
4295         data.get_work = io_get_work;
4296         data.put_work = io_put_work;
4297
4298         /* Do QD, or 4 * CPUS, whatever is smallest */
4299         concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
4300         ctx->io_wq = io_wq_create(concurrency, &data);
4301         if (IS_ERR(ctx->io_wq)) {
4302                 ret = PTR_ERR(ctx->io_wq);
4303                 ctx->io_wq = NULL;
4304                 goto err;
4305         }
4306
4307         return 0;
4308 err:
4309         io_finish_async(ctx);
4310         mmdrop(ctx->sqo_mm);
4311         ctx->sqo_mm = NULL;
4312         return ret;
4313 }
4314
4315 static void io_unaccount_mem(struct user_struct *user, unsigned long nr_pages)
4316 {
4317         atomic_long_sub(nr_pages, &user->locked_vm);
4318 }
4319
4320 static int io_account_mem(struct user_struct *user, unsigned long nr_pages)
4321 {
4322         unsigned long page_limit, cur_pages, new_pages;
4323
4324         /* Don't allow more pages than we can safely lock */
4325         page_limit = rlimit(RLIMIT_MEMLOCK) >> PAGE_SHIFT;
4326
4327         do {
4328                 cur_pages = atomic_long_read(&user->locked_vm);
4329                 new_pages = cur_pages + nr_pages;
4330                 if (new_pages > page_limit)
4331                         return -ENOMEM;
4332         } while (atomic_long_cmpxchg(&user->locked_vm, cur_pages,
4333                                         new_pages) != cur_pages);
4334
4335         return 0;
4336 }
4337
4338 static void io_mem_free(void *ptr)
4339 {
4340         struct page *page;
4341
4342         if (!ptr)
4343                 return;
4344
4345         page = virt_to_head_page(ptr);
4346         if (put_page_testzero(page))
4347                 free_compound_page(page);
4348 }
4349
4350 static void *io_mem_alloc(size_t size)
4351 {
4352         gfp_t gfp_flags = GFP_KERNEL | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP |
4353                                 __GFP_NORETRY;
4354
4355         return (void *) __get_free_pages(gfp_flags, get_order(size));
4356 }
4357
4358 static unsigned long rings_size(unsigned sq_entries, unsigned cq_entries,
4359                                 size_t *sq_offset)
4360 {
4361         struct io_rings *rings;
4362         size_t off, sq_array_size;
4363
4364         off = struct_size(rings, cqes, cq_entries);
4365         if (off == SIZE_MAX)
4366                 return SIZE_MAX;
4367
4368 #ifdef CONFIG_SMP
4369         off = ALIGN(off, SMP_CACHE_BYTES);
4370         if (off == 0)
4371                 return SIZE_MAX;
4372 #endif
4373
4374         sq_array_size = array_size(sizeof(u32), sq_entries);
4375         if (sq_array_size == SIZE_MAX)
4376                 return SIZE_MAX;
4377
4378         if (check_add_overflow(off, sq_array_size, &off))
4379                 return SIZE_MAX;
4380
4381         if (sq_offset)
4382                 *sq_offset = off;
4383
4384         return off;
4385 }
4386
4387 static unsigned long ring_pages(unsigned sq_entries, unsigned cq_entries)
4388 {
4389         size_t pages;
4390
4391         pages = (size_t)1 << get_order(
4392                 rings_size(sq_entries, cq_entries, NULL));
4393         pages += (size_t)1 << get_order(
4394                 array_size(sizeof(struct io_uring_sqe), sq_entries));
4395
4396         return pages;
4397 }
4398
4399 static int io_sqe_buffer_unregister(struct io_ring_ctx *ctx)
4400 {
4401         int i, j;
4402
4403         if (!ctx->user_bufs)
4404                 return -ENXIO;
4405
4406         for (i = 0; i < ctx->nr_user_bufs; i++) {
4407                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4408
4409                 for (j = 0; j < imu->nr_bvecs; j++)
4410                         put_user_page(imu->bvec[j].bv_page);
4411
4412                 if (ctx->account_mem)
4413                         io_unaccount_mem(ctx->user, imu->nr_bvecs);
4414                 kvfree(imu->bvec);
4415                 imu->nr_bvecs = 0;
4416         }
4417
4418         kfree(ctx->user_bufs);
4419         ctx->user_bufs = NULL;
4420         ctx->nr_user_bufs = 0;
4421         return 0;
4422 }
4423
4424 static int io_copy_iov(struct io_ring_ctx *ctx, struct iovec *dst,
4425                        void __user *arg, unsigned index)
4426 {
4427         struct iovec __user *src;
4428
4429 #ifdef CONFIG_COMPAT
4430         if (ctx->compat) {
4431                 struct compat_iovec __user *ciovs;
4432                 struct compat_iovec ciov;
4433
4434                 ciovs = (struct compat_iovec __user *) arg;
4435                 if (copy_from_user(&ciov, &ciovs[index], sizeof(ciov)))
4436                         return -EFAULT;
4437
4438                 dst->iov_base = (void __user *) (unsigned long) ciov.iov_base;
4439                 dst->iov_len = ciov.iov_len;
4440                 return 0;
4441         }
4442 #endif
4443         src = (struct iovec __user *) arg;
4444         if (copy_from_user(dst, &src[index], sizeof(*dst)))
4445                 return -EFAULT;
4446         return 0;
4447 }
4448
4449 static int io_sqe_buffer_register(struct io_ring_ctx *ctx, void __user *arg,
4450                                   unsigned nr_args)
4451 {
4452         struct vm_area_struct **vmas = NULL;
4453         struct page **pages = NULL;
4454         int i, j, got_pages = 0;
4455         int ret = -EINVAL;
4456
4457         if (ctx->user_bufs)
4458                 return -EBUSY;
4459         if (!nr_args || nr_args > UIO_MAXIOV)
4460                 return -EINVAL;
4461
4462         ctx->user_bufs = kcalloc(nr_args, sizeof(struct io_mapped_ubuf),
4463                                         GFP_KERNEL);
4464         if (!ctx->user_bufs)
4465                 return -ENOMEM;
4466
4467         for (i = 0; i < nr_args; i++) {
4468                 struct io_mapped_ubuf *imu = &ctx->user_bufs[i];
4469                 unsigned long off, start, end, ubuf;
4470                 int pret, nr_pages;
4471                 struct iovec iov;
4472                 size_t size;
4473
4474                 ret = io_copy_iov(ctx, &iov, arg, i);
4475                 if (ret)
4476                         goto err;
4477
4478                 /*
4479                  * Don't impose further limits on the size and buffer
4480                  * constraints here, we'll -EINVAL later when IO is
4481                  * submitted if they are wrong.
4482                  */
4483                 ret = -EFAULT;
4484                 if (!iov.iov_base || !iov.iov_len)
4485                         goto err;
4486
4487                 /* arbitrary limit, but we need something */
4488                 if (iov.iov_len > SZ_1G)
4489                         goto err;
4490
4491                 ubuf = (unsigned long) iov.iov_base;
4492                 end = (ubuf + iov.iov_len + PAGE_SIZE - 1) >> PAGE_SHIFT;
4493                 start = ubuf >> PAGE_SHIFT;
4494                 nr_pages = end - start;
4495
4496                 if (ctx->account_mem) {
4497                         ret = io_account_mem(ctx->user, nr_pages);
4498                         if (ret)
4499                                 goto err;
4500                 }
4501
4502                 ret = 0;
4503                 if (!pages || nr_pages > got_pages) {
4504                         kfree(vmas);
4505                         kfree(pages);
4506                         pages = kvmalloc_array(nr_pages, sizeof(struct page *),
4507                                                 GFP_KERNEL);
4508                         vmas = kvmalloc_array(nr_pages,
4509                                         sizeof(struct vm_area_struct *),
4510                                         GFP_KERNEL);
4511                         if (!pages || !vmas) {
4512                                 ret = -ENOMEM;
4513                                 if (ctx->account_mem)
4514                                         io_unaccount_mem(ctx->user, nr_pages);
4515                                 goto err;
4516                         }
4517                         got_pages = nr_pages;
4518                 }
4519
4520                 imu->bvec = kvmalloc_array(nr_pages, sizeof(struct bio_vec),
4521                                                 GFP_KERNEL);
4522                 ret = -ENOMEM;
4523                 if (!imu->bvec) {
4524                         if (ctx->account_mem)
4525                                 io_unaccount_mem(ctx->user, nr_pages);
4526                         goto err;
4527                 }
4528
4529                 ret = 0;
4530                 down_read(&current->mm->mmap_sem);
4531                 pret = get_user_pages(ubuf, nr_pages,
4532                                       FOLL_WRITE | FOLL_LONGTERM,
4533                                       pages, vmas);
4534                 if (pret == nr_pages) {
4535                         /* don't support file backed memory */
4536                         for (j = 0; j < nr_pages; j++) {
4537                                 struct vm_area_struct *vma = vmas[j];
4538
4539                                 if (vma->vm_file &&
4540                                     !is_file_hugepages(vma->vm_file)) {
4541                                         ret = -EOPNOTSUPP;
4542                                         break;
4543                                 }
4544                         }
4545                 } else {
4546                         ret = pret < 0 ? pret : -EFAULT;
4547                 }
4548                 up_read(&current->mm->mmap_sem);
4549                 if (ret) {
4550                         /*
4551                          * if we did partial map, or found file backed vmas,
4552                          * release any pages we did get
4553                          */
4554                         if (pret > 0)
4555                                 put_user_pages(pages, pret);
4556                         if (ctx->account_mem)
4557                                 io_unaccount_mem(ctx->user, nr_pages);
4558                         kvfree(imu->bvec);
4559                         goto err;
4560                 }
4561
4562                 off = ubuf & ~PAGE_MASK;
4563                 size = iov.iov_len;
4564                 for (j = 0; j < nr_pages; j++) {
4565                         size_t vec_len;
4566
4567                         vec_len = min_t(size_t, size, PAGE_SIZE - off);
4568                         imu->bvec[j].bv_page = pages[j];
4569                         imu->bvec[j].bv_len = vec_len;
4570                         imu->bvec[j].bv_offset = off;
4571                         off = 0;
4572                         size -= vec_len;
4573                 }
4574                 /* store original address for later verification */
4575                 imu->ubuf = ubuf;
4576                 imu->len = iov.iov_len;
4577                 imu->nr_bvecs = nr_pages;
4578
4579                 ctx->nr_user_bufs++;
4580         }
4581         kvfree(pages);
4582         kvfree(vmas);
4583         return 0;
4584 err:
4585         kvfree(pages);
4586         kvfree(vmas);
4587         io_sqe_buffer_unregister(ctx);
4588         return ret;
4589 }
4590
4591 static int io_eventfd_register(struct io_ring_ctx *ctx, void __user *arg)
4592 {
4593         __s32 __user *fds = arg;
4594         int fd;
4595
4596         if (ctx->cq_ev_fd)
4597                 return -EBUSY;
4598
4599         if (copy_from_user(&fd, fds, sizeof(*fds)))
4600                 return -EFAULT;
4601
4602         ctx->cq_ev_fd = eventfd_ctx_fdget(fd);
4603         if (IS_ERR(ctx->cq_ev_fd)) {
4604                 int ret = PTR_ERR(ctx->cq_ev_fd);
4605                 ctx->cq_ev_fd = NULL;
4606                 return ret;
4607         }
4608
4609         return 0;
4610 }
4611
4612 static int io_eventfd_unregister(struct io_ring_ctx *ctx)
4613 {
4614         if (ctx->cq_ev_fd) {
4615                 eventfd_ctx_put(ctx->cq_ev_fd);
4616                 ctx->cq_ev_fd = NULL;
4617                 return 0;
4618         }
4619
4620         return -ENXIO;
4621 }
4622
4623 static void io_ring_ctx_free(struct io_ring_ctx *ctx)
4624 {
4625         io_finish_async(ctx);
4626         if (ctx->sqo_mm)
4627                 mmdrop(ctx->sqo_mm);
4628
4629         io_iopoll_reap_events(ctx);
4630         io_sqe_buffer_unregister(ctx);
4631         io_sqe_files_unregister(ctx);
4632         io_eventfd_unregister(ctx);
4633
4634 #if defined(CONFIG_UNIX)
4635         if (ctx->ring_sock) {
4636                 ctx->ring_sock->file = NULL; /* so that iput() is called */
4637                 sock_release(ctx->ring_sock);
4638         }
4639 #endif
4640
4641         io_mem_free(ctx->rings);
4642         io_mem_free(ctx->sq_sqes);
4643
4644         percpu_ref_exit(&ctx->refs);
4645         if (ctx->account_mem)
4646                 io_unaccount_mem(ctx->user,
4647                                 ring_pages(ctx->sq_entries, ctx->cq_entries));
4648         free_uid(ctx->user);
4649         put_cred(ctx->creds);
4650         kfree(ctx->completions);
4651         kfree(ctx->cancel_hash);
4652         kmem_cache_free(req_cachep, ctx->fallback_req);
4653         kfree(ctx);
4654 }
4655
4656 static __poll_t io_uring_poll(struct file *file, poll_table *wait)
4657 {
4658         struct io_ring_ctx *ctx = file->private_data;
4659         __poll_t mask = 0;
4660
4661         poll_wait(file, &ctx->cq_wait, wait);
4662         /*
4663          * synchronizes with barrier from wq_has_sleeper call in
4664          * io_commit_cqring
4665          */
4666         smp_rmb();
4667         if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
4668             ctx->rings->sq_ring_entries)
4669                 mask |= EPOLLOUT | EPOLLWRNORM;
4670         if (READ_ONCE(ctx->rings->cq.head) != ctx->cached_cq_tail)
4671                 mask |= EPOLLIN | EPOLLRDNORM;
4672
4673         return mask;
4674 }
4675
4676 static int io_uring_fasync(int fd, struct file *file, int on)
4677 {
4678         struct io_ring_ctx *ctx = file->private_data;
4679
4680         return fasync_helper(fd, file, on, &ctx->cq_fasync);
4681 }
4682
4683 static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
4684 {
4685         mutex_lock(&ctx->uring_lock);
4686         percpu_ref_kill(&ctx->refs);
4687         mutex_unlock(&ctx->uring_lock);
4688
4689         io_kill_timeouts(ctx);
4690         io_poll_remove_all(ctx);
4691
4692         if (ctx->io_wq)
4693                 io_wq_cancel_all(ctx->io_wq);
4694
4695         io_iopoll_reap_events(ctx);
4696         /* if we failed setting up the ctx, we might not have any rings */
4697         if (ctx->rings)
4698                 io_cqring_overflow_flush(ctx, true);
4699         wait_for_completion(&ctx->completions[0]);
4700         io_ring_ctx_free(ctx);
4701 }
4702
4703 static int io_uring_release(struct inode *inode, struct file *file)
4704 {
4705         struct io_ring_ctx *ctx = file->private_data;
4706
4707         file->private_data = NULL;
4708         io_ring_ctx_wait_and_kill(ctx);
4709         return 0;
4710 }
4711
4712 static void io_uring_cancel_files(struct io_ring_ctx *ctx,
4713                                   struct files_struct *files)
4714 {
4715         struct io_kiocb *req;
4716         DEFINE_WAIT(wait);
4717
4718         while (!list_empty_careful(&ctx->inflight_list)) {
4719                 struct io_kiocb *cancel_req = NULL;
4720
4721                 spin_lock_irq(&ctx->inflight_lock);
4722                 list_for_each_entry(req, &ctx->inflight_list, inflight_entry) {
4723                         if (req->work.files != files)
4724                                 continue;
4725                         /* req is being completed, ignore */
4726                         if (!refcount_inc_not_zero(&req->refs))
4727                                 continue;
4728                         cancel_req = req;
4729                         break;
4730                 }
4731                 if (cancel_req)
4732                         prepare_to_wait(&ctx->inflight_wait, &wait,
4733                                                 TASK_UNINTERRUPTIBLE);
4734                 spin_unlock_irq(&ctx->inflight_lock);
4735
4736                 /* We need to keep going until we don't find a matching req */
4737                 if (!cancel_req)
4738                         break;
4739
4740                 io_wq_cancel_work(ctx->io_wq, &cancel_req->work);
4741                 io_put_req(cancel_req);
4742                 schedule();
4743         }
4744         finish_wait(&ctx->inflight_wait, &wait);
4745 }
4746
4747 static int io_uring_flush(struct file *file, void *data)
4748 {
4749         struct io_ring_ctx *ctx = file->private_data;
4750
4751         io_uring_cancel_files(ctx, data);
4752         if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
4753                 io_cqring_overflow_flush(ctx, true);
4754                 io_wq_cancel_all(ctx->io_wq);
4755         }
4756         return 0;
4757 }
4758
4759 static void *io_uring_validate_mmap_request(struct file *file,
4760                                             loff_t pgoff, size_t sz)
4761 {
4762         struct io_ring_ctx *ctx = file->private_data;
4763         loff_t offset = pgoff << PAGE_SHIFT;
4764         struct page *page;
4765         void *ptr;
4766
4767         switch (offset) {
4768         case IORING_OFF_SQ_RING:
4769         case IORING_OFF_CQ_RING:
4770                 ptr = ctx->rings;
4771                 break;
4772         case IORING_OFF_SQES:
4773                 ptr = ctx->sq_sqes;
4774                 break;
4775         default:
4776                 return ERR_PTR(-EINVAL);
4777         }
4778
4779         page = virt_to_head_page(ptr);
4780         if (sz > page_size(page))
4781                 return ERR_PTR(-EINVAL);
4782
4783         return ptr;
4784 }
4785
4786 #ifdef CONFIG_MMU
4787
4788 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
4789 {
4790         size_t sz = vma->vm_end - vma->vm_start;
4791         unsigned long pfn;
4792         void *ptr;
4793
4794         ptr = io_uring_validate_mmap_request(file, vma->vm_pgoff, sz);
4795         if (IS_ERR(ptr))
4796                 return PTR_ERR(ptr);
4797
4798         pfn = virt_to_phys(ptr) >> PAGE_SHIFT;
4799         return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
4800 }
4801
4802 #else /* !CONFIG_MMU */
4803
4804 static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
4805 {
4806         return vma->vm_flags & (VM_SHARED | VM_MAYSHARE) ? 0 : -EINVAL;
4807 }
4808
4809 static unsigned int io_uring_nommu_mmap_capabilities(struct file *file)
4810 {
4811         return NOMMU_MAP_DIRECT | NOMMU_MAP_READ | NOMMU_MAP_WRITE;
4812 }
4813
4814 static unsigned long io_uring_nommu_get_unmapped_area(struct file *file,
4815         unsigned long addr, unsigned long len,
4816         unsigned long pgoff, unsigned long flags)
4817 {
4818         void *ptr;
4819
4820         ptr = io_uring_validate_mmap_request(file, pgoff, len);
4821         if (IS_ERR(ptr))
4822                 return PTR_ERR(ptr);
4823
4824         return (unsigned long) ptr;
4825 }
4826
4827 #endif /* !CONFIG_MMU */
4828
4829 SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
4830                 u32, min_complete, u32, flags, const sigset_t __user *, sig,
4831                 size_t, sigsz)
4832 {
4833         struct io_ring_ctx *ctx;
4834         long ret = -EBADF;
4835         int submitted = 0;
4836         struct fd f;
4837
4838         if (flags & ~(IORING_ENTER_GETEVENTS | IORING_ENTER_SQ_WAKEUP))
4839                 return -EINVAL;
4840
4841         f = fdget(fd);
4842         if (!f.file)
4843                 return -EBADF;
4844
4845         ret = -EOPNOTSUPP;
4846         if (f.file->f_op != &io_uring_fops)
4847                 goto out_fput;
4848
4849         ret = -ENXIO;
4850         ctx = f.file->private_data;
4851         if (!percpu_ref_tryget(&ctx->refs))
4852                 goto out_fput;
4853
4854         /*
4855          * For SQ polling, the thread will do all submissions and completions.
4856          * Just return the requested submit count, and wake the thread if
4857          * we were asked to.
4858          */
4859         ret = 0;
4860         if (ctx->flags & IORING_SETUP_SQPOLL) {
4861                 if (!list_empty_careful(&ctx->cq_overflow_list))
4862                         io_cqring_overflow_flush(ctx, false);
4863                 if (flags & IORING_ENTER_SQ_WAKEUP)
4864                         wake_up(&ctx->sqo_wait);
4865                 submitted = to_submit;
4866         } else if (to_submit) {
4867                 struct mm_struct *cur_mm;
4868
4869                 to_submit = min(to_submit, ctx->sq_entries);
4870                 mutex_lock(&ctx->uring_lock);
4871                 /* already have mm, so io_submit_sqes() won't try to grab it */
4872                 cur_mm = ctx->sqo_mm;
4873                 submitted = io_submit_sqes(ctx, to_submit, f.file, fd,
4874                                            &cur_mm, false);
4875                 mutex_unlock(&ctx->uring_lock);
4876         }
4877         if (flags & IORING_ENTER_GETEVENTS) {
4878                 unsigned nr_events = 0;
4879
4880                 min_complete = min(min_complete, ctx->cq_entries);
4881
4882                 if (ctx->flags & IORING_SETUP_IOPOLL) {
4883                         ret = io_iopoll_check(ctx, &nr_events, min_complete);
4884                 } else {
4885                         ret = io_cqring_wait(ctx, min_complete, sig, sigsz);
4886                 }
4887         }
4888
4889         percpu_ref_put(&ctx->refs);
4890 out_fput:
4891         fdput(f);
4892         return submitted ? submitted : ret;
4893 }
4894
4895 static const struct file_operations io_uring_fops = {
4896         .release        = io_uring_release,
4897         .flush          = io_uring_flush,
4898         .mmap           = io_uring_mmap,
4899 #ifndef CONFIG_MMU
4900         .get_unmapped_area = io_uring_nommu_get_unmapped_area,
4901         .mmap_capabilities = io_uring_nommu_mmap_capabilities,
4902 #endif
4903         .poll           = io_uring_poll,
4904         .fasync         = io_uring_fasync,
4905 };
4906
4907 static int io_allocate_scq_urings(struct io_ring_ctx *ctx,
4908                                   struct io_uring_params *p)
4909 {
4910         struct io_rings *rings;
4911         size_t size, sq_array_offset;
4912
4913         size = rings_size(p->sq_entries, p->cq_entries, &sq_array_offset);
4914         if (size == SIZE_MAX)
4915                 return -EOVERFLOW;
4916
4917         rings = io_mem_alloc(size);
4918         if (!rings)
4919                 return -ENOMEM;
4920
4921         ctx->rings = rings;
4922         ctx->sq_array = (u32 *)((char *)rings + sq_array_offset);
4923         rings->sq_ring_mask = p->sq_entries - 1;
4924         rings->cq_ring_mask = p->cq_entries - 1;
4925         rings->sq_ring_entries = p->sq_entries;
4926         rings->cq_ring_entries = p->cq_entries;
4927         ctx->sq_mask = rings->sq_ring_mask;
4928         ctx->cq_mask = rings->cq_ring_mask;
4929         ctx->sq_entries = rings->sq_ring_entries;
4930         ctx->cq_entries = rings->cq_ring_entries;
4931
4932         size = array_size(sizeof(struct io_uring_sqe), p->sq_entries);
4933         if (size == SIZE_MAX) {
4934                 io_mem_free(ctx->rings);
4935                 ctx->rings = NULL;
4936                 return -EOVERFLOW;
4937         }
4938
4939         ctx->sq_sqes = io_mem_alloc(size);
4940         if (!ctx->sq_sqes) {
4941                 io_mem_free(ctx->rings);
4942                 ctx->rings = NULL;
4943                 return -ENOMEM;
4944         }
4945
4946         return 0;
4947 }
4948
4949 /*
4950  * Allocate an anonymous fd, this is what constitutes the application
4951  * visible backing of an io_uring instance. The application mmaps this
4952  * fd to gain access to the SQ/CQ ring details. If UNIX sockets are enabled,
4953  * we have to tie this fd to a socket for file garbage collection purposes.
4954  */
4955 static int io_uring_get_fd(struct io_ring_ctx *ctx)
4956 {
4957         struct file *file;
4958         int ret;
4959
4960 #if defined(CONFIG_UNIX)
4961         ret = sock_create_kern(&init_net, PF_UNIX, SOCK_RAW, IPPROTO_IP,
4962                                 &ctx->ring_sock);
4963         if (ret)
4964                 return ret;
4965 #endif
4966
4967         ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
4968         if (ret < 0)
4969                 goto err;
4970
4971         file = anon_inode_getfile("[io_uring]", &io_uring_fops, ctx,
4972                                         O_RDWR | O_CLOEXEC);
4973         if (IS_ERR(file)) {
4974                 put_unused_fd(ret);
4975                 ret = PTR_ERR(file);
4976                 goto err;
4977         }
4978
4979 #if defined(CONFIG_UNIX)
4980         ctx->ring_sock->file = file;
4981         ctx->ring_sock->sk->sk_user_data = ctx;
4982 #endif
4983         fd_install(ret, file);
4984         return ret;
4985 err:
4986 #if defined(CONFIG_UNIX)
4987         sock_release(ctx->ring_sock);
4988         ctx->ring_sock = NULL;
4989 #endif
4990         return ret;
4991 }
4992
4993 static int io_uring_create(unsigned entries, struct io_uring_params *p)
4994 {
4995         struct user_struct *user = NULL;
4996         struct io_ring_ctx *ctx;
4997         bool account_mem;
4998         int ret;
4999
5000         if (!entries || entries > IORING_MAX_ENTRIES)
5001                 return -EINVAL;
5002
5003         /*
5004          * Use twice as many entries for the CQ ring. It's possible for the
5005          * application to drive a higher depth than the size of the SQ ring,
5006          * since the sqes are only used at submission time. This allows for
5007          * some flexibility in overcommitting a bit. If the application has
5008          * set IORING_SETUP_CQSIZE, it will have passed in the desired number
5009          * of CQ ring entries manually.
5010          */
5011         p->sq_entries = roundup_pow_of_two(entries);
5012         if (p->flags & IORING_SETUP_CQSIZE) {
5013                 /*
5014                  * If IORING_SETUP_CQSIZE is set, we do the same roundup
5015                  * to a power-of-two, if it isn't already. We do NOT impose
5016                  * any cq vs sq ring sizing.
5017                  */
5018                 if (p->cq_entries < p->sq_entries || p->cq_entries > IORING_MAX_CQ_ENTRIES)
5019                         return -EINVAL;
5020                 p->cq_entries = roundup_pow_of_two(p->cq_entries);
5021         } else {
5022                 p->cq_entries = 2 * p->sq_entries;
5023         }
5024
5025         user = get_uid(current_user());
5026         account_mem = !capable(CAP_IPC_LOCK);
5027
5028         if (account_mem) {
5029                 ret = io_account_mem(user,
5030                                 ring_pages(p->sq_entries, p->cq_entries));
5031                 if (ret) {
5032                         free_uid(user);
5033                         return ret;
5034                 }
5035         }
5036
5037         ctx = io_ring_ctx_alloc(p);
5038         if (!ctx) {
5039                 if (account_mem)
5040                         io_unaccount_mem(user, ring_pages(p->sq_entries,
5041                                                                 p->cq_entries));
5042                 free_uid(user);
5043                 return -ENOMEM;
5044         }
5045         ctx->compat = in_compat_syscall();
5046         ctx->account_mem = account_mem;
5047         ctx->user = user;
5048         ctx->creds = get_current_cred();
5049
5050         ret = io_allocate_scq_urings(ctx, p);
5051         if (ret)
5052                 goto err;
5053
5054         ret = io_sq_offload_start(ctx, p);
5055         if (ret)
5056                 goto err;
5057
5058         memset(&p->sq_off, 0, sizeof(p->sq_off));
5059         p->sq_off.head = offsetof(struct io_rings, sq.head);
5060         p->sq_off.tail = offsetof(struct io_rings, sq.tail);
5061         p->sq_off.ring_mask = offsetof(struct io_rings, sq_ring_mask);
5062         p->sq_off.ring_entries = offsetof(struct io_rings, sq_ring_entries);
5063         p->sq_off.flags = offsetof(struct io_rings, sq_flags);
5064         p->sq_off.dropped = offsetof(struct io_rings, sq_dropped);
5065         p->sq_off.array = (char *)ctx->sq_array - (char *)ctx->rings;
5066
5067         memset(&p->cq_off, 0, sizeof(p->cq_off));
5068         p->cq_off.head = offsetof(struct io_rings, cq.head);
5069         p->cq_off.tail = offsetof(struct io_rings, cq.tail);
5070         p->cq_off.ring_mask = offsetof(struct io_rings, cq_ring_mask);
5071         p->cq_off.ring_entries = offsetof(struct io_rings, cq_ring_entries);
5072         p->cq_off.overflow = offsetof(struct io_rings, cq_overflow);
5073         p->cq_off.cqes = offsetof(struct io_rings, cqes);
5074
5075         /*
5076          * Install ring fd as the very last thing, so we don't risk someone
5077          * having closed it before we finish setup
5078          */
5079         ret = io_uring_get_fd(ctx);
5080         if (ret < 0)
5081                 goto err;
5082
5083         p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP |
5084                         IORING_FEAT_SUBMIT_STABLE;
5085         trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
5086         return ret;
5087 err:
5088         io_ring_ctx_wait_and_kill(ctx);
5089         return ret;
5090 }
5091
5092 /*
5093  * Sets up an aio uring context, and returns the fd. Applications asks for a
5094  * ring size, we return the actual sq/cq ring sizes (among other things) in the
5095  * params structure passed in.
5096  */
5097 static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
5098 {
5099         struct io_uring_params p;
5100         long ret;
5101         int i;
5102
5103         if (copy_from_user(&p, params, sizeof(p)))
5104                 return -EFAULT;
5105         for (i = 0; i < ARRAY_SIZE(p.resv); i++) {
5106                 if (p.resv[i])
5107                         return -EINVAL;
5108         }
5109
5110         if (p.flags & ~(IORING_SETUP_IOPOLL | IORING_SETUP_SQPOLL |
5111                         IORING_SETUP_SQ_AFF | IORING_SETUP_CQSIZE))
5112                 return -EINVAL;
5113
5114         ret = io_uring_create(entries, &p);
5115         if (ret < 0)
5116                 return ret;
5117
5118         if (copy_to_user(params, &p, sizeof(p)))
5119                 return -EFAULT;
5120
5121         return ret;
5122 }
5123
5124 SYSCALL_DEFINE2(io_uring_setup, u32, entries,
5125                 struct io_uring_params __user *, params)
5126 {
5127         return io_uring_setup(entries, params);
5128 }
5129
5130 static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
5131                                void __user *arg, unsigned nr_args)
5132         __releases(ctx->uring_lock)
5133         __acquires(ctx->uring_lock)
5134 {
5135         int ret;
5136
5137         /*
5138          * We're inside the ring mutex, if the ref is already dying, then
5139          * someone else killed the ctx or is already going through
5140          * io_uring_register().
5141          */
5142         if (percpu_ref_is_dying(&ctx->refs))
5143                 return -ENXIO;
5144
5145         percpu_ref_kill(&ctx->refs);
5146
5147         /*
5148          * Drop uring mutex before waiting for references to exit. If another
5149          * thread is currently inside io_uring_enter() it might need to grab
5150          * the uring_lock to make progress. If we hold it here across the drain
5151          * wait, then we can deadlock. It's safe to drop the mutex here, since
5152          * no new references will come in after we've killed the percpu ref.
5153          */
5154         mutex_unlock(&ctx->uring_lock);
5155         wait_for_completion(&ctx->completions[0]);
5156         mutex_lock(&ctx->uring_lock);
5157
5158         switch (opcode) {
5159         case IORING_REGISTER_BUFFERS:
5160                 ret = io_sqe_buffer_register(ctx, arg, nr_args);
5161                 break;
5162         case IORING_UNREGISTER_BUFFERS:
5163                 ret = -EINVAL;
5164                 if (arg || nr_args)
5165                         break;
5166                 ret = io_sqe_buffer_unregister(ctx);
5167                 break;
5168         case IORING_REGISTER_FILES:
5169                 ret = io_sqe_files_register(ctx, arg, nr_args);
5170                 break;
5171         case IORING_UNREGISTER_FILES:
5172                 ret = -EINVAL;
5173                 if (arg || nr_args)
5174                         break;
5175                 ret = io_sqe_files_unregister(ctx);
5176                 break;
5177         case IORING_REGISTER_FILES_UPDATE:
5178                 ret = io_sqe_files_update(ctx, arg, nr_args);
5179                 break;
5180         case IORING_REGISTER_EVENTFD:
5181                 ret = -EINVAL;
5182                 if (nr_args != 1)
5183                         break;
5184                 ret = io_eventfd_register(ctx, arg);
5185                 break;
5186         case IORING_UNREGISTER_EVENTFD:
5187                 ret = -EINVAL;
5188                 if (arg || nr_args)
5189                         break;
5190                 ret = io_eventfd_unregister(ctx);
5191                 break;
5192         default:
5193                 ret = -EINVAL;
5194                 break;
5195         }
5196
5197         /* bring the ctx back to life */
5198         reinit_completion(&ctx->completions[0]);
5199         percpu_ref_reinit(&ctx->refs);
5200         return ret;
5201 }
5202
5203 SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
5204                 void __user *, arg, unsigned int, nr_args)
5205 {
5206         struct io_ring_ctx *ctx;
5207         long ret = -EBADF;
5208         struct fd f;
5209
5210         f = fdget(fd);
5211         if (!f.file)
5212                 return -EBADF;
5213
5214         ret = -EOPNOTSUPP;
5215         if (f.file->f_op != &io_uring_fops)
5216                 goto out_fput;
5217
5218         ctx = f.file->private_data;
5219
5220         mutex_lock(&ctx->uring_lock);
5221         ret = __io_uring_register(ctx, opcode, arg, nr_args);
5222         mutex_unlock(&ctx->uring_lock);
5223         trace_io_uring_register(ctx, opcode, ctx->nr_user_files, ctx->nr_user_bufs,
5224                                                         ctx->cq_ev_fd != NULL, ret);
5225 out_fput:
5226         fdput(f);
5227         return ret;
5228 }
5229
5230 static int __init io_uring_init(void)
5231 {
5232         req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC);
5233         return 0;
5234 };
5235 __initcall(io_uring_init);