From 6a3d20bb8d7212b8c9a020417921e79bb32b0eee Mon Sep 17 00:00:00 2001
From: Stepan Koltsov <stepan.koltsov@gmail.com>
Date: Wed, 3 Jan 2018 19:41:33 +0000
Subject: [PATCH] Optimize shallow_clone for Bytes::split_{off,to} (#92)

If `shallow_clone` is called with `&mut self`, and the `Bytes` contains
a `Vec`, then the expensive CAS can be avoided, because no other thread
has references to this `Bytes` object.

The `split_off_and_drop` benchmark shows the difference:

Before the diff:

```
test split_off_and_drop ... bench: 91,858 ns/iter (+/- 17,401)
```

With the diff:

```
test split_off_and_drop ... bench: 81,162 ns/iter (+/- 17,603)
```
---
 benches/bytes.rs |  12 ++++
 src/bytes.rs     | 162 +++++++++++++++++++++++++----------------------
 2 files changed, 97 insertions(+), 77 deletions(-)

diff --git a/benches/bytes.rs b/benches/bytes.rs
index 07494f5..d268db5 100644
--- a/benches/bytes.rs
+++ b/benches/bytes.rs
@@ -29,6 +29,18 @@ fn alloc_big(b: &mut Bencher) {
     })
 }
 
+#[bench]
+fn split_off_and_drop(b: &mut Bencher) {
+    b.iter(|| {
+        for _ in 0..1024 {
+            let v = vec![10; 200];
+            let mut b = Bytes::from(v);
+            test::black_box(b.split_off(100));
+            test::black_box(b);
+        }
+    })
+}
+
 #[bench]
 fn deref_unique(b: &mut Bencher) {
     let mut buf = BytesMut::with_capacity(4096);
diff --git a/src/bytes.rs b/src/bytes.rs
index 5dba49e..094d035 100644
--- a/src/bytes.rs
+++ b/src/bytes.rs
@@ -806,7 +806,7 @@ impl<'a> IntoBuf for &'a Bytes {
 impl Clone for Bytes {
     fn clone(&self) -> Bytes {
         Bytes {
-            inner: self.inner.shallow_clone(),
+            inner: unsafe { self.inner.shallow_clone(false) },
         }
     }
 }
@@ -1843,7 +1843,7 @@ impl Inner {
     }
 
     fn split_off(&mut self, at: usize) -> Inner {
-        let mut other = self.shallow_clone();
+        let mut other = unsafe { self.shallow_clone(true) };
 
         unsafe {
             other.set_start(at);
@@ -1854,7 +1854,7 @@ impl Inner {
     }
 
     fn split_to(&mut self, at: usize) -> Inner {
-        let mut other = self.shallow_clone();
+        let mut other = unsafe { self.shallow_clone(true) };
 
         unsafe {
             other.set_end(at);
@@ -1920,7 +1920,7 @@ impl Inner {
                     // on 64 bit systems and will only happen on 32 bit systems
                     // when shifting past 134,217,727 bytes. As such, we don't
                     // worry too much about performance here.
-                    let _ = self.shallow_clone();
+                    let _ = self.shallow_clone(true);
                 }
             }
 
@@ -1980,26 +1980,27 @@ impl Inner {
     /// Increments the ref count. This should only be done if it is known that
     /// it can be done safely. As such, this fn is not public, instead other
     /// fns will use this one while maintaining the guarantees.
+    /// Parameter `mut_self` should only be set to `true` if caller holds
+    /// `&mut self` reference.
     ///
     /// "Safely" is defined as not exposing two `BytesMut` values that point to
     /// the same byte window.
     ///
     /// This function is thread safe.
-    fn shallow_clone(&self) -> Inner {
+    unsafe fn shallow_clone(&self, mut_self: bool) -> Inner {
         // Always check `inline` first, because if the handle is using inline
         // data storage, all of the `Inner` struct fields will be gibberish.
         if self.is_inline() {
             // In this case, a shallow_clone still involves copying the data.
-            unsafe {
-                // TODO: Just copy the fields
-                let mut inner: Inner = mem::uninitialized();
-                let len = self.inline_len();
+            //
+            // TODO: Just copy the fields
+            let mut inner: Inner = mem::uninitialized();
+            let len = self.inline_len();
 
-                inner.arc = AtomicPtr::new(KIND_INLINE as *mut Shared);
-                inner.set_inline_len(len);
-                inner.as_raw()[0..len].copy_from_slice(self.as_ref());
-                inner
-            }
+            inner.arc = AtomicPtr::new(KIND_INLINE as *mut Shared);
+            inner.set_inline_len(len);
+            inner.as_raw()[0..len].copy_from_slice(self.as_ref());
+            inner
         } else {
             // The function requires `&self`, this means that `shallow_clone`
             // could be called concurrently.
@@ -2018,63 +2019,71 @@ impl Inner {
                 let original_capacity_repr =
                     (arc as usize & ORIGINAL_CAPACITY_MASK) >> ORIGINAL_CAPACITY_OFFSET;
 
-                unsafe {
-                    // The vec offset cannot be concurrently mutated, so there
-                    // should be no danger reading it.
-                    let off = (arc as usize) >> VEC_POS_OFFSET;
-
-                    // First, allocate a new `Shared` instance containing the
-                    // `Vec` fields. It's important to note that `ptr`, `len`,
-                    // and `cap` cannot be mutated without having `&mut self`.
-                    // This means that these fields will not be concurrently
-                    // updated and since the buffer hasn't been promoted to an
-                    // `Arc`, those three fields still are the components of the
-                    // vector.
-                    let shared = Box::new(Shared {
-                        vec: rebuild_vec(self.ptr, self.len, self.cap, off),
-                        original_capacity_repr: original_capacity_repr,
-                        // Initialize refcount to 2. One for this reference, and one
-                        // for the new clone that will be returned from
-                        // `shallow_clone`.
-                        ref_count: AtomicUsize::new(2),
-                    });
-
-                    let shared = Box::into_raw(shared);
-
-                    // The pointer should be aligned, so this assert should
-                    // always succeed.
-                    debug_assert!(0 == (shared as usize & 0b11));
-
-                    // Try compare & swapping the pointer into the `arc` field.
-                    // `Release` is used synchronize with other threads that
-                    // will load the `arc` field.
-                    //
-                    // If the `compare_and_swap` fails, then the thread lost the
-                    // race to promote the buffer to shared. The `Acquire`
-                    // ordering will synchronize with the `compare_and_swap`
-                    // that happened in the other thread and the `Shared`
-                    // pointed to by `actual` will be visible.
-                    let actual = self.arc.compare_and_swap(arc, shared, AcqRel);
-
-                    if actual == arc {
-                        // The upgrade was successful, the new handle can be
-                        // returned.
-                        return Inner {
-                            arc: AtomicPtr::new(shared),
-                            .. *self
-                        };
-                    }
-
-                    // The upgrade failed, a concurrent clone happened. Release
-                    // the allocation that was made in this thread, it will not
-                    // be needed.
-                    let shared = Box::from_raw(shared);
-                    mem::forget(*shared);
-
-                    // Update the `arc` local variable and fall through to a ref
-                    // count update
-                    arc = actual;
+                // The vec offset cannot be concurrently mutated, so there
+                // should be no danger reading it.
+                let off = (arc as usize) >> VEC_POS_OFFSET;
+
+                // First, allocate a new `Shared` instance containing the
+                // `Vec` fields. It's important to note that `ptr`, `len`,
+                // and `cap` cannot be mutated without having `&mut self`.
+                // This means that these fields will not be concurrently
+                // updated and since the buffer hasn't been promoted to an
+                // `Arc`, those three fields still are the components of the
+                // vector.
+                let shared = Box::new(Shared {
+                    vec: rebuild_vec(self.ptr, self.len, self.cap, off),
+                    original_capacity_repr: original_capacity_repr,
+                    // Initialize refcount to 2. One for this reference, and one
+                    // for the new clone that will be returned from
+                    // `shallow_clone`.
+                    ref_count: AtomicUsize::new(2),
+                });
+
+                let shared = Box::into_raw(shared);
+
+                // The pointer should be aligned, so this assert should
+                // always succeed.
+                debug_assert!(0 == (shared as usize & 0b11));
+
+                // If there are no references to self in other threads,
+                // expensive atomic operations can be avoided.
+                if mut_self {
+                    self.arc.store(shared, Relaxed);
+                    return Inner {
+                        arc: AtomicPtr::new(shared),
+                        .. *self
+                    };
                 }
+
+                // Try compare & swapping the pointer into the `arc` field.
+                // `Release` is used synchronize with other threads that
+                // will load the `arc` field.
+                //
+                // If the `compare_and_swap` fails, then the thread lost the
+                // race to promote the buffer to shared. The `Acquire`
+                // ordering will synchronize with the `compare_and_swap`
+                // that happened in the other thread and the `Shared`
+                // pointed to by `actual` will be visible.
+                let actual = self.arc.compare_and_swap(arc, shared, AcqRel);
+
+                if actual == arc {
+                    // The upgrade was successful, the new handle can be
+                    // returned.
+                    return Inner {
+                        arc: AtomicPtr::new(shared),
+                        .. *self
+                    };
+                }
+
+                // The upgrade failed, a concurrent clone happened. Release
+                // the allocation that was made in this thread, it will not
+                // be needed.
+                let shared = Box::from_raw(shared);
+                mem::forget(*shared);
+
+                // Update the `arc` local variable and fall through to a ref
+                // count update
+                arc = actual;
             } else if arc as usize & KIND_MASK == KIND_STATIC {
                 // Static buffer
                 return Inner {
@@ -2085,14 +2094,13 @@ impl Inner {
 
             // Buffer already promoted to shared storage, so increment ref
             // count.
-            unsafe {
-                // Relaxed ordering is acceptable as the memory has already been
-                // acquired via the `Acquire` load above.
-                let old_size = (*arc).ref_count.fetch_add(1, Relaxed);
+            //
+            // Relaxed ordering is acceptable as the memory has already been
+            // acquired via the `Acquire` load above.
+            let old_size = (*arc).ref_count.fetch_add(1, Relaxed);
 
-                if old_size == usize::MAX {
-                    panic!(); // TODO: abort
-                }
+            if old_size == usize::MAX {
+                panic!(); // TODO: abort
             }
 
         Inner {
--
GitLab
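Not part of the patch: a minimal usage sketch of the pattern this change speeds up, assuming the bytes 0.4 public API exercised by the new benchmark (`Bytes::from(Vec<u8>)`, `split_off`, `clone`). `split_off` takes `&mut self`, so promoting a `Vec`-backed handle to shared storage can use the new store-only path (`shallow_clone(true)`), while `Clone::clone` only has `&self` and keeps the atomic path (`shallow_clone(false)`).

```rust
extern crate bytes;

use bytes::Bytes;

fn main() {
    // 200 bytes is larger than the inline capacity, so this handle starts
    // out backed by the `Vec` (the KIND_VEC case in the patch above).
    let mut buf = Bytes::from(vec![10u8; 200]);

    // `split_off(&mut self, ..)`: the exclusive borrow guarantees no other
    // thread holds this handle, so promotion to `Shared` can take the plain
    // `store` fast path added by this patch.
    let tail = buf.split_off(100);

    // `clone(&self)`: other handles may exist on other threads, so cloning
    // still goes through the atomic path (here it just bumps the ref count).
    let tail_again = tail.clone();

    assert_eq!(buf.len(), 100);
    assert_eq!(tail.len(), 100);
    assert_eq!(tail_again, tail);
}
```

The `mut_self` flag is the caller's promise of exclusivity, which is also why `shallow_clone` becomes an `unsafe fn` in this patch.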