From cf5a1bc4f13ea3f066d2b1cf34ab68feb1a744b9 Mon Sep 17 00:00:00 2001
From: Carl Lerche <me@carllerche.com>
Date: Fri, 17 Feb 2017 22:54:44 -0800
Subject: [PATCH] Rewrite `Bytes` / `BytesMut` core implementation

The previous implementation didn't account for a single `Bytes` handle being
stored in an `Arc`. This new implementation correctly implements both `Bytes`
and `BytesMut` such that both are `Sync`.

The rewrite also increases the number of bytes that can be stored inline.
---
 .travis.yml            |   62 ++-
 Cargo.toml             |    3 +
 benches/vs_easy_buf.rs |  210 ++++
 ci/before_deploy.ps1   |   23 +
 ci/before_deploy.sh    |   33 ++
 ci/install.sh          |   31 ++
 ci/script.sh           |   18 +
 src/bytes.rs           | 1120 +++++++++++++++++++++++++---------------
 tests/test_bytes.rs    |  134 +++--
 9 files changed, 1159 insertions(+), 475 deletions(-)
 create mode 100644 benches/vs_easy_buf.rs
 create mode 100644 ci/before_deploy.ps1
 create mode 100644 ci/before_deploy.sh
 create mode 100644 ci/install.sh
 create mode 100644 ci/script.sh

diff --git a/.travis.yml b/.travis.yml
index 761a55f..66a92ea 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,30 +1,56 @@
 ---
+dist: trusty
 language: rust
-rust:
-  - nightly
-  - stable
-  # Oldest supported Rust version. Do not change this without a Github issue
-  # discussion.
-  - 1.10.0
+services: docker
+sudo: required
+rust: stable
+
+env:
+  global:
+    - CRATE_NAME=bytes
+    # Default job
+    - TARGET=x86_64-unknown-linux-gnu
+    - secure: "f17G5kb6uAQlAG9+GknFFYAmngGBqy9h+3FtNbp3mXTI0FOLltz00Ul5kGPysE4eagypm/dWOuvBkNjN01jhE6fCbekmInEsobIuanatrk6TvXT6caJqykxhPJC2cUoq8pKnMqEOuucEqPPUH6Qy6Hz4/2cRu5JV22Uv9dtS29Q="
 
 matrix:
-  allow_failures:
-    - rust: nightly
+  include:
+    # Run build on oldest supported rust version. Do not change the rust
+    # version without a Github issue first.
+    #
+    # This job will also build and deploy the docs to gh-pages.
+ - env: TARGET=x86_64-unknown-linux-gnu + rust: 1.10.0 + after_success: + - | + pip install 'travis-cargo<0.2' --user && + export PATH=$HOME/.local/bin:$PATH + - travis-cargo --only stable doc + - travis-cargo --only stable doc-upload + + # Run tests on some extra platforms + - env: TARGET=i686-unknown-linux-gnu + - env: TARGET=armv7-unknown-linux-gnueabihf + - env: TARGET=powerpc-unknown-linux-gnu + - env: TARGET=powerpc64-unknown-linux-gnu -before_script: - - pip install 'travis-cargo<0.2' --user && export PATH=$HOME/.local/bin:$PATH +before_install: set -e + +install: + - sh ci/install.sh + - source ~/.cargo/env || true script: - - cargo build - - RUST_BACKTRACE=1 cargo test - - cargo doc --no-deps + - bash ci/script.sh -after_success: - - travis-cargo --only stable doc-upload +after_script: set +e -env: - global: - secure: "f17G5kb6uAQlAG9+GknFFYAmngGBqy9h+3FtNbp3mXTI0FOLltz00Ul5kGPysE4eagypm/dWOuvBkNjN01jhE6fCbekmInEsobIuanatrk6TvXT6caJqykxhPJC2cUoq8pKnMqEOuucEqPPUH6Qy6Hz4/2cRu5JV22Uv9dtS29Q=" +before_deploy: + - sh ci/before_deploy.sh + +cache: cargo +before_cache: + # Travis can't cache files that are not readable by "others" + - chmod -R a+r $HOME/.cargo notifications: email: diff --git a/Cargo.toml b/Cargo.toml index 402387e..b444cdf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,6 @@ exclude = [ [dependencies] byteorder = "1.0.0" + +[dev-dependencies] +tokio-core = "0.1.0" diff --git a/benches/vs_easy_buf.rs b/benches/vs_easy_buf.rs new file mode 100644 index 0000000..063218a --- /dev/null +++ b/benches/vs_easy_buf.rs @@ -0,0 +1,210 @@ +#![feature(test)] + +extern crate tokio_core; +extern crate bytes; +extern crate test; + +mod bench_easy_buf { + use test::{self, Bencher}; + use tokio_core::io::EasyBuf; + + #[bench] + fn alloc_small(b: &mut Bencher) { + b.iter(|| { + for _ in 0..1024 { + test::black_box(EasyBuf::with_capacity(12)); + } + }) + } + + #[bench] + fn alloc_mid(b: &mut Bencher) { + b.iter(|| { + test::black_box(EasyBuf::with_capacity(128)); + }) + } + + #[bench] + fn alloc_big(b: &mut Bencher) { + b.iter(|| { + test::black_box(EasyBuf::with_capacity(4096)); + }) + } + + #[bench] + fn deref_front(b: &mut Bencher) { + let mut buf = EasyBuf::with_capacity(4096); + buf.get_mut().extend_from_slice(&[0; 1024][..]); + + b.iter(|| { + for _ in 0..1024 { + test::black_box(buf.as_slice()); + } + }) + } + + #[bench] + fn deref_mid(b: &mut Bencher) { + let mut buf = EasyBuf::with_capacity(4096); + buf.get_mut().extend_from_slice(&[0; 1024][..]); + let _a = buf.drain_to(512); + + b.iter(|| { + for _ in 0..1024 { + test::black_box(buf.as_slice()); + } + }) + } + + #[bench] + fn alloc_write_drain_to_mid(b: &mut Bencher) { + b.iter(|| { + let mut buf = EasyBuf::with_capacity(128); + buf.get_mut().extend_from_slice(&[0u8; 64]); + test::black_box(buf.drain_to(64)); + }) + } + + #[bench] + fn drain_write_drain(b: &mut Bencher) { + let data = [0u8; 128]; + + b.iter(|| { + let mut buf = EasyBuf::with_capacity(1024); + let mut parts = Vec::with_capacity(8); + + for _ in 0..8 { + buf.get_mut().extend_from_slice(&data[..]); + parts.push(buf.drain_to(128)); + } + + test::black_box(parts); + }) + } +} + +mod bench_bytes { + use test::{self, Bencher}; + use bytes::{BytesMut, BufMut}; + + #[bench] + fn alloc_small(b: &mut Bencher) { + b.iter(|| { + for _ in 0..1024 { + test::black_box(BytesMut::with_capacity(12)); + } + }) + } + + #[bench] + fn alloc_mid(b: &mut Bencher) { + b.iter(|| { + test::black_box(BytesMut::with_capacity(128)); + }) + } + + #[bench] + fn alloc_big(b: &mut Bencher) 
{ + b.iter(|| { + test::black_box(BytesMut::with_capacity(4096)); + }) + } + + #[bench] + fn deref_unique(b: &mut Bencher) { + let mut buf = BytesMut::with_capacity(4096); + buf.put(&[0u8; 1024][..]); + + b.iter(|| { + for _ in 0..1024 { + test::black_box(&buf[..]); + } + }) + } + + #[bench] + fn deref_unique_unroll(b: &mut Bencher) { + let mut buf = BytesMut::with_capacity(4096); + buf.put(&[0u8; 1024][..]); + + b.iter(|| { + for _ in 0..128 { + test::black_box(&buf[..]); + test::black_box(&buf[..]); + test::black_box(&buf[..]); + test::black_box(&buf[..]); + test::black_box(&buf[..]); + test::black_box(&buf[..]); + test::black_box(&buf[..]); + test::black_box(&buf[..]); + } + }) + } + + #[bench] + fn deref_shared(b: &mut Bencher) { + let mut buf = BytesMut::with_capacity(4096); + buf.put(&[0u8; 1024][..]); + let _b2 = buf.split_off(1024); + + b.iter(|| { + for _ in 0..1024 { + test::black_box(&buf[..]); + } + }) + } + + #[bench] + fn deref_inline(b: &mut Bencher) { + let mut buf = BytesMut::with_capacity(8); + buf.put(&[0u8; 8][..]); + + b.iter(|| { + for _ in 0..1024 { + test::black_box(&buf[..]); + } + }) + } + + #[bench] + fn deref_two(b: &mut Bencher) { + let mut buf1 = BytesMut::with_capacity(8); + buf1.put(&[0u8; 8][..]); + + let mut buf2 = BytesMut::with_capacity(4096); + buf2.put(&[0u8; 1024][..]); + + b.iter(|| { + for _ in 0..512 { + test::black_box(&buf1[..]); + test::black_box(&buf2[..]); + } + }) + } + + #[bench] + fn alloc_write_drain_to_mid(b: &mut Bencher) { + b.iter(|| { + let mut buf = BytesMut::with_capacity(128); + buf.put_slice(&[0u8; 64]); + test::black_box(buf.drain_to(64)); + }) + } + + #[bench] + fn drain_write_drain(b: &mut Bencher) { + let data = [0u8; 128]; + + b.iter(|| { + let mut buf = BytesMut::with_capacity(1024); + let mut parts = Vec::with_capacity(8); + + for _ in 0..8 { + buf.put(&data[..]); + parts.push(buf.drain_to(128)); + } + + test::black_box(parts); + }) + } +} diff --git a/ci/before_deploy.ps1 b/ci/before_deploy.ps1 new file mode 100644 index 0000000..191a30b --- /dev/null +++ b/ci/before_deploy.ps1 @@ -0,0 +1,23 @@ +# This script takes care of packaging the build artifacts that will go in the +# release zipfile + +$SRC_DIR = $PWD.Path +$STAGE = [System.Guid]::NewGuid().ToString() + +Set-Location $ENV:Temp +New-Item -Type Directory -Name $STAGE +Set-Location $STAGE + +$ZIP = "$SRC_DIR\$($Env:CRATE_NAME)-$($Env:APPVEYOR_REPO_TAG_NAME)-$($Env:TARGET).zip" + +# TODO Update this to package the right artifacts +Copy-Item "$SRC_DIR\target\$($Env:TARGET)\release\hello.exe" '.\' + +7z a "$ZIP" * + +Push-AppveyorArtifact "$ZIP" + +Remove-Item *.* -Force +Set-Location .. 
+Remove-Item $STAGE +Set-Location $SRC_DIR diff --git a/ci/before_deploy.sh b/ci/before_deploy.sh new file mode 100644 index 0000000..026dc28 --- /dev/null +++ b/ci/before_deploy.sh @@ -0,0 +1,33 @@ +# This script takes care of building your crate and packaging it for release + +set -ex + +main() { + local src=$(pwd) \ + stage= + + case $TRAVIS_OS_NAME in + linux) + stage=$(mktemp -d) + ;; + osx) + stage=$(mktemp -d -t tmp) + ;; + esac + + test -f Cargo.lock || cargo generate-lockfile + + # TODO Update this to build the artifacts that matter to you + cross rustc --bin hello --target $TARGET --release -- -C lto + + # TODO Update this to package the right artifacts + cp target/$TARGET/release/hello $stage/ + + cd $stage + tar czf $src/$CRATE_NAME-$TRAVIS_TAG-$TARGET.tar.gz * + cd $src + + rm -rf $stage +} + +main diff --git a/ci/install.sh b/ci/install.sh new file mode 100644 index 0000000..76bb734 --- /dev/null +++ b/ci/install.sh @@ -0,0 +1,31 @@ +set -ex + +main() { + curl https://sh.rustup.rs -sSf | \ + sh -s -- -y --default-toolchain $TRAVIS_RUST_VERSION + + local target= + if [ $TRAVIS_OS_NAME = linux ]; then + target=x86_64-unknown-linux-gnu + sort=sort + else + target=x86_64-apple-darwin + sort=gsort # for `sort --sort-version`, from brew's coreutils. + fi + + # This fetches latest stable release + local tag=$(git ls-remote --tags --refs --exit-code https://github.com/japaric/cross \ + | cut -d/ -f3 \ + | grep -E '^v[0-9.]+$' \ + | $sort --version-sort \ + | tail -n1) + echo cross version: $tag + curl -LSfs https://japaric.github.io/trust/install.sh | \ + sh -s -- \ + --force \ + --git japaric/cross \ + --tag $tag \ + --target $target +} + +main diff --git a/ci/script.sh b/ci/script.sh new file mode 100644 index 0000000..e61e155 --- /dev/null +++ b/ci/script.sh @@ -0,0 +1,18 @@ +# This script takes care of testing your crate + +set -ex + +main() { + cross build --target $TARGET + + if [ ! -z $DISABLE_TESTS ]; then + return + fi + + cross test --target $TARGET +} + +# we don't run the "test phase" when doing deploys +if [ -z $TRAVIS_TAG ]; then + main +fi diff --git a/src/bytes.rs b/src/bytes.rs index 5ff0a70..5d39295 100644 --- a/src/bytes.rs +++ b/src/bytes.rs @@ -1,10 +1,10 @@ use {IntoBuf, BufMut}; -use std::{cmp, fmt, mem, hash, ops, slice, ptr}; +use std::{cmp, fmt, mem, hash, ops, slice, ptr, usize}; use std::borrow::Borrow; -use std::cell::{Cell, UnsafeCell}; use std::io::Cursor; -use std::sync::Arc; +use std::sync::atomic::{self, AtomicUsize, AtomicPtr}; +use std::sync::atomic::Ordering::{Relaxed, Acquire, Release, AcqRel}; /// A reference counted contiguous slice of memory. /// @@ -20,7 +20,7 @@ use std::sync::Arc; /// ``` /// use bytes::Bytes; /// -/// let mem = Bytes::from(&b"Hello world"[..]); +/// let mut mem = Bytes::from(&b"Hello world"[..]); /// let a = mem.slice(0, 5); /// /// assert_eq!(&a[..], b"Hello"); @@ -96,10 +96,10 @@ use std::sync::Arc; /// the slice directly in the handle. In this case, a clone is no longer /// "shallow" and the data will be copied. /// -/// [1] Small enough: 24 bytes on 64 bit systems, 12 on 32 bit systems. +/// [1] Small enough: 31 bytes on 64 bit systems, 15 on 32 bit systems. /// pub struct Bytes { - inner: Inner, + inner: Inner2, } /// A unique reference to a continuous slice of memory. @@ -131,53 +131,215 @@ pub struct Bytes { /// assert_eq!(&b[..], b"hello"); /// ``` pub struct BytesMut { - inner: Inner + inner: Inner2, } +// Both `Bytes` and `BytesMut` are backed by `Inner` and functions are delegated +// to `Inner` functions. 
The `Bytes` and `BytesMut` shims ensure that functions
+// that mutate the underlying buffer are only performed when the data range
+// being mutated is only available via a single `BytesMut` handle.
+//
+// # Data storage modes
+//
+// The goal of `bytes` is to be as efficient as possible across a wide range of
+// potential usage patterns. As such, `bytes` needs to be able to handle buffers
+// that are never shared, shared on a single thread, and shared across many
+// threads. `bytes` also needs to handle both tiny buffers and very large
+// buffers. For example, [Cassandra](http://cassandra.apache.org) values have
+// been known to be in the hundreds of megabytes, and HTTP header values can be
+// a few characters in size.
+//
+// To achieve high performance in these various situations, `Bytes` and
+// `BytesMut` use different strategies for storing the buffer depending on the
+// usage pattern.
+//
+// ## Delayed `Arc` allocation
+//
+// When a `Bytes` or `BytesMut` is first created, there is only one outstanding
+// handle referencing the buffer. Since sharing is not yet required, an `Arc`*
+// is not used and the buffer is backed by a `Vec<u8>` directly. Using an
+// `Arc<Vec<u8>>` requires two allocations, so if the buffer ends up never being
+// shared, that allocation is avoided.
+//
+// When sharing does become necessary (`clone`, `drain_to`, `split_off`), that
+// is when the buffer is promoted to being shareable. The `Vec<u8>` is moved
+// into an `Arc` and both the original handle and the new handle use the same
+// buffer via the `Arc`.
+//
+// * `Arc` is being used to signify an atomically reference counted cell. We
+// don't use the `Arc` implementation provided by `std` and instead use our own.
+// This ends up simplifying a number of the `unsafe` code snippets.
+//
+// ## Inlining small buffers
+//
+// The `Bytes` / `BytesMut` structs require 4 pointer sized fields. On 64 bit
+// systems, this ends up being 32 bytes, which is actually a lot of storage for
+// cases where `Bytes` is being used to represent small byte strings, such as
+// HTTP header names and values.
+//
+// To avoid any allocation at all in these cases, `Bytes` will use the struct
+// itself for storing the buffer, reserving 1 byte for metadata. This means
+// that, on 64 bit systems, 31 byte buffers require no allocation at all.
+//
+// The byte used for metadata stores a 1 bit flag used to indicate that the
+// buffer is stored inline as well as 7 bits for tracking the buffer length (the
+// return value of `Bytes::len`).
+//
+// ## Static buffers
+//
+// `Bytes` can also represent a static buffer, which is created with
+// `Bytes::from_static`. No copying or allocations are required for tracking
+// static buffers. The pointer to the `&'static [u8]`, the length, and a flag
+// tracking that the `Bytes` instance represents a static buffer are stored in
+// the `Bytes` struct.
+//
+// # Struct layout
+//
+// Both `Bytes` and `BytesMut` are wrappers around `Inner`, which provides the
+// data fields as well as all of the function implementations.
+//
+// The `Inner` struct is carefully laid out in order to support the
+// functionality described above as well as being as small as possible. Size is
+// important as growing the size of the `Bytes` struct from 32 bytes to 40 bytes
+// added as much as 15% overhead in benchmarks using `Bytes` in an HTTP header
+// map structure.
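+//
+// As a rough illustration of the intended footprint (not part of the patch's
+// tests; assumes `std::mem` is in scope), the handle is expected to stay at
+// exactly four words:
+//
+// ```
+// // 32 bytes on 64 bit targets, 16 bytes on 32 bit targets.
+// assert_eq!(mem::size_of::<Bytes>(), 4 * mem::size_of::<usize>());
+// assert_eq!(mem::size_of::<BytesMut>(), 4 * mem::size_of::<usize>());
+// ```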
+//
+// The `Inner` struct contains the following fields:
+//
+// * `ptr: *mut u8`
+// * `len: usize`
+// * `cap: usize`
+// * `arc: AtomicPtr<Shared>`
+//
+// ## `ptr: *mut u8`
+//
+// A pointer to the start of the handle's buffer view. When backed by a
+// `Vec<u8>`, this is always the `Vec`'s pointer. When backed by an
+// `Arc<Vec<u8>>`, `ptr` may have been shifted to point somewhere inside the
+// buffer.
+//
+// When in "inlined" mode, `ptr` is used as part of the inlined buffer.
+//
+// ## `len: usize`
+//
+// The length of the handle's buffer view. When backed by a `Vec<u8>`, this is
+// always the `Vec`'s length. The slice represented by `ptr` and `len` should
+// (ideally) always be initialized memory.
+//
+// When in "inlined" mode, `len` is used as part of the inlined buffer.
+//
+// ## `cap: usize`
+//
+// The capacity of the handle's buffer view. When backed by a `Vec<u8>`, this is
+// always the `Vec`'s capacity. The slice represented by `ptr+len` and `cap-len`
+// may or may not be initialized memory.
+//
+// When in "inlined" mode, `cap` is used as part of the inlined buffer.
+//
+// ## `arc: AtomicPtr<Shared>`
+//
+// When `Inner` is in allocated mode (backed by Vec<u8> or Arc<Vec<u8>>), this
+// will be the pointer to the `Arc` structure tracking the ref count for the
+// underlying buffer. When the pointer is null, then the `Arc` has not been
+// allocated yet and `self` is the only outstanding handle for the underlying
+// buffer.
+//
+// The lower two bits of `arc` are used as flags to track the storage state of
+// `Inner`. `0b01` indicates inline storage and `0b10` indicates static storage.
+// Since pointers to allocated structures are aligned, the lower two bits of a
+// pointer will always be 0. This allows disambiguating between a pointer and
+// the two flags.
+//
+// When in "inlined" mode, the least significant byte of `arc` is also used to
+// store the length of the buffer view (vs. the capacity, which is a constant).
+//
+// The rest of `arc`'s bytes are used as part of the inline buffer, which means
+// that those bytes need to be located next to the `ptr`, `len`, and `cap`
+// fields, which make up the rest of the inline buffer. This requires special
+// casing the layout of `Inner` depending on whether the target platform is big
+// or little endian.
+//
+// On little endian platforms, the `arc` field must be the first field in the
+// struct. On big endian platforms, the `arc` field must be the last field in
+// the struct. Since a deterministic struct layout is required, `Inner` is
+// annotated with `#[repr(C)]`.
+//
+// # Thread safety
+//
+// `Bytes::clone()` returns a new `Bytes` handle with no copying. This is done
+// by bumping the buffer ref count and returning a new struct pointing to the
+// same buffer. However, the `Arc` structure is lazily allocated. This means
+// that if `Bytes` is stored itself in an `Arc` (`Arc<Bytes>`), the `clone`
+// function can be called concurrently from multiple threads. This is why an
+// `AtomicPtr` is used for the `arc` field vs. a `*const`.
+//
+// Care is taken to ensure that the need for synchronization is minimized. Most
+// operations do not require any synchronization.
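+//
+// As an illustrative summary (with `arc_word` standing in for the raw value of
+// the `arc` field, and using the flag constants defined below), the storage
+// state is decoded roughly like this:
+//
+// ```
+// if arc_word & KIND_INLINE != 0 {
+//     // Inline: the rest of the low byte holds the buffer length.
+// } else if arc_word == KIND_STATIC {
+//     // Static: the handle points at a `&'static [u8]`, nothing to free.
+// } else if arc_word == 0 {
+//     // Null: not yet shared, still backed by a plain `Vec<u8>`.
+// } else {
+//     // Otherwise it is a real `*mut Shared` (aligned, so low bits are 0).
+// }
+// ```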
+// +#[cfg(target_endian = "little")] +#[repr(C)] struct Inner { - data: UnsafeCell<Data>, - - // If this pointer is set, then the the BytesMut is backed by an Arc - arc: Cell<usize>, + arc: AtomicPtr<Shared>, + ptr: *mut u8, + len: usize, + cap: usize, } +#[cfg(target_endian = "big")] #[repr(C)] -#[derive(Eq, PartialEq, Clone, Copy)] -struct Data { - // Pointer to the start of the memory owned by this BytesMut +struct Inner { ptr: *mut u8, - - // Number of bytes that have been initialized len: usize, - - // Total number of bytes owned by this BytesMut cap: usize, + arc: AtomicPtr<Shared>, } -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -enum Kind { - Vec, - Arc, - Inline, - Static, +// This struct is only here to make older versions of Rust happy. In older +// versions of `Rust`, `repr(C)` structs could not have drop functions. While +// this is no longer the case for newer rust versions, a number of major Rust +// libraries still support older versions of Rust for which it is the case. To +// get around this, `Inner` (the actual struct) is wrapped by `Inner2` which has +// the drop fn implementation. +struct Inner2 { + inner: Inner, } -type Shared = Arc<UnsafeCell<Vec<u8>>>; +// Thread-safe reference-counted container for the shared storage. This mostly +// the same as `std::sync::Arc` but without the weak counter. The ref counting +// fns are based on the ones found in `std`. +// +// The main reason to use `Shared` instead of `std::sync::Arc` is that it ends +// up making the overall code simpler and easier to reason about. This is due to +// some of the logic around setting `Inner::arc` and other ways the `arc` field +// is used. Using `Arc` ended up requiring a number of funky transmutes and +// other shenanigans to make it work. +struct Shared { + vec: Vec<u8>, + ref_count: AtomicUsize, +} +// Buffer storage strategy flags. +const KIND_INLINE: usize = 0b01; +const KIND_STATIC: usize = 0b10; + +// Bit op constants for extracting the inline length value from the `arc` field. +const INLINE_LEN_MASK: usize = 0b11111110; +const INLINE_LEN_OFFSET: usize = 1; + +// Byte offset from the start of `Inner` to where the inline buffer data +// starts. On little endian platforms, the first byte of the struct is the +// storage flag, so the data is shifted by a byte. On big endian systems, the +// data starts at the beginning of the struct. +#[cfg(target_endian = "little")] +const INLINE_DATA_OFFSET: isize = 1; +#[cfg(target_endian = "big")] +const INLINE_DATA_OFFSET: isize = 0; + +// Inline buffer capacity. This is the size of `Inner` minus 1 byte for the +// metadata. 
#[cfg(target_pointer_width = "64")] -const INLINE_CAP: usize = 8 * 3; - +const INLINE_CAP: usize = 4 * 8 - 1; #[cfg(target_pointer_width = "32")] -const INNER_CAP: usize = 4 * 3; - -const KIND_MASK: usize = 3; -const KIND_INLINE: usize = 1; -const KIND_STATIC: usize = 2; - -const INLINE_START_OFFSET: usize = 16; -const INLINE_START_MASK: usize = 0xff << INLINE_START_OFFSET; -const INLINE_LEN_OFFSET: usize = 8; -const INLINE_LEN_MASK: usize = 0xff << INLINE_LEN_OFFSET; +const INLINE_CAP: usize = 4 * 4 - 1; /* * @@ -201,13 +363,13 @@ impl Bytes { #[inline] pub fn new() -> Bytes { Bytes { - inner: Inner { - data: UnsafeCell::new(Data { + inner: Inner2 { + inner: Inner { + arc: AtomicPtr::new(ptr::null_mut()), ptr: ptr::null_mut(), len: 0, cap: 0, - }), - arc: Cell::new(0), + } } } } @@ -227,14 +389,19 @@ impl Bytes { /// ``` #[inline] pub fn from_static(bytes: &'static [u8]) -> Bytes { + let ptr = bytes.as_ptr() as *mut u8; + Bytes { - inner: Inner { - data: UnsafeCell::new(Data { - ptr: bytes.as_ptr() as *mut u8, + inner: Inner2 { + inner: Inner { + // `arc` won't ever store a pointer. Instead, use it to + // track the fact that the `Bytes` handle is backed by a + // static buffer. + arc: AtomicPtr::new(KIND_STATIC as *mut Shared), + ptr: ptr, len: bytes.len(), cap: bytes.len(), - }), - arc: Cell::new(KIND_STATIC), + } } } } @@ -290,7 +457,7 @@ impl Bytes { /// Requires that `begin <= end` and `end <= self.len()`, otherwise slicing /// will panic. pub fn slice(&self, begin: usize, end: usize) -> Bytes { - let ret = self.clone(); + let mut ret = self.clone(); unsafe { ret.inner.set_end(end); @@ -364,7 +531,7 @@ impl Bytes { /// ``` /// use bytes::Bytes; /// - /// let a = Bytes::from(&b"hello world"[..]); + /// let mut a = Bytes::from(&b"hello world"[..]); /// let b = a.split_off(5); /// /// assert_eq!(&a[..], b"hello"); @@ -374,8 +541,12 @@ impl Bytes { /// # Panics /// /// Panics if `at > len` - pub fn split_off(&self, at: usize) -> Bytes { - Bytes { inner: self.inner.split_off(at) } + pub fn split_off(&mut self, at: usize) -> Bytes { + Bytes { + inner: Inner2 { + inner: self.inner.split_off(at), + } + } } /// Splits the bytes into two at the given index. @@ -391,7 +562,7 @@ impl Bytes { /// ``` /// use bytes::Bytes; /// - /// let a = Bytes::from(&b"hello world"[..]); + /// let mut a = Bytes::from(&b"hello world"[..]); /// let b = a.drain_to(5); /// /// assert_eq!(&a[..], b" world"); @@ -401,8 +572,12 @@ impl Bytes { /// # Panics /// /// Panics if `at > len` - pub fn drain_to(&self, at: usize) -> Bytes { - Bytes { inner: self.inner.drain_to(at) } + pub fn drain_to(&mut self, at: usize) -> Bytes { + Bytes { + inner: Inner2 { + inner: self.inner.drain_to(at), + } + } } /// Attempt to convert into a `BytesMut` handle. @@ -460,7 +635,11 @@ impl<'a> IntoBuf for &'a Bytes { impl Clone for Bytes { fn clone(&self) -> Bytes { - Bytes { inner: self.inner.shallow_clone() } + Bytes { + inner: Inner2 { + inner: self.inner.shallow_clone(), + } + } } } @@ -536,8 +715,6 @@ impl Borrow<[u8]> for Bytes { } } -unsafe impl Sync for Bytes {} - /* * * ===== BytesMut ===== @@ -571,14 +748,15 @@ impl BytesMut { #[inline] pub fn with_capacity(capacity: usize) -> BytesMut { if capacity <= INLINE_CAP { - BytesMut { - inner: Inner { - data: UnsafeCell::new(Data { - ptr: ptr::null_mut(), - len: 0, - cap: 0, - }), - arc: Cell::new(KIND_INLINE), + unsafe { + // Using uninitialized memory is ~30% faster + BytesMut { + inner: Inner2 { + inner: Inner { + arc: AtomicPtr::new(KIND_INLINE as *mut Shared), + .. 
mem::uninitialized() + }, + }, } } } else { @@ -660,35 +838,6 @@ impl BytesMut { Bytes { inner: self.inner } } - /// Splits the bytes into two at the given index. - /// - /// Afterwards `self` contains elements `[0, at)`, and the returned - /// `BytesMut` contains elements `[at, capacity)`. - /// - /// This is an O(1) operation that just increases the reference count and - /// sets a few indexes. - /// - /// # Examples - /// - /// ``` - /// use bytes::BytesMut; - /// - /// let mut a = BytesMut::from(&b"hello world"[..]); - /// let b = a.split_off(5); - /// - /// a[0] = b'j'; - /// - /// assert_eq!(&a[..], b"jello"); - /// assert_eq!(&b[..], b" world"); - /// ``` - /// - /// # Panics - /// - /// Panics if `at > capacity` - pub fn split_off(&self, at: usize) -> Bytes { - Bytes { inner: self.inner.split_off(at) } - } - /// Splits the bytes into two at the given index. /// /// Afterwards `self` contains elements `[0, at)`, and the returned @@ -703,7 +852,7 @@ impl BytesMut { /// use bytes::BytesMut; /// /// let mut a = BytesMut::from(&b"hello world"[..]); - /// let mut b = a.split_off_mut(5); + /// let mut b = a.split_off(5); /// /// a[0] = b'j'; /// b[0] = b'!'; @@ -715,37 +864,12 @@ impl BytesMut { /// # Panics /// /// Panics if `at > capacity` - pub fn split_off_mut(&mut self, at: usize) -> BytesMut { - BytesMut { inner: self.inner.split_off(at) } - } - - /// Splits the buffer into two at the given index. - /// - /// Afterwards `self` contains elements `[at, len)`, and the returned `Bytes` - /// contains elements `[0, at)`. - /// - /// This is an O(1) operation that just increases the reference count and - /// sets a few indexes. - /// - /// # Examples - /// - /// ``` - /// use bytes::BytesMut; - /// - /// let mut a = BytesMut::from(&b"hello world"[..]); - /// let b = a.drain_to(5); - /// - /// a[0] = b'!'; - /// - /// assert_eq!(&a[..], b"!world"); - /// assert_eq!(&b[..], b"hello"); - /// ``` - /// - /// # Panics - /// - /// Panics if `at > len` - pub fn drain_to(&self, at: usize) -> Bytes { - Bytes { inner: self.inner.drain_to(at) } + pub fn split_off(&mut self, at: usize) -> BytesMut { + BytesMut { + inner: Inner2 { + inner: self.inner.split_off(at), + } + } } /// Splits the buffer into two at the given index. 
@@ -762,7 +886,7 @@ impl BytesMut { /// use bytes::BytesMut; /// /// let mut a = BytesMut::from(&b"hello world"[..]); - /// let mut b = a.drain_to_mut(5); + /// let mut b = a.drain_to(5); /// /// a[0] = b'!'; /// b[0] = b'j'; @@ -774,8 +898,12 @@ impl BytesMut { /// # Panics /// /// Panics if `at > len` - pub fn drain_to_mut(&mut self, at: usize) -> BytesMut { - BytesMut { inner: self.inner.drain_to(at) } + pub fn drain_to(&mut self, at: usize) -> BytesMut { + BytesMut { + inner: Inner2 { + inner: self.inner.drain_to(at), + } + } } /// Sets the length of the buffer @@ -946,13 +1074,13 @@ impl From<Vec<u8>> for BytesMut { mem::forget(src); BytesMut { - inner: Inner { - data: UnsafeCell::new(Data { + inner: Inner2 { + inner: Inner { + arc: AtomicPtr::new(ptr::null_mut()), ptr: ptr, len: len, cap: cap, - }), - arc: Cell::new(0), + } }, } } @@ -966,18 +1094,20 @@ impl From<String> for BytesMut { impl<'a> From<&'a [u8]> for BytesMut { fn from(src: &'a [u8]) -> BytesMut { - if src.len() <= INLINE_CAP { + let len = src.len(); + + if len <= INLINE_CAP { unsafe { - let len = src.len(); - let mut data: [u8; INLINE_CAP] = mem::uninitialized(); - data[0..len].copy_from_slice(src); + let mut inner: Inner = mem::uninitialized(); - let a = KIND_INLINE | (len << INLINE_LEN_OFFSET); + // Set inline mask + inner.arc = AtomicPtr::new(KIND_INLINE as *mut Shared); + inner.set_inline_len(len); + inner.as_raw()[0..len].copy_from_slice(src); BytesMut { - inner: Inner { - data: mem::transmute(data), - arc: Cell::new(a), + inner: Inner2 { + inner: inner, } } } @@ -1041,6 +1171,12 @@ impl fmt::Write for BytesMut { } } +impl Clone for BytesMut { + fn clone(&self) -> BytesMut { + BytesMut::from(&self[..]) + } +} + /* * * ===== Inner ===== @@ -1048,45 +1184,42 @@ impl fmt::Write for BytesMut { */ impl Inner { + /// Return a slice for the handle's view into the shared buffer #[inline] fn as_ref(&self) -> &[u8] { - if self.is_inline() { - unsafe { + unsafe { + if self.is_inline() { slice::from_raw_parts(self.inline_ptr(), self.inline_len()) - } - } else { - unsafe { - let d = &*self.data.get(); - slice::from_raw_parts(d.ptr, d.len) + } else { + slice::from_raw_parts(self.ptr, self.len) } } } + /// Return a mutable slice for the handle's view into the shared buffer #[inline] fn as_mut(&mut self) -> &mut [u8] { - debug_assert!(self.kind() != Kind::Static); + debug_assert!(!self.is_static()); - if self.is_inline() { - unsafe { + unsafe { + if self.is_inline() { slice::from_raw_parts_mut(self.inline_ptr(), self.inline_len()) - } - } else { - unsafe { - let d = &*self.data.get(); - slice::from_raw_parts_mut(d.ptr, d.len) + } else { + slice::from_raw_parts_mut(self.ptr, self.len) } } } + /// Return a mutable slice for the handle's view into the shared buffer + /// including potentially uninitialized bytes. 
#[inline] unsafe fn as_raw(&mut self) -> &mut [u8] { - debug_assert!(self.kind() != Kind::Static); + debug_assert!(!self.is_static()); if self.is_inline() { - slice::from_raw_parts_mut(self.inline_ptr(), self.inline_capacity()) + slice::from_raw_parts_mut(self.inline_ptr(), INLINE_CAP) } else { - let d = &*self.data.get(); - slice::from_raw_parts_mut(d.ptr, d.cap) + slice::from_raw_parts_mut(self.ptr, self.cap) } } @@ -1095,78 +1228,60 @@ impl Inner { if self.is_inline() { self.inline_len() } else { - unsafe { (*self.data.get()).len } + self.len } } + /// Pointer to the start of the inline buffer #[inline] unsafe fn inline_ptr(&self) -> *mut u8 { - (self.data.get() as *mut u8).offset(self.inline_start() as isize) - } - - #[inline] - fn inline_start(&self) -> usize { - (self.arc.get() & INLINE_START_MASK) >> INLINE_START_OFFSET - } - - #[inline] - fn set_inline_start(&self, start: usize) { - debug_assert!(start <= INLINE_START_MASK); - - let v = (self.arc.get() & !INLINE_START_MASK) | - (start << INLINE_START_OFFSET); - - self.arc.set(v); + (self as *const Inner as *mut Inner as *mut u8) + .offset(INLINE_DATA_OFFSET) } #[inline] fn inline_len(&self) -> usize { - (self.arc.get() & INLINE_LEN_MASK) >> INLINE_LEN_OFFSET - } - - #[inline] - fn set_inline_len(&self, len: usize) { - debug_assert!(len <= INLINE_LEN_MASK); - - let v = (self.arc.get() & !INLINE_LEN_MASK) | - (len << INLINE_LEN_OFFSET); - - self.arc.set(v); + let p: &usize = unsafe { mem::transmute(&self.arc) }; + (p & INLINE_LEN_MASK) >> INLINE_LEN_OFFSET } + /// Set the length of the inline buffer. This is done by writing to the + /// least significant byte of the `arc` field. #[inline] - fn inline_capacity(&self) -> usize { - INLINE_CAP - self.inline_start() + fn set_inline_len(&mut self, len: usize) { + debug_assert!(len <= INLINE_CAP); + let p: &mut usize = unsafe { mem::transmute(&mut self.arc) }; + *p = (*p & !INLINE_LEN_MASK) | (len << INLINE_LEN_OFFSET); } + /// slice. #[inline] unsafe fn set_len(&mut self, len: usize) { if self.is_inline() { - assert!(len <= self.inline_capacity()); + assert!(len <= INLINE_CAP); self.set_inline_len(len); } else { - let d = &mut *self.data.get(); - assert!(len <= d.cap); - d.len = len; + assert!(len <= self.cap); + self.len = len; } } #[inline] - pub fn is_empty(&self) -> bool { + fn is_empty(&self) -> bool { self.len() == 0 } #[inline] - pub fn capacity(&self) -> usize { + fn capacity(&self) -> usize { if self.is_inline() { - self.inline_capacity() + INLINE_CAP } else { - unsafe { (*self.data.get()).cap } + self.cap } } - fn split_off(&self, at: usize) -> Inner { - let other = self.shallow_clone(); + fn split_off(&mut self, at: usize) -> Inner { + let mut other = self.shallow_clone(); unsafe { other.set_start(at); @@ -1176,8 +1291,8 @@ impl Inner { return other } - fn drain_to(&self, at: usize) -> Inner { - let other = self.shallow_clone(); + fn drain_to(&mut self, at: usize) -> Inner { + let mut other = self.shallow_clone(); unsafe { other.set_end(at); @@ -1187,84 +1302,108 @@ impl Inner { return other } - /// Changes the starting index of this window to the index specified. - /// - /// # Panics - /// - /// This method will panic if `start` is out of bounds for the underlying - /// slice. - unsafe fn set_start(&self, start: usize) { + unsafe fn set_start(&mut self, start: usize) { + // This function should never be called when the buffer is still backed + // by a `Vec<u8>` debug_assert!(self.is_shared()); + // Setting the start to 0 is a no-op, so return early if this is the + // case. 
if start == 0 { return; } + // Always check `inline` first, because if the handle is using inline + // data storage, all of the `Inner` struct fields will be gibberish. if self.is_inline() { - assert!(start <= self.inline_capacity()); + assert!(start <= INLINE_CAP); - let old_start = self.inline_start(); - let old_len = self.inline_len(); + let len = self.inline_len(); - self.set_inline_start(old_start + start); - - if old_len >= start { - self.set_inline_len(old_len - start); - } else { + if len <= start { self.set_inline_len(0); + } else { + // `set_start` is essentially shifting data off the front of the + // view. Inlined buffers only track the length of the slice. + // So, to update the start, the data at the new starting point + // is copied to the beginning of the buffer. + let new_len = len - start; + + let dst = self.inline_ptr(); + let src = (dst as *const u8).offset(start as isize); + + ptr::copy(src, dst, new_len); + + self.set_inline_len(new_len); } } else { - let d = &mut *self.data.get(); + assert!(start <= self.cap); - assert!(start <= d.cap); + // Updating the start of the view is setting `ptr` to point to the + // new start and updating the `len` field to reflect the new length + // of the view. + self.ptr = self.ptr.offset(start as isize); - d.ptr = d.ptr.offset(start as isize); - - // TODO: This could probably be optimized with some bit fiddling - if d.len >= start { - d.len -= start; + if self.len >= start { + self.len -= start; } else { - d.len = 0; + self.len = 0; } - d.cap -= start; + self.cap -= start; } } - /// Changes the end index of this window to the index specified. - /// - /// # Panics - /// - /// This method will panic if `start` is out of bounds for the underlying - /// slice. - unsafe fn set_end(&self, end: usize) { + unsafe fn set_end(&mut self, end: usize) { debug_assert!(self.is_shared()); + // Always check `inline` first, because if the handle is using inline + // data storage, all of the `Inner` struct fields will be gibberish. if self.is_inline() { - assert!(end <= self.inline_capacity()); + assert!(end <= INLINE_CAP); let new_len = cmp::min(self.inline_len(), end); self.set_inline_len(new_len); } else { - let d = &mut *self.data.get(); - - assert!(end <= d.cap); + assert!(end <= self.cap); - d.cap = end; - d.len = cmp::min(d.len, end); + self.cap = end; + self.len = cmp::min(self.len, end); } } /// Checks if it is safe to mutate the memory fn is_mut_safe(&mut self) -> bool { - match self.kind() { - Kind::Static => false, - Kind::Arc => { - unsafe { - let arc: &mut Shared = mem::transmute(&mut self.arc); - Arc::get_mut(arc).is_some() - } + // Always check `inline` first, because if the handle is using inline + // data storage, all of the `Inner` struct fields will be gibberish. + if self.is_inline() { + // Inlined buffers can always be mutated as the data is never shared + // across handles. + true + } else { + // The function requires `&mut self`, which guarantees a unique + // reference to the current handle. This means that the `arc` field + // *cannot* be concurrently mutated. As such, `Relaxed` ordering is + // fine (since we aren't synchronizing with anything). + // + // TODO: No ordering? + let arc = self.arc.load(Relaxed); + + // If the pointer is null, this is a non-shared handle and is mut + // safe. 
+ if arc.is_null() { + return true; + } + + // Check if this is a static buffer + if KIND_STATIC == arc as usize { + return false; + } + + // Otherwise, the underlying buffer is potentially shared with other + // handles, so the ref_count needs to be checked. + unsafe { + return (*arc).is_unique(); } - Kind::Vec | Kind::Inline => true, } } @@ -1272,58 +1411,110 @@ impl Inner { /// it can be done safely. As such, this fn is not public, instead other /// fns will use this one while maintaining the guarantees. fn shallow_clone(&self) -> Inner { - match self.kind() { - Kind::Vec => { - unsafe { - let d = &*self.data.get(); - - // Promote this `Bytes` to an arc, and clone it - let v = Vec::from_raw_parts(d.ptr, d.len, d.cap); - - let a = Arc::new(v); - self.arc.set(mem::transmute(a.clone())); + // Always check `inline` first, because if the handle is using inline + // data storage, all of the `Inner` struct fields will be gibberish. + if self.is_inline() { + // In this case, a shallow_clone still involves copying the data. + unsafe { + // TODO: Just copy the fields + let mut inner: Inner = mem::uninitialized(); + let len = self.inline_len(); - Inner { - data: UnsafeCell::new(*d), - arc: Cell::new(mem::transmute(a)), - } - } + inner.arc = AtomicPtr::new(KIND_INLINE as *mut Shared); + inner.set_inline_len(len); + inner.as_raw()[0..len].copy_from_slice(self.as_ref()); + inner } - Kind::Arc => { + } else { + // The function requires `&self`, this means that `shallow_clone` + // could be called concurrently. + // + // The first step is to load the value of `arc`. This will determine + // how to proceed. The `Acquire` ordering synchronizes with the + // `compare_and_swap` that comes later in this function. The goal is + // to ensure that if `arc` is currently set to point to a `Shared`, + // that the current thread acquires the associated memory. + let mut arc = self.arc.load(Acquire); + + // If `arc` is null, then the buffer is still tracked in a + // `Vec<u8>`. It is time to promote the vec to an `Arc`. This could + // potentially be called concurrently, so some care must be taken. + if arc.is_null() { unsafe { - let arc: &Shared = mem::transmute(&self.arc); - - Inner { - data: UnsafeCell::new(*self.data.get()), - arc: Cell::new(mem::transmute(arc.clone())), + // First, allocate a new `Shared` instance containing the + // `Vec` fields. It's important to note that `ptr`, `len`, + // and `cap` cannot be mutated without having `&mut self`. + // This means that these fields will not be concurrently + // updated and since the buffer hasn't been promoted to an + // `Arc`, those three fields still are the components of the + // vector. + let shared = Box::new(Shared { + vec: Vec::from_raw_parts(self.ptr, self.len, self.cap), + // Initialize refcount to 2. One for this reference, and one + // for the new clone that will be returned from + // `shallow_clone`. + ref_count: AtomicUsize::new(2), + }); + + let shared = Box::into_raw(shared); + + // The pointer should be aligned, so this assert should + // always succeed. + debug_assert!(0 == (shared as usize & 0b11)); + + // Try compare & swapping the pointer into the `arc` field. + // `Release` is used synchronize with other threads that + // will load the `arc` field. + // + // If the `compare_and_swap` fails, then the thread lost the + // race to promote the buffer to shared. The `Acquire` + // ordering will synchronize with the `compare_and_swap` + // that happened in the other thread and the `Shared` + // pointed to by `actual` will be visible. 
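+                //
+                // Concretely: if two threads clone the same `Arc<Bytes>`-held
+                // handle at once, both can reach this point and allocate a
+                // `Shared`, but only one CAS succeeds. The losing thread frees
+                // its allocation below and falls through to bump the winner's
+                // ref count instead.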
+ let actual = self.arc.compare_and_swap(arc, shared, AcqRel); + + if actual.is_null() { + // The upgrade was successful, the new handle can be + // returned. + return Inner { + arc: AtomicPtr::new(shared), + .. *self + }; } - } - } - Kind::Inline => { - let len = self.inline_len(); - - unsafe { - let mut data: Data = mem::uninitialized(); - let dst = &mut data as *mut _ as *mut u8; - let src = self.inline_ptr(); + // The upgrade failed, a concurrent clone happened. Release + // the allocation that was made in this thread, it will not + // be needed. + let shared: Box<Shared> = mem::transmute(shared); + mem::forget(*shared); - ptr::copy_nonoverlapping(src, dst, len); + // Update the `arc` local variable and fall through to a ref + // count update + arc = actual; + } + } else if KIND_STATIC == arc as usize { + // Static buffer + return Inner { + arc: AtomicPtr::new(arc), + .. *self + }; + } - let mut a = KIND_INLINE; - a |= len << INLINE_LEN_OFFSET; + // Buffer already promoted to shared storage, so increment ref + // count. + unsafe { + // Relaxed ordering is acceptable as the memory has already been + // acquired via the `Acquire` load above. + let old_size = (*arc).ref_count.fetch_add(1, Relaxed); - Inner { - data: UnsafeCell::new(data), - arc: Cell::new(a), - } + if old_size == usize::MAX { + panic!(); // TODO: abort } } - Kind::Static => { - Inner { - data: unsafe { UnsafeCell::new(*self.data.get()) }, - arc: Cell::new(self.arc.get()), - } + + Inner { + arc: AtomicPtr::new(arc), + .. *self } } } @@ -1334,194 +1525,283 @@ impl Inner { let rem = self.capacity() - len; if additional <= rem { - // Nothing more to do + // The handle can already store at least `additional` more bytes, so + // there is no further work needed to be done. return; } - match self.kind() { - Kind::Vec => { - unsafe { - let d = &mut *self.data.get(); - - // Promote this `Bytes` to an arc, and clone it - let mut v = Vec::from_raw_parts(d.ptr, d.len, d.cap); - v.reserve(additional); - - // Update the info - d.ptr = v.as_mut_ptr(); - d.len = v.len(); - d.cap = v.capacity(); + // Always check `inline` first, because if the handle is using inline + // data storage, all of the `Inner` struct fields will be gibberish. + if self.is_inline() { + let new_cap = len + additional; - // Drop the vec reference - mem::forget(v); - } - } - Kind::Arc => { - unsafe { - // Compute the new capacity - let new_cap = len + additional; + // Promote to a vector + let mut v = Vec::with_capacity(new_cap); + v.extend_from_slice(self.as_ref()); - // Create a new vector to store the data - let mut v = Vec::with_capacity(new_cap); + self.ptr = v.as_mut_ptr(); + self.len = v.len(); + self.cap = v.capacity(); + self.arc = AtomicPtr::new(ptr::null_mut()); - // Copy the bytes - v.extend_from_slice(self.as_ref()); + mem::forget(v); + return; + } - let d = &mut *self.data.get(); + // `Relaxed` is Ok here (and really, no synchronization is necessary) + // due to having a `&mut self` pointer. The `&mut self` pointer ensures + // that there is no concurrent access on `self`. + let arc = self.arc.load(Relaxed); - d.ptr = v.as_mut_ptr(); - d.len = v.len(); - d.cap = v.capacity(); + if arc.is_null() { + // Currently backed by a vector, so just use `Vector::reserve`. 
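+            // Since `arc` is null the buffer was never promoted, so `ptr`,
+            // `len`, and `cap` are still exactly the components of the
+            // original `Vec<u8>`, making it safe to reassemble the vector
+            // here and let it grow.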
+ unsafe { + let mut v = Vec::from_raw_parts(self.ptr, self.len, self.cap); + v.reserve(additional); - mem::forget(v); + // Update the info + self.ptr = v.as_mut_ptr(); + self.len = v.len(); + self.cap = v.capacity(); - // Drop the arc reference - let _: Arc<UnsafeCell<Vec<u8>>> = mem::transmute(self.arc.get()); + // Drop the vec reference + mem::forget(v); - self.arc.set(0); - } + return; } - Kind::Inline => { - let new_cap = len + additional; - - unsafe { - if new_cap <= INLINE_CAP { - let dst = &mut self.data as *mut _ as *mut u8; - let src = self.inline_ptr(); - - ptr::copy(src, dst, len); + } - let mut a = KIND_INLINE; - a |= len << INLINE_LEN_OFFSET; + debug_assert!(!self.is_static()); - self.arc.set(a); - } else { - let mut v = Vec::with_capacity(new_cap); + // Reserving involves abandoning the currently shared buffer and + // allocating a new vector with the requested capacity. + // + // Compute the new capacity + let new_cap = len + additional; - // Copy the bytes - v.extend_from_slice(self.as_ref()); + // Create a new vector to store the data + let mut v = Vec::with_capacity(new_cap); - let d = &mut *self.data.get(); + // Copy the bytes + v.extend_from_slice(self.as_ref()); - d.ptr = v.as_mut_ptr(); - d.len = v.len(); - d.cap = v.capacity(); + // Release the shared handle. This must be done *after* the bytes are + // copied. + release_shared(arc); - mem::forget(v); + // Update self + self.ptr = v.as_mut_ptr(); + self.len = v.len(); + self.cap = v.capacity(); + self.arc = AtomicPtr::new(ptr::null_mut()); - self.arc.set(0); - } - } - } - Kind::Static => unreachable!(), - } + // Forget the vector handle + mem::forget(v); } /// This must take `&mut self` in order to be able to copy memory in the /// inline case. #[inline] fn try_reclaim(&mut self) -> bool { - match self.kind() { - Kind::Inline => { - if self.inline_start() > 0 { - // Shift the data back to the front - unsafe { - let len = self.inline_len(); - let dst = &mut self.data as *mut _ as *mut u8; - let src = self.inline_ptr(); + // Always check `inline` first, because if the handle is using inline + // data storage, all of the `Inner` struct fields will be gibberish. + if self.is_inline() { + // No further work to do. Inlined buffers are always "reclaimed". + return true; + } - ptr::copy(src, dst, len); + // `Relaxed` is Ok here (and really, no synchronization is necessary) + // due to having a `&mut self` pointer. The `&mut self` pointer ensures + // that there is no concurrent access on `self`. + let arc = self.arc.load(Relaxed); - let mut a = KIND_INLINE; - a |= len << INLINE_LEN_OFFSET; + if arc.is_null() { + // Vec storage is already reclaimed + return true; + } - self.arc.set(a); - } - } + debug_assert!(!self.is_static()); - true + unsafe { + if !(*arc).is_unique() { + // Cannot reclaim buffers that are shared. + return false; } - Kind::Arc => { - unsafe { - let arc: &mut Shared = mem::transmute(&mut self.arc); - // Check if mut safe - if Arc::get_mut(arc).is_none() { - return false; - } + // This is the only handle to the buffer. It can be reclaimed. 
- let v = &mut *arc.get(); - let d = &mut *self.data.get(); + // Get to the shared vector + let v = &mut (*arc).vec; - let len = v.len(); - let ptr = v.as_mut_ptr(); + let len = v.len(); + let ptr = v.as_mut_ptr(); - ptr::copy(d.ptr, ptr, len); + ptr::copy(self.ptr, ptr, len); - d.ptr = ptr; - d.len = len; - d.cap = v.capacity(); + self.ptr = ptr; + self.len = len; + self.cap = v.capacity(); - true - } - } - Kind::Vec => { - true - } - Kind::Static => unreachable!(), + true } } + /// Returns true if the buffer is stored inline #[inline] - fn kind(&self) -> Kind { - let arc = self.arc.get(); - - if arc == 0 { - return Kind::Vec + fn is_inline(&self) -> bool { + // This function is going to probably raise some eyebrows. The function + // returns true if the buffer is stored inline. This is done by checking + // the least significant bit in the `arc` field. + // + // Now, you may notice that `arc` is an `AtomicPtr` and this is + // accessing it as a normal field without performing an atomic load... + // + // Again, the function only cares about the least significant bit, and + // this bit is set when `Inner` is created and never changed after that. + // All platforms have atomic "word" operations and won't randomly flip + // bits, so even without any explicit atomic operations, reading the + // flag will be correct. + // + // This function is very critical performance wise as it is called for + // every operation. Performing an atomic load would mess with the + // compiler's ability to optimize. Simple benchmarks show up to a 10% + // slowdown using a `Relaxed` atomic load on x86. + + #[cfg(target_endian = "little")] + #[inline] + fn imp(arc: &AtomicPtr<Shared>) -> bool { + unsafe { + let p: &u8 = mem::transmute(arc); + *p & (KIND_INLINE as u8) == (KIND_INLINE as u8) + } } - let kind = arc & KIND_MASK; - - match kind { - 0 => Kind::Arc, - KIND_INLINE => Kind::Inline, - KIND_STATIC => Kind::Static, - _ => unreachable!(), + #[cfg(target_endian = "big")] + #[inline] + fn imp(arc: &AtomicPtr<Shared>) -> bool { + unsafe { + let p: &usize = mem::transmute(arc); + *p & KIND_INLINE == KIND_INLINE + } } + + imp(&self.arc) } + /// Used for `debug_assert` statements #[inline] - fn is_inline(&self) -> bool { - self.arc.get() & KIND_MASK == KIND_INLINE + fn is_shared(&self) -> bool { + self.is_inline() || + !self.arc.load(Relaxed).is_null() } + /// Used for `debug_assert` statements #[inline] - fn is_shared(&self) -> bool { - self.kind() != Kind::Vec + fn is_static(&self) -> bool { + !self.is_inline() && + self.arc.load(Relaxed) as usize == KIND_STATIC } } -impl Drop for Inner { +impl Drop for Inner2 { fn drop(&mut self) { - match self.kind() { - Kind::Vec => { - unsafe { - let d = *self.data.get(); - // Not shared, manually free - let _ = Vec::from_raw_parts(d.ptr, d.len, d.cap); - } - } - Kind::Arc => { - unsafe { - let _: Arc<UnsafeCell<Vec<u8>>> = mem::transmute(self.arc.get()); - } + // Always check `inline` first, because if the handle is using inline + // data storage, all of the `Inner` struct fields will be gibberish. + if self.is_inline() { + return; + } + + // Acquire is needed here to ensure that the `Shared` memory is + // visible. 
+ let arc = self.arc.load(Acquire); + + if arc as usize == KIND_STATIC { + // Static buffer, no work to do + return; + } + + if arc.is_null() { + // Vector storage, free the vector + unsafe { + let _ = Vec::from_raw_parts(self.ptr, self.len, self.cap); } - _ => {} + + return; } + + release_shared(arc); + } +} + +fn release_shared(ptr: *mut Shared) { + // `Shared` storage... follow the drop steps from Arc. + unsafe { + if (*ptr).ref_count.fetch_sub(1, Release) != 1 { + return; + } + + // This fence is needed to prevent reordering of use of the data and + // deletion of the data. Because it is marked `Release`, the decreasing + // of the reference count synchronizes with this `Acquire` fence. This + // means that use of the data happens before decreasing the reference + // count, which happens before this fence, which happens before the + // deletion of the data. + // + // As explained in the [Boost documentation][1], + // + // > It is important to enforce any possible access to the object in one + // > thread (through an existing reference) to *happen before* deleting + // > the object in a different thread. This is achieved by a "release" + // > operation after dropping a reference (any access to the object + // > through this reference must obviously happened before), and an + // > "acquire" operation before deleting the object. + // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + atomic::fence(Acquire); + + // Drop the data + let _: Box<Shared> = mem::transmute(ptr); + } +} + +impl Shared { + fn is_unique(&self) -> bool { + // The goal is to check if the current handle is the only handle + // that currently has access to the buffer. This is done by + // checking if the `ref_count` is currently 1. + // + // The `Acquire` ordering synchronizes with the `Release` as + // part of the `fetch_sub` in `release_shared`. The `fetch_sub` + // operation guarantees that any mutations done in other threads + // are ordered before the `ref_count` is decremented. As such, + // this `Acquire` will guarantee that those mutations are + // visible to the current thread. 
+ self.ref_count.load(Acquire) == 1 } } unsafe impl Send for Inner {} +unsafe impl Sync for Inner {} + +/* + * + * ===== impl Inner2 ===== + * + */ + +impl ops::Deref for Inner2 { + type Target = Inner; + + fn deref(&self) -> &Inner { + &self.inner + } +} + +impl ops::DerefMut for Inner2 { + fn deref_mut(&mut self) -> &mut Inner { + &mut self.inner + } +} /* * @@ -1658,9 +1938,3 @@ impl<'a, T: ?Sized> PartialEq<&'a T> for Bytes *self == **other } } - -impl Clone for BytesMut { - fn clone(&self) -> BytesMut { - BytesMut::from(&self[..]) - } -} diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs index 1e38e5c..50ef9e9 100644 --- a/tests/test_bytes.rs +++ b/tests/test_bytes.rs @@ -5,11 +5,10 @@ use bytes::{Bytes, BytesMut, BufMut}; const LONG: &'static [u8] = b"mary had a little lamb, little lamb, little lamb"; const SHORT: &'static [u8] = b"hello world"; -#[cfg(target_pointer_width = "64")] -const INLINE_CAP: usize = 8 * 3; - -#[cfg(target_pointer_width = "32")] -const INNER_CAP: usize = 4 * 3; +fn inline_cap() -> usize { + use std::mem; + 4 * mem::size_of::<usize>() - 1 +} fn is_sync<T: Sync>() {} fn is_send<T: Send>() {} @@ -17,6 +16,7 @@ fn is_send<T: Send>() {} #[test] fn test_bounds() { is_sync::<Bytes>(); + is_sync::<BytesMut>(); is_send::<Bytes>(); is_send::<BytesMut>(); } @@ -90,25 +90,25 @@ fn slice() { #[should_panic] fn slice_oob_1() { let a = Bytes::from(&b"hello world"[..]); - a.slice(5, 25); + a.slice(5, inline_cap() + 1); } #[test] #[should_panic] fn slice_oob_2() { let a = Bytes::from(&b"hello world"[..]); - a.slice(25, 30); + a.slice(inline_cap() + 1, inline_cap() + 5); } #[test] fn split_off() { - let hello = Bytes::from(&b"helloworld"[..]); + let mut hello = Bytes::from(&b"helloworld"[..]); let world = hello.split_off(5); assert_eq!(hello, &b"hello"[..]); assert_eq!(world, &b"world"[..]); - let hello = BytesMut::from(&b"helloworld"[..]); + let mut hello = BytesMut::from(&b"helloworld"[..]); let world = hello.split_off(5); assert_eq!(hello, &b"hello"[..]); @@ -118,21 +118,14 @@ fn split_off() { #[test] #[should_panic] fn split_off_oob() { - let hello = Bytes::from(&b"helloworld"[..]); - hello.split_off(25); -} - -#[test] -#[should_panic] -fn split_off_oob_mut() { - let hello = BytesMut::from(&b"helloworld"[..]); - hello.split_off(25); + let mut hello = Bytes::from(&b"helloworld"[..]); + hello.split_off(inline_cap() + 1); } #[test] fn split_off_uninitialized() { let mut bytes = BytesMut::with_capacity(1024); - let other = bytes.split_off_mut(128); + let other = bytes.split_off(128); assert_eq!(bytes.len(), 0); assert_eq!(bytes.capacity(), 128); @@ -144,44 +137,55 @@ fn split_off_uninitialized() { #[test] fn drain_to_1() { // Inline - let a = Bytes::from(SHORT); + let mut a = Bytes::from(SHORT); let b = a.drain_to(4); assert_eq!(SHORT[4..], a); assert_eq!(SHORT[..4], b); // Allocated - let a = Bytes::from(LONG); + let mut a = Bytes::from(LONG); let b = a.drain_to(4); assert_eq!(LONG[4..], a); assert_eq!(LONG[..4], b); - let a = Bytes::from(LONG); + let mut a = Bytes::from(LONG); let b = a.drain_to(30); assert_eq!(LONG[30..], a); assert_eq!(LONG[..30], b); } +#[test] +fn drain_to_2() { + let mut a = Bytes::from(LONG); + assert_eq!(LONG, a); + + let b = a.drain_to(1); + + assert_eq!(LONG[1..], a); + drop(b); +} + #[test] #[should_panic] fn drain_to_oob() { - let hello = Bytes::from(&b"helloworld"[..]); - hello.drain_to(30); + let mut hello = Bytes::from(&b"helloworld"[..]); + hello.drain_to(inline_cap() + 1); } #[test] #[should_panic] fn drain_to_oob_mut() { - let hello = 
BytesMut::from(&b"helloworld"[..]); - hello.drain_to(30); + let mut hello = BytesMut::from(&b"helloworld"[..]); + hello.drain_to(inline_cap() + 1); } #[test] fn drain_to_uninitialized() { let mut bytes = BytesMut::with_capacity(1024); - let other = bytes.drain_to_mut(128); + let other = bytes.drain_to(128); assert_eq!(bytes.len(), 0); assert_eq!(bytes.capacity(), 896); @@ -212,12 +216,12 @@ fn reserve() { assert_eq!(bytes, "hello"); // Inline -> Inline - let mut bytes = BytesMut::with_capacity(INLINE_CAP); + let mut bytes = BytesMut::with_capacity(inline_cap()); bytes.put("abcdefghijkl"); let a = bytes.drain_to(10); - bytes.reserve(INLINE_CAP - 3); - assert_eq!(INLINE_CAP, bytes.capacity()); + bytes.reserve(inline_cap() - 3); + assert_eq!(inline_cap(), bytes.capacity()); assert_eq!(bytes, "kl"); assert_eq!(a, "abcdefghij"); @@ -238,19 +242,19 @@ fn reserve() { } #[test] -fn try_reclaim() { +fn try_reclaim_1() { // Inline w/ start at zero let mut bytes = BytesMut::from(&SHORT[..]); assert!(bytes.try_reclaim()); - assert_eq!(bytes.capacity(), INLINE_CAP); + assert_eq!(bytes.capacity(), inline_cap()); assert_eq!(bytes, SHORT); // Inline w/ start not at zero let mut bytes = BytesMut::from(&SHORT[..]); let _ = bytes.drain_to(2); - assert_eq!(bytes.capacity(), INLINE_CAP - 2); + assert_eq!(bytes.capacity(), inline_cap()); assert!(bytes.try_reclaim()); - assert_eq!(bytes.capacity(), INLINE_CAP); + assert_eq!(bytes.capacity(), inline_cap()); assert_eq!(bytes, &SHORT[2..]); // Arc @@ -263,3 +267,65 @@ fn try_reclaim() { assert!(bytes.try_reclaim()); assert_eq!(bytes.capacity(), LONG.len()); } + +#[test] +fn try_reclaim_2() { + let mut bytes = BytesMut::from( + "Lorem ipsum dolor sit amet, consectetur adipiscing elit."); + + // Create a new handle to the shared memory region + let a = bytes.drain_to(5); + + // Attempting to reclaim here will fail due to `a` still being in + // existence. + assert!(!bytes.try_reclaim()); + assert_eq!(bytes.capacity(), 51); + + // Dropping the handle will allow reclaim to succeed. + drop(a); + assert!(bytes.try_reclaim()); + assert_eq!(bytes.capacity(), 56); +} + +#[test] +fn inline_storage() { + let mut bytes = BytesMut::with_capacity(inline_cap()); + let zero = [0u8; 64]; + + bytes.put(&zero[0..inline_cap()]); + assert_eq!(*bytes, zero[0..inline_cap()]); +} + +#[test] +fn stress() { + // Tests promoting a buffer from a vec -> shared in a concurrent situation + use std::sync::{Arc, Barrier}; + use std::thread; + + const THREADS: usize = 8; + const ITERS: usize = 1_000; + + for i in 0..ITERS { + let data = [i as u8; 256]; + let buf = Arc::new(BytesMut::from(&data[..])); + + let barrier = Arc::new(Barrier::new(THREADS)); + let mut joins = Vec::with_capacity(THREADS); + + for _ in 0..THREADS { + let c = barrier.clone(); + let buf = buf.clone(); + + joins.push(thread::spawn(move || { + c.wait(); + let _buf = buf.clone(); + })); + } + + for th in joins { + th.join().unwrap(); + } + + assert_eq!(*buf, data[..]); + } +} -- GitLab