Skip to content
Snippets Groups Projects
Commit 2a6a0391 authored by Nadav Har'El's avatar Nadav Har'El
Browse files

Atomic writes, and long writes, to pipes.

This patch fixes two behaviors of pipes and unix-domain stream socketpair,
which went against Posix and Linux standards

1. A blocking write() on a pipe needs to return only when the full write -
   is finished. It should not just write until the end of the pipe buffer
   and return - as we did in the previous code.

   This means that a long write() to a pipe can write the data in parts,
   waiting between them for a reader to read from the pipe.

2. As explained above, writes will be split into parts (and if there are
   multiple writers, get mixed with writes from other writers). But Posix
   also guarantees that short writes - up to 4096 bytes (PIPE_BUF==4096
   on Linux) - are *atomic*, and not be split up.
   In the previous code, if even 1 byte was available on the buffer,
   we wrote it. Now, if the write is short, we need to wait until the
   entire needed length is available.
parent f4ba833c
No related branches found
No related tags found
No related merge requests found
......@@ -153,30 +153,47 @@ int af_local_buffer::write(uio* data)
}
int err = 0;
with_lock(mtx, [&] {
int r;
// FIXME: In Posix, pipe write()s smaller than PIPE_BUF (=4096) are
// guaranteed to be atomic, i.e., if there's no place for the whole
// write in the buffer, we need to wait until there is - not just
// until there is place for at least one byte.
// FIXME: Should support also non-blocking operation (O_NONBLOCK).
while ((r = write_events_unlocked()) == 0) {
// A write() smaller than PIPE_BUF (=4096 in Linux) will not be split
// (i.e., will be "atomic"): For such a small write, we need to wait
// until there's enough room for all it in the buffer.
int needroom = data->uio_resid <= 4096 ? data->uio_resid : 1;
while (receiver && q.size() + needroom > max_buf) {
may_write.wait(&mtx);
}
if (!(r & POLLOUT)) {
assert(r & POLLHUP);
if (!receiver) {
// FIXME: If we don't generate a SIGPIPE here, at least assert
// that SIGPIPE is SIG_IGN (which can be our default); If the
// user installed
err = EPIPE;
return;
}
auto iov = data->uio_iov;
// FIXME: this loop does NOT correctly iterate the iovec.
while (data->uio_resid && (write_events_unlocked() & POLLOUT)) {
auto n = std::min(max_buf - q.size(), iov->iov_len);
char* p = static_cast<char*>(iov->iov_base);
std::copy(p, p + n, std::back_inserter(q));
data->uio_resid -= n;
// A blocking write() to a pipe never returns with partial success -
// it waits, possibly writing its output in parts and waiting multiple
// times, until the whole given buffer is written.
int iovoffset = 0;
while (data->uio_resid && receiver) {
// FIXME: this loop does NOT correctly iterate the iovec.
auto iov = data->uio_iov;
while (data->uio_resid && q.size() < max_buf) {
auto n = std::min(max_buf - q.size(), iov->iov_len-iovoffset);
char* p = static_cast<char*>(iov->iov_base) + iovoffset;
std::copy(p, p + n, std::back_inserter(q));
data->uio_resid -= n;
iovoffset += n;
}
if (data->uio_resid) {
// The buffer is full but we still have more to send. Wake up
// readers, and go to sleep ourselves.
assert(q.size() == max_buf);
poll_wake(receiver, (POLLIN | POLLRDNORM));
may_read.wake_all();
while (receiver && q.size() == max_buf) {
may_write.wait(&mtx);
}
}
}
if (read_events_unlocked() & POLLIN)
poll_wake(receiver, (POLLIN | POLLRDNORM));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment