From 02891144be0a889ad1c34b84c5ec15f4fb6d2c57 Mon Sep 17 00:00:00 2001
From: Carl Lerche <me@carllerche.com>
Date: Wed, 13 Dec 2017 13:30:03 -0600
Subject: [PATCH] Add `advance` on `Bytes` and `BytesMut` (#166)

* Compact Bytes original capacity representation

In order to avoid unnecessary allocations, a `Bytes` structure remembers
the capacity with which it was first created. When a reserve operation
is issued, this original capacity value is used to as a baseline for
reallocating new storage.

Previously, this original capacity value was stored in its raw form. In
other words, the original capacity `usize` was stored as is. In order to
reclaim some `Bytes` internal storage space for additional features,
this original capacity value is compressed from requiring 16 bits to 3.

To do this, instead of storing the exact original capacity. The original
capacity is rounded down to the nearest power of two. If the original
capacity is less than 1024, then it is rounded down to zero. This
roughly means that the original capacity is now stored as a table:

0 => 0
1 => 1k
2 => 2k
3 => 4k
4 => 8k
5 => 16k
6 => 32k
7 => 64k

For the purposes that the original capacity feature was introduced, this
is sufficient granularity.

* Provide `advance` on Bytes and BytesMut

This is the `advance` function that would be part of a `Buf`
implementation. However, `Bytes` and `BytesMut` cannot impl `Buf` until
the next breaking release.

The implementation uses the additional storage made available by the
previous commit to store the number of bytes that the view was advanced.
The `ptr` pointer will point to the start of the window, avoiding any
pointer arithmetic when dereferencing the `Bytes` handle.
---
 src/bytes.rs        | 209 +++++++++++++++++++++++++++++++++++++++-----
 tests/test_bytes.rs |  44 +++++++++-
 2 files changed, 230 insertions(+), 23 deletions(-)

diff --git a/src/bytes.rs b/src/bytes.rs
index ac65592..34451b1 100644
--- a/src/bytes.rs
+++ b/src/bytes.rs
@@ -321,7 +321,7 @@ struct Inner {
 // other shenanigans to make it work.
 struct Shared {
     vec: Vec<u8>,
-    original_capacity: usize,
+    original_capacity_repr: usize,
     ref_count: AtomicUsize,
 }
 
@@ -332,7 +332,24 @@ const KIND_STATIC: usize = 0b10;
 const KIND_VEC: usize = 0b11;
 const KIND_MASK: usize = 0b11;
 
-const MAX_ORIGINAL_CAPACITY: usize = 1 << 16;
+// The max original capacity value. Any `Bytes` allocated with a greater initial
+// capacity will default to this.
+const MAX_ORIGINAL_CAPACITY_WIDTH: usize = 17;
+// The original capacity algorithm will not take effect unless the originally
+// allocated capacity was at least 1kb in size.
+const MIN_ORIGINAL_CAPACITY_WIDTH: usize = 10;
+// The original capacity is stored in powers of 2 starting at 1kb to a max of
+// 64kb. Representing it as such requires only 3 bits of storage.
+const ORIGINAL_CAPACITY_MASK: usize = 0b11100;
+const ORIGINAL_CAPACITY_OFFSET: usize = 2;
+
+// When the storage is in the `Vec` representation, the pointer can be advanced
+// at most this value. This is due to the amount of storage available to track
+// the offset is usize - number of KIND bits and number of ORIGINAL_CAPACITY
+// bits.
+const VEC_POS_OFFSET: usize = 5;
+const MAX_VEC_POS: usize = usize::MAX >> VEC_POS_OFFSET;
+const NOT_VEC_POS_MASK: usize = 0b11111;
 
 // Bit op constants for extracting the inline length value from the `arc` field.
 const INLINE_LEN_MASK: usize = 0b11111100;
@@ -347,6 +364,11 @@ const INLINE_DATA_OFFSET: isize = 1;
 #[cfg(target_endian = "big")]
 const INLINE_DATA_OFFSET: isize = 0;
 
+#[cfg(target_pointer_width = "64")]
+const PTR_WIDTH: usize = 64;
+#[cfg(target_pointer_width = "32")]
+const PTR_WIDTH: usize = 32;
+
 // Inline buffer capacity. This is the size of `Inner` minus 1 byte for the
 // metadata.
 #[cfg(target_pointer_width = "64")]
@@ -655,6 +677,22 @@ impl Bytes {
         self.inner.truncate(len);
     }
 
+    /// Shortens the buffer, dropping the first `cnt` bytes and keeping the
+    /// rest.
+    ///
+    /// This is the same function as `Buf::advance`, and in the next breaking
+    /// release of `bytes`, this implementation will be removed in favor of
+    /// having `Bytes` implement `Buf`.
+    ///
+    /// # Panics
+    ///
+    /// This function panics if `cnt` is greater than `self.len()`
+    #[inline]
+    pub fn advance(&mut self, cnt: usize) {
+        assert!(cnt <= self.len(), "cannot advance past `remaining`");
+        unsafe { self.inner.set_start(cnt); }
+    }
+
     /// Clears the buffer, removing all data.
     ///
     /// # Examples
@@ -1202,6 +1240,22 @@ impl BytesMut {
         self.inner.truncate(len);
     }
 
+    /// Shortens the buffer, dropping the first `cnt` bytes and keeping the
+    /// rest.
+    ///
+    /// This is the same function as `Buf::advance`, and in the next breaking
+    /// release of `bytes`, this implementation will be removed in favor of
+    /// having `BytesMut` implement `Buf`.
+    ///
+    /// # Panics
+    ///
+    /// This function panics if `cnt` is greater than `self.len()`
+    #[inline]
+    pub fn advance(&mut self, cnt: usize) {
+        assert!(cnt <= self.len(), "cannot advance past `remaining`");
+        unsafe { self.inner.set_start(cnt); }
+    }
+
     /// Clears the buffer, removing all data.
     ///
     /// # Examples
@@ -1608,8 +1662,8 @@ impl Inner {
 
         mem::forget(src);
 
-        let original_capacity = cmp::min(cap, MAX_ORIGINAL_CAPACITY);
-        let arc = (original_capacity & !KIND_MASK) | KIND_VEC;
+        let original_capacity_repr = original_capacity_to_repr(cap);
+        let arc = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC;
 
         Inner {
             arc: AtomicPtr::new(arc as *mut Shared),
@@ -1777,19 +1831,17 @@ impl Inner {
     }
 
     unsafe fn set_start(&mut self, start: usize) {
-        // This function should never be called when the buffer is still backed
-        // by a `Vec<u8>`
-        debug_assert!(self.is_shared());
-
         // Setting the start to 0 is a no-op, so return early if this is the
         // case.
         if start == 0 {
             return;
         }
 
+        let kind = self.kind();
+
         // Always check `inline` first, because if the handle is using inline
         // data storage, all of the `Inner` struct fields will be gibberish.
-        if self.is_inline() {
+        if kind == KIND_INLINE {
             assert!(start <= INLINE_CAP);
 
             let len = self.inline_len();
@@ -1813,6 +1865,25 @@ impl Inner {
         } else {
             assert!(start <= self.cap);
 
+            if kind == KIND_VEC {
+                // Setting the start when in vec representation is a little more
+                // complicated. First, we have to track how far ahead the
+                // "start" of the byte buffer from the beginning of the vec. We
+                // also have to ensure that we don't exceed the maximum shift.
+                let (mut pos, prev) = self.uncoordinated_get_vec_pos();
+                pos += start;
+
+                if pos <= MAX_VEC_POS {
+                    self.uncoordinated_set_vec_pos(pos, prev);
+                } else {
+                    // The repr must be upgraded to ARC. This will never happen
+                    // on 64 bit systems and will only happen on 32 bit systems
+                    // when shifting past 134,217,727 bytes. As such, we don't
+                    // worry too much about performance here.
+                    let _ = self.shallow_clone();
+                }
+            }
+
             // Updating the start of the view is setting `ptr` to point to the
             // new start and updating the `len` field to reflect the new length
             // of the view.
@@ -1869,6 +1940,11 @@ impl Inner {
     /// Increments the ref count. This should only be done if it is known that
     /// it can be done safely. As such, this fn is not public, instead other
     /// fns will use this one while maintaining the guarantees.
+    ///
+    /// "Safely" is defined as not exposing two `BytesMut` values that point to
+    /// the same byte window.
+    ///
+    /// This function is thread safe.
     fn shallow_clone(&self) -> Inner {
         // Always check `inline` first, because if the handle is using inline
         // data storage, all of the `Inner` struct fields will be gibberish.
@@ -1899,7 +1975,14 @@ impl Inner {
             // promote the vec to an `Arc`. This could potentially be called
             // concurrently, so some care must be taken.
             if arc as usize & KIND_MASK == KIND_VEC {
+                let original_capacity_repr =
+                    (arc as usize & ORIGINAL_CAPACITY_MASK) >> ORIGINAL_CAPACITY_OFFSET;
+
                 unsafe {
+                    // The vec offset cannot be concurrently mutated, so there
+                    // should be no danger reading it.
+                    let off = (arc as usize) >> VEC_POS_OFFSET;
+
                     // First, allocate a new `Shared` instance containing the
                     // `Vec` fields. It's important to note that `ptr`, `len`,
                     // and `cap` cannot be mutated without having `&mut self`.
@@ -1908,8 +1991,8 @@ impl Inner {
                     // `Arc`, those three fields still are the components of the
                     // vector.
                     let shared = Box::new(Shared {
-                        vec: Vec::from_raw_parts(self.ptr, self.len, self.cap),
-                        original_capacity: arc as usize & !KIND_MASK,
+                        vec: rebuild_vec(self.ptr, self.len, self.cap, off),
+                        original_capacity_repr: original_capacity_repr,
                         // Initialize refcount to 2. One for this reference, and one
                         // for the new clone that will be returned from
                         // `shallow_clone`.
@@ -2016,13 +2099,14 @@ impl Inner {
         if kind == KIND_VEC {
             // Currently backed by a vector, so just use `Vector::reserve`.
             unsafe {
-                let mut v = Vec::from_raw_parts(self.ptr, self.len, self.cap);
+                let (off, _) = self.uncoordinated_get_vec_pos();
+                let mut v = rebuild_vec(self.ptr, self.len, self.cap, off);
                 v.reserve(additional);
 
                 // Update the info
-                self.ptr = v.as_mut_ptr();
-                self.len = v.len();
-                self.cap = v.capacity();
+                self.ptr = v.as_mut_ptr().offset(off as isize);
+                self.len = v.len() - off;
+                self.cap = v.capacity() - off;
 
                 // Drop the vec reference
                 mem::forget(v);
@@ -2041,9 +2125,11 @@ impl Inner {
         // Compute the new capacity
         let mut new_cap = len + additional;
         let original_capacity;
+        let original_capacity_repr;
 
         unsafe {
-            original_capacity = (*arc).original_capacity;
+            original_capacity_repr = (*arc).original_capacity_repr;
+            original_capacity = original_capacity_from_repr(original_capacity_repr);
 
             // First, try to reclaim the buffer. This is possible if the current
             // handle is the only outstanding handle pointing to the buffer.
@@ -2096,7 +2182,7 @@ impl Inner {
         self.len = v.len();
         self.cap = v.capacity();
 
-        let arc = (original_capacity & !KIND_MASK) | KIND_VEC;
+        let arc = (original_capacity_repr << ORIGINAL_CAPACITY_OFFSET) | KIND_VEC;
 
         self.arc = AtomicPtr::new(arc as *mut Shared);
 
@@ -2169,6 +2255,43 @@ impl Inner {
 
         imp(&self.arc)
     }
+
+    #[inline]
+    fn uncoordinated_get_vec_pos(&mut self) -> (usize, usize) {
+        // Similar to above, this is a pretty crazed function. This should only
+        // be called when in the KIND_VEC mode. This + the &mut self argument
+        // guarantees that there is no possibility of concurrent calls to this
+        // function.
+        let prev = unsafe {
+            let p: &AtomicPtr<Shared> = &self.arc;
+            let p: &usize = mem::transmute(p);
+            *p
+        };
+
+        (prev >> VEC_POS_OFFSET, prev)
+    }
+
+    #[inline]
+    fn uncoordinated_set_vec_pos(&mut self, pos: usize, prev: usize) {
+        // Once more... crazy
+        debug_assert!(pos <= MAX_VEC_POS);
+
+        unsafe {
+            let p: &mut AtomicPtr<Shared> = &mut self.arc;
+            let p: &mut usize = mem::transmute(p);
+            *p = (pos << VEC_POS_OFFSET) | (prev & NOT_VEC_POS_MASK);
+        }
+    }
+}
+
+fn rebuild_vec(ptr: *mut u8, mut len: usize, mut cap: usize, off: usize) -> Vec<u8> {
+    unsafe {
+        let ptr = ptr.offset(-(off as isize));
+        len += off;
+        cap += off;
+
+        Vec::from_raw_parts(ptr, len, cap)
+    }
 }
 
 impl Drop for Inner {
@@ -2176,10 +2299,10 @@ impl Drop for Inner {
         let kind = self.kind();
 
         if kind == KIND_VEC {
+            let (off, _) = self.uncoordinated_get_vec_pos();
+
             // Vector storage, free the vector
-            unsafe {
-                let _ = Vec::from_raw_parts(self.ptr, self.len, self.cap);
-            }
+            let _ = rebuild_vec(self.ptr, self.len, self.cap, off);
         } else if kind == KIND_ARC {
             release_shared(*self.arc.get_mut());
         }
@@ -2233,6 +2356,52 @@ impl Shared {
     }
 }
 
+fn original_capacity_to_repr(cap: usize) -> usize {
+    let width = PTR_WIDTH - ((cap >> MIN_ORIGINAL_CAPACITY_WIDTH).leading_zeros() as usize);
+    cmp::min(width, MAX_ORIGINAL_CAPACITY_WIDTH - MIN_ORIGINAL_CAPACITY_WIDTH)
+}
+
+fn original_capacity_from_repr(repr: usize) -> usize {
+    if repr == 0 {
+        return 0;
+    }
+
+    1 << (repr + (MIN_ORIGINAL_CAPACITY_WIDTH - 1))
+}
+
+#[test]
+fn test_original_capacity_to_repr() {
+    for &cap in &[0, 1, 16, 1000] {
+        assert_eq!(0, original_capacity_to_repr(cap));
+    }
+
+    for &cap in &[1024, 1025, 1100, 2000, 2047] {
+        assert_eq!(1, original_capacity_to_repr(cap));
+    }
+
+    for &cap in &[2048, 2049] {
+        assert_eq!(2, original_capacity_to_repr(cap));
+    }
+
+    // TODO: more
+
+    for &cap in &[65536, 65537, 68000, 1 << 17, 1 << 18, 1 << 20, 1 << 30] {
+        assert_eq!(7, original_capacity_to_repr(cap), "cap={}", cap);
+    }
+}
+
+#[test]
+fn test_original_capacity_from_repr() {
+    assert_eq!(0, original_capacity_from_repr(0));
+    assert_eq!(1024, original_capacity_from_repr(1));
+    assert_eq!(1024 * 2, original_capacity_from_repr(2));
+    assert_eq!(1024 * 4, original_capacity_from_repr(3));
+    assert_eq!(1024 * 8, original_capacity_from_repr(4));
+    assert_eq!(1024 * 16, original_capacity_from_repr(5));
+    assert_eq!(1024 * 32, original_capacity_from_repr(6));
+    assert_eq!(1024 * 64, original_capacity_from_repr(7));
+}
+
 unsafe impl Send for Inner {}
 unsafe impl Sync for Inner {}
 
diff --git a/tests/test_bytes.rs b/tests/test_bytes.rs
index 1c8ccd0..35ba8ac 100644
--- a/tests/test_bytes.rs
+++ b/tests/test_bytes.rs
@@ -350,16 +350,16 @@ fn reserve_growth() {
 
 #[test]
 fn reserve_allocates_at_least_original_capacity() {
-    let mut bytes = BytesMut::with_capacity(128);
+    let mut bytes = BytesMut::with_capacity(1024);
 
-    for i in 0..120 {
+    for i in 0..1020 {
         bytes.put(i as u8);
     }
 
     let _other = bytes.take();
 
     bytes.reserve(16);
-    assert_eq!(bytes.capacity(), 128);
+    assert_eq!(bytes.capacity(), 1024);
 }
 
 #[test]
@@ -466,6 +466,44 @@ fn from_static() {
     assert_eq!(b, b"b"[..]);
 }
 
+#[test]
+fn advance_inline() {
+    let mut a = Bytes::from(&b"hello world"[..]);
+    a.advance(6);
+    assert_eq!(a, &b"world"[..]);
+}
+
+#[test]
+fn advance_static() {
+    let mut a = Bytes::from_static(b"hello world");
+    a.advance(6);
+    assert_eq!(a, &b"world"[..]);
+}
+
+#[test]
+fn advance_vec() {
+    let mut a = BytesMut::from(b"hello world boooo yah world zomg wat wat".to_vec());
+    a.advance(16);
+    assert_eq!(a, b"o yah world zomg wat wat"[..]);
+
+    a.advance(4);
+    assert_eq!(a, b"h world zomg wat wat"[..]);
+
+    // Reserve some space.
+    a.reserve(1024);
+    assert_eq!(a, b"h world zomg wat wat"[..]);
+
+    a.advance(6);
+    assert_eq!(a, b"d zomg wat wat"[..]);
+}
+
+#[test]
+#[should_panic]
+fn advance_past_len() {
+    let mut a = BytesMut::from(b"hello world".to_vec());
+    a.advance(20);
+}
+
 #[test]
 // Only run these tests on little endian systems. CI uses qemu for testing
 // little endian... and qemu doesn't really support threading all that well.
-- 
GitLab