diff --git a/src/bytes.rs b/src/bytes.rs
index ac6559268ba7d0c9bbb968d1dc32863938635da3..34451b1f8e774cab4087a174035ff90ec018d576 100644
--- a/src/bytes.rs
+++ b/src/bytes.rs
@@ -321,7 +321,7 @@ struct Inner {
 // other shenanigans to make it work.
 struct Shared {
     vec: Vec<u8>,
-    original_capacity: usize,
+    original_capacity_repr: usize,
     ref_count: AtomicUsize,
 }
@@ -332,7 +332,24 @@ const KIND_STATIC: usize = 0b10;
 const KIND_VEC: usize = 0b11;
 const KIND_MASK: usize = 0b11;
 
-const MAX_ORIGINAL_CAPACITY: usize = 1 << 16;
+// The max original capacity value. Any `Bytes` allocated with a greater
+// initial capacity will default to this.
+const MAX_ORIGINAL_CAPACITY_WIDTH: usize = 17;
+// The original capacity algorithm will not take effect unless the originally
+// allocated capacity was at least 1kb in size.
+const MIN_ORIGINAL_CAPACITY_WIDTH: usize = 10;
+// The original capacity is stored in powers of 2 starting at 1kb to a max of
+// 64kb. Representing it as such requires only 3 bits of storage.
+const ORIGINAL_CAPACITY_MASK: usize = 0b11100;
+const ORIGINAL_CAPACITY_OFFSET: usize = 2;
+
+// When the storage is in the `Vec` representation, the pointer can be advanced
+// by at most this value. This is because the amount of storage available to
+// track the offset is the width of `usize` minus the number of KIND bits and
+// the number of ORIGINAL_CAPACITY bits.
+const VEC_POS_OFFSET: usize = 5;
+const MAX_VEC_POS: usize = usize::MAX >> VEC_POS_OFFSET;
+const NOT_VEC_POS_MASK: usize = 0b11111;
 
 // Bit op constants for extracting the inline length value from the `arc` field.
 const INLINE_LEN_MASK: usize = 0b11111100;
@@ -347,6 +364,11 @@ const INLINE_DATA_OFFSET: isize = 1;
 #[cfg(target_endian = "big")]
 const INLINE_DATA_OFFSET: isize = 0;
 
+#[cfg(target_pointer_width = "64")]
+const PTR_WIDTH: usize = 64;
+#[cfg(target_pointer_width = "32")]
+const PTR_WIDTH: usize = 32;
+
 // Inline buffer capacity. This is the size of `Inner` minus 1 byte for the
 // metadata.
 #[cfg(target_pointer_width = "64")]
@@ -655,6 +677,22 @@ impl Bytes {
         self.inner.truncate(len);
     }
 
+    /// Shortens the buffer, dropping the first `cnt` bytes and keeping the
+    /// rest.
+    ///
+    /// This is the same function as `Buf::advance`, and in the next breaking
+    /// release of `bytes`, this implementation will be removed in favor of
+    /// having `Bytes` implement `Buf`.
+    ///
+    /// # Panics
+    ///
+    /// This function panics if `cnt` is greater than `self.len()`.
+    #[inline]
+    pub fn advance(&mut self, cnt: usize) {
+        assert!(cnt <= self.len(), "cannot advance past `remaining`");
+        unsafe { self.inner.set_start(cnt); }
+    }
+
     /// Clears the buffer, removing all data.
     ///
     /// # Examples
@@ -1202,6 +1240,22 @@ impl BytesMut {
         self.inner.truncate(len);
     }
 
+    /// Shortens the buffer, dropping the first `cnt` bytes and keeping the
+    /// rest.
+    ///
+    /// This is the same function as `Buf::advance`, and in the next breaking
+    /// release of `bytes`, this implementation will be removed in favor of
+    /// having `BytesMut` implement `Buf`.
+    ///
+    /// # Panics
+    ///
+    /// This function panics if `cnt` is greater than `self.len()`.
+    #[inline]
+    pub fn advance(&mut self, cnt: usize) {
+        assert!(cnt <= self.len(), "cannot advance past `remaining`");
+        unsafe { self.inner.set_start(cnt); }
+    }
+
     /// Clears the buffer, removing all data.
     ///
     /// # Examples
@@ -1608,8 +1662,8 @@ impl Inner {
 
         mem::forget(src);
 
-        let original_capacity = cmp::min(cap, MAX_ORIGINAL_CAPACITY);
-        let arc = (original_capacity & !KIND_MASK) | KIND_VEC;
+        let original_capacity_repr = original_capacity_to_repr(cap);
+        let arc = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC;
 
         Inner {
             arc: AtomicPtr::new(arc as *mut Shared),
@@ -1777,19 +1831,17 @@ impl Inner {
     }
 
     unsafe fn set_start(&mut self, start: usize) {
-        // This function should never be called when the buffer is still backed
-        // by a `Vec<u8>`
-        debug_assert!(self.is_shared());
-
         // Setting the start to 0 is a no-op, so return early if this is the
         // case.
         if start == 0 {
             return;
         }
 
+        let kind = self.kind();
+
         // Always check `inline` first, because if the handle is using inline
         // data storage, all of the `Inner` struct fields will be gibberish.
-        if self.is_inline() {
+        if kind == KIND_INLINE {
             assert!(start <= INLINE_CAP);
 
             let len = self.inline_len();
@@ -1813,6 +1865,25 @@
         } else {
             assert!(start <= self.cap);
 
+            if kind == KIND_VEC {
+                // Setting the start when in vec representation is a little more
+                // complicated. First, we have to track how far ahead the
+                // "start" of the byte buffer is from the beginning of the vec.
+                // We also have to ensure that we don't exceed the maximum shift.
+                let (mut pos, prev) = self.uncoordinated_get_vec_pos();
+                pos += start;
+
+                if pos <= MAX_VEC_POS {
+                    self.uncoordinated_set_vec_pos(pos, prev);
+                } else {
+                    // The repr must be upgraded to ARC. This will never happen
+                    // on 64 bit systems and will only happen on 32 bit systems
+                    // when shifting past 134,217,727 bytes. As such, we don't
+                    // worry too much about performance here.
+                    let _ = self.shallow_clone();
+                }
+            }
+
             // Updating the start of the view is setting `ptr` to point to the
             // new start and updating the `len` field to reflect the new length
             // of the view.
@@ -1869,6 +1940,11 @@ impl Inner {
     /// Increments the ref count. This should only be done if it is known that
     /// it can be done safely. As such, this fn is not public, instead other
     /// fns will use this one while maintaining the guarantees.
+    ///
+    /// "Safely" is defined as not exposing two `BytesMut` values that point to
+    /// the same byte window.
+    ///
+    /// This function is thread safe.
     fn shallow_clone(&self) -> Inner {
         // Always check `inline` first, because if the handle is using inline
         // data storage, all of the `Inner` struct fields will be gibberish.
@@ -1899,7 +1975,14 @@ impl Inner {
         // promote the vec to an `Arc`. This could potentially be called
         // concurrently, so some care must be taken.
         if arc as usize & KIND_MASK == KIND_VEC {
+            let original_capacity_repr =
+                (arc as usize & ORIGINAL_CAPACITY_MASK) >> ORIGINAL_CAPACITY_OFFSET;
+
             unsafe {
+                // The vec offset cannot be concurrently mutated, so there
+                // should be no danger reading it.
+                let off = (arc as usize) >> VEC_POS_OFFSET;
+
                 // First, allocate a new `Shared` instance containing the
                 // `Vec` fields. It's important to note that `ptr`, `len`,
                 // and `cap` cannot be mutated without having `&mut self`.
@@ -1908,8 +1991,8 @@
                 // `Arc`, those three fields still are the components of the
                 // vector.
                 let shared = Box::new(Shared {
-                    vec: Vec::from_raw_parts(self.ptr, self.len, self.cap),
-                    original_capacity: arc as usize & !KIND_MASK,
+                    vec: rebuild_vec(self.ptr, self.len, self.cap, off),
+                    original_capacity_repr: original_capacity_repr,
                     // Initialize refcount to 2. One for this reference, and one
                     // for the new clone that will be returned from
                     // `shallow_clone`.
@@ -2016,13 +2099,14 @@ impl Inner {
         if kind == KIND_VEC {
             // Currently backed by a vector, so just use `Vec::reserve`.
             unsafe {
-                let mut v = Vec::from_raw_parts(self.ptr, self.len, self.cap);
+                let (off, _) = self.uncoordinated_get_vec_pos();
+                let mut v = rebuild_vec(self.ptr, self.len, self.cap, off);
                 v.reserve(additional);
 
                 // Update the info
-                self.ptr = v.as_mut_ptr();
-                self.len = v.len();
-                self.cap = v.capacity();
+                self.ptr = v.as_mut_ptr().offset(off as isize);
+                self.len = v.len() - off;
+                self.cap = v.capacity() - off;
 
                 // Drop the vec reference
                 mem::forget(v);
@@ -2041,9 +2125,11 @@ impl Inner {
         // Compute the new capacity
         let mut new_cap = len + additional;
         let original_capacity;
+        let original_capacity_repr;
 
         unsafe {
-            original_capacity = (*arc).original_capacity;
+            original_capacity_repr = (*arc).original_capacity_repr;
+            original_capacity = original_capacity_from_repr(original_capacity_repr);
 
             // First, try to reclaim the buffer. This is possible if the current
             // handle is the only outstanding handle pointing to the buffer.
@@ -2096,7 +2182,7 @@ impl Inner {
         self.len = v.len();
         self.cap = v.capacity();
 
-        let arc = (original_capacity & !KIND_MASK) | KIND_VEC;
+        let arc = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC;
 
         self.arc = AtomicPtr::new(arc as *mut Shared);
@@ -2169,6 +2255,43 @@ impl Inner {
 
         imp(&self.arc)
     }
+
+    #[inline]
+    fn uncoordinated_get_vec_pos(&mut self) -> (usize, usize) {
+        // Similar to above, this is a pretty crazed function. This should only
+        // be called when in the KIND_VEC mode. This, combined with the `&mut
+        // self` argument, guarantees that there is no possibility of
+        // concurrent calls to this function.
+        let prev = unsafe {
+            let p: &AtomicPtr<Shared> = &self.arc;
+            let p: &usize = mem::transmute(p);
+            *p
+        };
+
+        (prev >> VEC_POS_OFFSET, prev)
+    }
+
+    #[inline]
+    fn uncoordinated_set_vec_pos(&mut self, pos: usize, prev: usize) {
+        // Once more... crazy.
+        debug_assert!(pos <= MAX_VEC_POS);
+
+        unsafe {
+            let p: &mut AtomicPtr<Shared> = &mut self.arc;
+            let p: &mut usize = mem::transmute(p);
+            *p = (pos << VEC_POS_OFFSET) | (prev & NOT_VEC_POS_MASK);
+        }
+    }
+}
+
+fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize) -> Vec<u8> {
+    unsafe {
+        let ptr = ptr.offset(-(off as isize));
+        len += off;
+        cap += off;
+
+        Vec::from_raw_parts(ptr, len, cap)
+    }
 }
 
 impl Drop for Inner {
@@ -2176,10 +2299,10 @@ impl Drop for Inner {
         let kind = self.kind();
 
         if kind == KIND_VEC {
+            let (off, _) = self.uncoordinated_get_vec_pos();
+
             // Vector storage, free the vector
-            unsafe {
-                let _ = Vec::from_raw_parts(self.ptr, self.len, self.cap);
-            }
+            let _ = rebuild_vec(self.ptr, self.len, self.cap, off);
         } else if kind == KIND_ARC {
             release_shared(*self.arc.get_mut());
         }
@@ -2233,6 +2356,52 @@ impl Shared {
     }
 }
 
+fn original_capacity_to_repr(cap: usize) -> usize {
+    let width = PTR_WIDTH - ((cap >> MIN_ORIGINAL_CAPACITY_WIDTH).leading_zeros() as usize);
+    cmp::min(width, MAX_ORIGINAL_CAPACITY_WIDTH - MIN_ORIGINAL_CAPACITY_WIDTH)
+}
+
+fn original_capacity_from_repr(repr: usize) -> usize {
+    if repr == 0 {
+        return 0;
+    }
+
+    1 << (repr + (MIN_ORIGINAL_CAPACITY_WIDTH - 1))
+}
+
+#[test]
+fn test_original_capacity_to_repr() {
+    for &cap in &[0, 1, 16, 1000] {
+        assert_eq!(0, original_capacity_to_repr(cap));
+    }
+
+    for &cap in &[1024, 1025, 1100, 2000, 2047] {
+        assert_eq!(1, original_capacity_to_repr(cap));
+    }
+
+    for &cap in &[2048, 2049] {
+        assert_eq!(2, original_capacity_to_repr(cap));
+    }
+
+    // TODO: more
+
+    for &cap in &[65536, 65537, 68000, 1 << 17, 1 << 18, 1 << 20, 1 << 30] {
+        assert_eq!(7, original_capacity_to_repr(cap), "cap={}", cap);
+    }
+}
+
+#[test]
+fn test_original_capacity_from_repr() {
+    assert_eq!(0, original_capacity_from_repr(0));
+    assert_eq!(1024, original_capacity_from_repr(1));
+    assert_eq!(1024 * 2, original_capacity_from_repr(2));
+    assert_eq!(1024 * 4, original_capacity_from_repr(3));
+    assert_eq!(1024 * 8, original_capacity_from_repr(4));
+    assert_eq!(1024 * 16, original_capacity_from_repr(5));
+    assert_eq!(1024 * 32, original_capacity_from_repr(6));
+    assert_eq!(1024 * 64, original_capacity_from_repr(7));
+}
+
 unsafe impl Send for Inner {}
 unsafe impl Sync for Inner {}
diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs
index 1c8ccd0b1472d6e4cbcb45a5c2198be99d39dde9..35ba8ac225a935a24a59318191bd09e616c8b08a 100644
--- a/tests/test_bytes.rs
+++ b/tests/test_bytes.rs
@@ -350,16 +350,16 @@ fn reserve_growth() {
 
 #[test]
 fn reserve_allocates_at_least_original_capacity() {
-    let mut bytes = BytesMut::with_capacity(128);
+    let mut bytes = BytesMut::with_capacity(1024);
 
-    for i in 0..120 {
+    for i in 0..1020 {
         bytes.put(i as u8);
     }
 
     let _other = bytes.take();
 
     bytes.reserve(16);
-    assert_eq!(bytes.capacity(), 128);
+    assert_eq!(bytes.capacity(), 1024);
 }
 
 #[test]
@@ -466,6 +466,44 @@ fn from_static() {
     assert_eq!(b, b"b"[..]);
 }
 
+#[test]
+fn advance_inline() {
+    let mut a = Bytes::from(&b"hello world"[..]);
+    a.advance(6);
+    assert_eq!(a, &b"world"[..]);
+}
+
+#[test]
+fn advance_static() {
+    let mut a = Bytes::from_static(b"hello world");
+    a.advance(6);
+    assert_eq!(a, &b"world"[..]);
+}
+
+#[test]
+fn advance_vec() {
+    let mut a = BytesMut::from(b"hello world boooo yah world zomg wat wat".to_vec());
+    a.advance(16);
+    assert_eq!(a, b"o yah world zomg wat wat"[..]);
+
+    a.advance(4);
+    assert_eq!(a, b"h world zomg wat wat"[..]);
+
+    // Reserve some space.
+    a.reserve(1024);
+    assert_eq!(a, b"h world zomg wat wat"[..]);
+
+    a.advance(6);
+    assert_eq!(a, b"d zomg wat wat"[..]);
+}
+
+#[test]
+#[should_panic]
+fn advance_past_len() {
+    let mut a = BytesMut::from(b"hello world".to_vec());
+    a.advance(20);
+}
+
 #[test]
 // Only run these tests on little endian systems. CI uses qemu for testing
 // little endian... and qemu doesn't really support threading all that well.
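For reference, here is a minimal standalone sketch of the `arc`-field bit layout this patch introduces for the `KIND_VEC` representation. The constants are copied from the patch itself; the `pack_vec_arc` and `unpack_vec_arc` helper names are illustrative only and are not part of the crate:

```rust
// Layout of the `arc` word in KIND_VEC mode, from high bits to low bits:
//   [ vec offset | original capacity repr (3 bits) | kind tag (2 bits) ]
const KIND_VEC: usize = 0b11;
const KIND_MASK: usize = 0b11;
const ORIGINAL_CAPACITY_MASK: usize = 0b11100;
const ORIGINAL_CAPACITY_OFFSET: usize = 2;
const VEC_POS_OFFSET: usize = 5;
const MAX_VEC_POS: usize = usize::MAX >> VEC_POS_OFFSET;

// Pack a vec offset and a 3-bit original-capacity repr into one word.
fn pack_vec_arc(pos: usize, repr: usize) -> usize {
    assert!(pos <= MAX_VEC_POS, "offset must fit in the remaining bits");
    assert!(repr <= 0b111, "repr is only 3 bits wide");
    (pos << VEC_POS_OFFSET) | (repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC
}

// Recover (offset, repr, kind) from the packed word.
fn unpack_vec_arc(arc: usize) -> (usize, usize, usize) {
    let pos = arc >> VEC_POS_OFFSET;
    let repr = (arc & ORIGINAL_CAPACITY_MASK) >> ORIGINAL_CAPACITY_OFFSET;
    let kind = arc & KIND_MASK;
    (pos, repr, kind)
}

fn main() {
    // A buffer advanced by 4096 bytes, with original capacity repr 7 (64kb).
    let arc = pack_vec_arc(4096, 7);
    assert_eq!((4096, 7, KIND_VEC), unpack_vec_arc(arc));
    println!("arc = {:#b}", arc);
}
```

The two low bits hold the kind tag and the next three hold the capacity repr, which is why `VEC_POS_OFFSET` is 5: an advanced offset gives up 5 bits of range, producing the `MAX_VEC_POS` bound that `set_start` checks before falling back to the `Arc` upgrade.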