#include "internal.h"
-#define IORING_MAX_ENTRIES 4096
+#define IORING_MAX_ENTRIES 32768
#define IORING_MAX_FIXED_FILES 1024
struct io_uring {
struct list_head list;
struct file *file;
- off_t io_end;
+ off_t io_start;
size_t io_len;
};
} ____cacheline_aligned_in_smp;
/* IO offload */
- struct workqueue_struct *sqo_wq;
+ struct workqueue_struct *sqo_wq[2];
struct task_struct *sqo_thread; /* if using sq thread polling */
struct mm_struct *sqo_mm;
wait_queue_head_t sqo_wait;
}
}
+static inline void io_queue_async_work(struct io_ring_ctx *ctx,
+ struct io_kiocb *req)
+{
+ int rw;
+
+ switch (req->submit.sqe->opcode) {
+ case IORING_OP_WRITEV:
+ case IORING_OP_WRITE_FIXED:
+ rw = !(req->rw.ki_flags & IOCB_DIRECT);
+ break;
+ default:
+ rw = 0;
+ break;
+ }
+
+ queue_work(ctx->sqo_wq[rw], &req->work);
+}
+
static void io_commit_cqring(struct io_ring_ctx *ctx)
{
struct io_kiocb *req;
continue;
}
req->flags |= REQ_F_IO_DRAINED;
- queue_work(ctx->sqo_wq, &req->work);
+ io_queue_async_work(ctx, req);
}
}
nxt->flags |= REQ_F_LINK_DONE;
INIT_WORK(&nxt->work, io_sq_wq_submit_work);
- queue_work(req->ctx->sqo_wq, &nxt->work);
+ io_queue_async_work(req->ctx, nxt);
}
}
return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
}
+static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb)
+{
+ if (al->file == kiocb->ki_filp) {
+ off_t start, end;
+
+ /*
+ * Allow merging if we're anywhere in the range of the same
+ * page. Generally this happens for sub-page reads or writes,
+ * and it's beneficial to allow the first worker to bring the
+ * page in and the piggy backed work can then work on the
+ * cached page.
+ */
+ start = al->io_start & PAGE_MASK;
+ end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK;
+ if (kiocb->ki_pos >= start && kiocb->ki_pos <= end)
+ return true;
+ }
+
+ al->file = NULL;
+ return false;
+}
+
/*
* Make a note of the last file/offset/direction we punted to async
* context. We'll use this information to see if we can piggy back a
struct async_list *async_list = &req->ctx->pending_async[rw];
struct kiocb *kiocb = &req->rw;
struct file *filp = kiocb->ki_filp;
- off_t io_end = kiocb->ki_pos + len;
- if (filp == async_list->file && kiocb->ki_pos == async_list->io_end) {
+ if (io_should_merge(async_list, kiocb)) {
unsigned long max_bytes;
/* Use 8x RA size as a decent limiter for both reads/writes */
req->flags |= REQ_F_SEQ_PREV;
async_list->io_len += len;
} else {
- io_end = 0;
- async_list->io_len = 0;
+ async_list->file = NULL;
}
}
/* New file? Reset state. */
if (async_list->file != filp) {
- async_list->io_len = 0;
+ async_list->io_start = kiocb->ki_pos;
+ async_list->io_len = len;
async_list->file = filp;
}
- async_list->io_end = io_end;
}
static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
WRITE_ONCE(poll->canceled, true);
if (!list_empty(&poll->wait.entry)) {
list_del_init(&poll->wait.entry);
- queue_work(req->ctx->sqo_wq, &req->work);
+ io_queue_async_work(req->ctx, req);
}
spin_unlock(&poll->head->lock);
io_cqring_ev_posted(ctx);
io_put_req(req);
} else {
- queue_work(ctx->sqo_wq, &req->work);
+ io_queue_async_work(ctx, req);
}
return 1;
*/
static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
{
- bool ret = false;
+ bool ret;
if (!list)
return false;
}
static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
- struct sqe_submit *s)
+ struct sqe_submit *s, bool force_nonblock)
{
int ret;
- ret = __io_submit_sqe(ctx, req, s, true);
+ ret = __io_submit_sqe(ctx, req, s, force_nonblock);
if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
struct io_uring_sqe *sqe_copy;
if (list)
atomic_inc(&list->cnt);
INIT_WORK(&req->work, io_sq_wq_submit_work);
- queue_work(ctx->sqo_wq, &req->work);
+ io_queue_async_work(ctx, req);
}
/*
}
static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
- struct sqe_submit *s)
+ struct sqe_submit *s, bool force_nonblock)
{
int ret;
return 0;
}
- return __io_queue_sqe(ctx, req, s);
+ return __io_queue_sqe(ctx, req, s, force_nonblock);
}
static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
- struct sqe_submit *s, struct io_kiocb *shadow)
+ struct sqe_submit *s, struct io_kiocb *shadow,
+ bool force_nonblock)
{
int ret;
int need_submit = false;
if (!shadow)
- return io_queue_sqe(ctx, req, s);
+ return io_queue_sqe(ctx, req, s, force_nonblock);
/*
* Mark the first IO in link list as DRAIN, let all the following
spin_unlock_irq(&ctx->completion_lock);
if (need_submit)
- return __io_queue_sqe(ctx, req, s);
+ return __io_queue_sqe(ctx, req, s, force_nonblock);
return 0;
}
#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
- struct io_submit_state *state, struct io_kiocb **link)
+ struct io_submit_state *state, struct io_kiocb **link,
+ bool force_nonblock)
{
struct io_uring_sqe *sqe_copy;
struct io_kiocb *req;
INIT_LIST_HEAD(&req->link_list);
*link = req;
} else {
- io_queue_sqe(ctx, req, s);
+ io_queue_sqe(ctx, req, s, force_nonblock);
}
}
* that's the end of the chain. Submit the previous link.
*/
if (!prev_was_link && link) {
- io_queue_link_head(ctx, link, &link->submit, shadow_req);
+ io_queue_link_head(ctx, link, &link->submit, shadow_req,
+ true);
link = NULL;
}
prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
sqes[i].has_user = has_user;
sqes[i].needs_lock = true;
sqes[i].needs_fixed_file = true;
- io_submit_sqe(ctx, &sqes[i], statep, &link);
+ io_submit_sqe(ctx, &sqes[i], statep, &link, true);
submitted++;
}
}
if (link)
- io_queue_link_head(ctx, link, &link->submit, shadow_req);
+ io_queue_link_head(ctx, link, &link->submit, shadow_req, true);
if (statep)
io_submit_state_end(&state);
return 0;
}
-static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
+static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
+ bool block_for_last)
{
struct io_submit_state state, *statep = NULL;
struct io_kiocb *link = NULL;
}
for (i = 0; i < to_submit; i++) {
+ bool force_nonblock = true;
struct sqe_submit s;
if (!io_get_sqring(ctx, &s))
* that's the end of the chain. Submit the previous link.
*/
if (!prev_was_link && link) {
- io_queue_link_head(ctx, link, &link->submit, shadow_req);
+ io_queue_link_head(ctx, link, &link->submit, shadow_req,
+ force_nonblock);
link = NULL;
}
prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
s.needs_lock = false;
s.needs_fixed_file = false;
submit++;
- io_submit_sqe(ctx, &s, statep, &link);
+
+ /*
+ * The caller will block for events after submit, submit the
+ * last IO non-blocking. This is either the only IO it's
+ * submitting, or it already submitted the previous ones. This
+ * improves performance by avoiding an async punt that we don't
+ * need to do.
+ */
+ if (block_for_last && submit == to_submit)
+ force_nonblock = false;
+
+ io_submit_sqe(ctx, &s, statep, &link, force_nonblock);
}
io_commit_sqring(ctx);
if (link)
- io_queue_link_head(ctx, link, &link->submit, shadow_req);
+ io_queue_link_head(ctx, link, &link->submit, shadow_req,
+ block_for_last);
if (statep)
io_submit_state_end(statep);
static void io_finish_async(struct io_ring_ctx *ctx)
{
+ int i;
+
io_sq_thread_stop(ctx);
- if (ctx->sqo_wq) {
- destroy_workqueue(ctx->sqo_wq);
- ctx->sqo_wq = NULL;
+ for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) {
+ if (ctx->sqo_wq[i]) {
+ destroy_workqueue(ctx->sqo_wq[i]);
+ ctx->sqo_wq[i] = NULL;
+ }
}
}
}
/* Do QD, or 2 * CPUS, whatever is smallest */
- ctx->sqo_wq = alloc_workqueue("io_ring-wq", WQ_UNBOUND | WQ_FREEZABLE,
+ ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
+ WQ_UNBOUND | WQ_FREEZABLE,
min(ctx->sq_entries - 1, 2 * num_online_cpus()));
- if (!ctx->sqo_wq) {
+ if (!ctx->sqo_wq[0]) {
+ ret = -ENOMEM;
+ goto err;
+ }
+
+ /*
+ * This is for buffered writes, where we want to limit the parallelism
+ * due to file locking in file systems. As "normal" buffered writes
+ * should parellelize on writeout quite nicely, limit us to having 2
+ * pending. This avoids massive contention on the inode when doing
+ * buffered async writes.
+ */
+ ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
+ WQ_UNBOUND | WQ_FREEZABLE, 2);
+ if (!ctx->sqo_wq[1]) {
ret = -ENOMEM;
goto err;
}
return 0;
err:
- io_sq_thread_stop(ctx);
+ io_finish_async(ctx);
mmdrop(ctx->sqo_mm);
ctx->sqo_mm = NULL;
return ret;
* Just return the requested submit count, and wake the thread if
* we were asked to.
*/
+ ret = 0;
if (ctx->flags & IORING_SETUP_SQPOLL) {
if (flags & IORING_ENTER_SQ_WAKEUP)
wake_up(&ctx->sqo_wait);
submitted = to_submit;
- goto out_ctx;
- }
+ } else if (to_submit) {
+ bool block_for_last = false;
- ret = 0;
- if (to_submit) {
to_submit = min(to_submit, ctx->sq_entries);
+ /*
+ * Allow last submission to block in a series, IFF the caller
+ * asked to wait for events and we don't currently have
+ * enough. This potentially avoids an async punt.
+ */
+ if (to_submit == min_complete &&
+ io_cqring_events(ctx->rings) < min_complete)
+ block_for_last = true;
+
mutex_lock(&ctx->uring_lock);
- submitted = io_ring_submit(ctx, to_submit);
+ submitted = io_ring_submit(ctx, to_submit, block_for_last);
mutex_unlock(&ctx->uring_lock);
}
if (flags & IORING_ENTER_GETEVENTS) {
}
}
-out_ctx:
io_ring_drop_ctx_refs(ctx, 1);
out_fput:
fdput(f);