diff --git a/libc/af_local.cc b/libc/af_local.cc index c4b531f397e18736d1b79c97f5704cd8ca72279e..055fd3b414e1f6664404363e2294cb0d98265332 100644 --- a/libc/af_local.cc +++ b/libc/af_local.cc @@ -153,30 +153,47 @@ int af_local_buffer::write(uio* data) } int err = 0; with_lock(mtx, [&] { - int r; - // FIXME: In Posix, pipe write()s smaller than PIPE_BUF (=4096) are - // guaranteed to be atomic, i.e., if there's no place for the whole - // write in the buffer, we need to wait until there is - not just - // until there is place for at least one byte. // FIXME: Should support also non-blocking operation (O_NONBLOCK). - while ((r = write_events_unlocked()) == 0) { + // A write() smaller than PIPE_BUF (=4096 in Linux) will not be split + // (i.e., will be "atomic"): For such a small write, we need to wait + // until there's enough room for all it in the buffer. + int needroom = data->uio_resid <= 4096 ? data->uio_resid : 1; + while (receiver && q.size() + needroom > max_buf) { may_write.wait(&mtx); } - if (!(r & POLLOUT)) { - assert(r & POLLHUP); + + if (!receiver) { // FIXME: If we don't generate a SIGPIPE here, at least assert // that SIGPIPE is SIG_IGN (which can be our default); If the // user installed err = EPIPE; return; } - auto iov = data->uio_iov; - // FIXME: this loop does NOT correctly iterate the iovec. - while (data->uio_resid && (write_events_unlocked() & POLLOUT)) { - auto n = std::min(max_buf - q.size(), iov->iov_len); - char* p = static_cast<char*>(iov->iov_base); - std::copy(p, p + n, std::back_inserter(q)); - data->uio_resid -= n; + + // A blocking write() to a pipe never returns with partial success - + // it waits, possibly writing its output in parts and waiting multiple + // times, until the whole given buffer is written. + int iovoffset = 0; + while (data->uio_resid && receiver) { + // FIXME: this loop does NOT correctly iterate the iovec. + auto iov = data->uio_iov; + while (data->uio_resid && q.size() < max_buf) { + auto n = std::min(max_buf - q.size(), iov->iov_len-iovoffset); + char* p = static_cast<char*>(iov->iov_base) + iovoffset; + std::copy(p, p + n, std::back_inserter(q)); + data->uio_resid -= n; + iovoffset += n; + } + if (data->uio_resid) { + // The buffer is full but we still have more to send. Wake up + // readers, and go to sleep ourselves. + assert(q.size() == max_buf); + poll_wake(receiver, (POLLIN | POLLRDNORM)); + may_read.wake_all(); + while (receiver && q.size() == max_buf) { + may_write.wait(&mtx); + } + } } if (read_events_unlocked() & POLLIN) poll_wake(receiver, (POLLIN | POLLRDNORM));