From: Linus Torvalds Date: Sat, 7 Dec 2019 20:14:28 +0000 (-0800) Subject: pipe: fix and clarify pipe write wakeup logic X-Git-Tag: v5.5-rc1~18 X-Git-Url: https://www.infradead.org/git/?a=commitdiff_plain;h=1b6b26ae7053;p=users%2Fhch%2Fdma-mapping.git pipe: fix and clarify pipe write wakeup logic The pipe rework ends up having been extra painful, partly becaused of actual bugs with ordering and caching of the pipe state, but also because of subtle performance issues. In particular, the pipe rework caused the kernel build to inexplicably slow down. The reason turns out to be that the GNU make jobserver (which limits the parallelism of the build) uses a pipe to implement a "token" system: a parallel submake will read a character from the pipe to get the job token before starting a new job, and will write a character back to the pipe when it is done. The overall job limit is thus easily controlled by just writing the appropriate number of initial token characters into the pipe. But to work well, that really means that the old behavior of write wakeups being synchronous (WF_SYNC) is very important - when the pipe writer wakes up a reader, we want the reader to actually get scheduled immediately. Otherwise you lose the parallelism of the build. The pipe rework lost that synchronous wakeup on write, and we had clearly all forgotten the reasons and rules for it. This rewrites the pipe write wakeup logic to do the required Wsync wakeups, but also clarifies the logic and avoids extraneous wakeups. It also ends up addign a number of comments about what oit does and why, so that we hopefully don't end up forgetting about this next time we change this code. Cc: David Howells Signed-off-by: Linus Torvalds --- diff --git a/fs/pipe.c b/fs/pipe.c index 3e8b11e3b764..efe93384c61c 100644 --- a/fs/pipe.c +++ b/fs/pipe.c @@ -391,9 +391,9 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) struct pipe_inode_info *pipe = filp->private_data; unsigned int head; ssize_t ret = 0; - int do_wakeup = 0; size_t total_len = iov_iter_count(from); ssize_t chars; + bool was_empty = false; /* Null write succeeds. */ if (unlikely(total_len == 0)) @@ -407,11 +407,21 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) goto out; } + /* + * Only wake up if the pipe started out empty, since + * otherwise there should be no readers waiting. + * + * If it wasn't empty we try to merge new data into + * the last buffer. + * + * That naturally merges small writes, but it also + * page-aligs the rest of the writes for large writes + * spanning multiple pages. + */ head = pipe->head; - - /* We try to merge small writes */ - chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */ - if (!pipe_empty(head, pipe->tail) && chars != 0) { + was_empty = pipe_empty(head, pipe->tail); + chars = total_len & (PAGE_SIZE-1); + if (chars && !was_empty) { unsigned int mask = pipe->ring_size - 1; struct pipe_buffer *buf = &pipe->bufs[(head - 1) & mask]; int offset = buf->offset + buf->len; @@ -426,7 +436,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) ret = -EFAULT; goto out; } - do_wakeup = 1; + buf->len += ret; if (!iov_iter_count(from)) goto out; @@ -471,17 +481,7 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) } pipe->head = head + 1; - - /* Always wake up, even if the copy fails. Otherwise - * we lock up (O_NONBLOCK-)readers that sleep due to - * syscall merging. - * FIXME! Is this really true? - */ - wake_up_locked_poll( - &pipe->wait, EPOLLIN | EPOLLRDNORM); - spin_unlock_irq(&pipe->wait.lock); - kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); /* Insert it into the buffer array */ buf = &pipe->bufs[head & mask]; @@ -524,14 +524,37 @@ pipe_write(struct kiocb *iocb, struct iov_iter *from) ret = -ERESTARTSYS; break; } + + /* + * We're going to release the pipe lock and wait for more + * space. We wake up any readers if necessary, and then + * after waiting we need to re-check whether the pipe + * become empty while we dropped the lock. + */ + if (was_empty) { + wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM); + kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); + } pipe->waiting_writers++; pipe_wait(pipe); pipe->waiting_writers--; + + was_empty = pipe_empty(head, pipe->tail); } out: __pipe_unlock(pipe); - if (do_wakeup) { - wake_up_interruptible_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM); + + /* + * If we do do a wakeup event, we do a 'sync' wakeup, because we + * want the reader to start processing things asap, rather than + * leave the data pending. + * + * This is particularly important for small writes, because of + * how (for example) the GNU make jobserver uses small writes to + * wake up pending jobs + */ + if (was_empty) { + wake_up_interruptible_sync_poll(&pipe->wait, EPOLLIN | EPOLLRDNORM); kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN); } if (ret > 0 && sb_start_write_trylock(file_inode(filp)->i_sb)) {