From 1c2234f7fea1ad481f6730049f03b6d3ffdc40e7 Mon Sep 17 00:00:00 2001 From: Carl Lerche <me@carllerche.com> Date: Wed, 20 Jul 2016 22:47:41 +0200 Subject: [PATCH] Initial stab at a buffer pool --- Cargo.toml | 1 + src/alloc/mod.rs | 1 + src/alloc/pool.rs | 253 ++++++++++++++++++++++++++++++++++++++++++++++ test/test.rs | 2 + test/test_pool.rs | 85 ++++++++++++++++ 5 files changed, 342 insertions(+) create mode 100644 src/alloc/pool.rs create mode 100644 test/test_pool.rs diff --git a/Cargo.toml b/Cargo.toml index 5f5c745..5913f43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ stable-heap = "0.1.0" [dev-dependencies] rand = "0.3.5" +byteorder = "0.5.3" [[bench]] diff --git a/src/alloc/mod.rs b/src/alloc/mod.rs index b597283..891412f 100644 --- a/src/alloc/mod.rs +++ b/src/alloc/mod.rs @@ -1,4 +1,5 @@ mod heap; +mod pool; pub use self::heap::Heap; pub use self::pool::Pool; diff --git a/src/alloc/pool.rs b/src/alloc/pool.rs new file mode 100644 index 0000000..8d746b0 --- /dev/null +++ b/src/alloc/pool.rs @@ -0,0 +1,253 @@ +use super::{Mem, MemRef}; +use buf::{ByteBuf, MutByteBuf}; +use stable_heap as heap; +use std::{mem, ptr, isize, usize}; +use std::cell::{Cell, UnsafeCell}; +use std::marker::PhantomData; +use std::sync::Arc; +use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; + +// TODO: ensure that not Sync +pub struct Pool { + inner: Arc<PoolInner>, + marker: PhantomData<Cell<()>>, +} + +struct PoolInner { + ptr: *mut u8, // Pointer to the raw memory + next: AtomicPtr<Entry>, + cap: usize, // Total number of entries + buf_len: usize, // Byte size of each byte buf + entry_len: usize, // Byte size of each entry +} + +struct Entry { + inner: UnsafeCell<Inner>, +} + +struct Inner { + pool: Option<Pool>, + refs: AtomicUsize, + next: *mut Entry, +} + +const MAX_REFCOUNT: usize = (isize::MAX) as usize; + +impl Pool { + /// Constructs a new `Pool` with with specified capacity such that each + /// buffer in the pool has a length of `buf_len`. + pub fn with_capacity(cap: usize, mut buf_len: usize) -> Pool { + // Ensure that all buffers have a power of 2 size. This enables + // optimizations in Buf implementations. + buf_len = buf_len.next_power_of_two(); + + let inner = Arc::new(PoolInner::with_capacity(cap, buf_len)); + + // Iterate each entry and initialize the memory + let mut next = ptr::null_mut(); + + for i in 0..cap { + unsafe { + let off = i * inner.entry_len; + let ptr = inner.ptr.offset(off as isize); + let e = &mut *(ptr as *mut Entry); + + ptr::write(&mut e.inner as *mut UnsafeCell<Inner>, UnsafeCell::new(Inner { + pool: None, + refs: AtomicUsize::new(0), + next: next, + })); + + next = ptr as *mut Entry; + + let ptr = ptr.offset(mem::size_of::<Entry>() as isize); + ptr::write(ptr as *mut &Mem, e as &Mem); + + let ptr = ptr.offset(mem::size_of::<&Mem>() as isize); + ptr::write(ptr as *mut usize, buf_len); + } + } + + // Set the next ptr to the head of the Entry linked list + inner.next.store(next, Ordering::Relaxed); + + Pool { + inner: inner, + marker: PhantomData, + } + } + + /// Returns the number of buffers that the `Pool` holds. + pub fn capacity(&self) -> usize { + self.inner.cap + } + + /// Returns a new `ByteBuf` backed by a buffer from the pool. If the pool + /// is depleted, `None` is returned. + pub fn new_byte_buf(&self) -> Option<MutByteBuf> { + let len = self.inner.buf_len as u32; + self.checkout().map(|mem| { + let buf = unsafe { ByteBuf::from_mem_ref(mem, len, 0, len) }; + buf.flip() + }) + } + + fn checkout(&self) -> Option<MemRef> { + unsafe { + let mut ptr = self.inner.next.load(Ordering::Acquire); + + loop { + if ptr.is_null() { + // The pool is depleted + return None; + } + + let inner = &*(*ptr).inner.get(); + + let next = inner.next; + + let res = self.inner.next.compare_and_swap(ptr, next, Ordering::AcqRel); + + if res == ptr { + break; + } + + ptr = res; + } + + let inner = &mut *(*ptr).inner.get(); + + // Unset next pointer & set the pool + inner.next = ptr::null_mut(); + inner.refs.store(1, Ordering::Relaxed); + inner.pool = Some(self.clone()); + + let ptr = ptr as *mut u8; + let ptr = ptr.offset(mem::size_of::<Entry>() as isize); + + Some(MemRef::new(ptr)) + } + } + + fn clone(&self) -> Pool { + Pool { + inner: self.inner.clone(), + marker: PhantomData, + } + } +} + +impl PoolInner { + fn with_capacity(cap: usize, buf_len: usize) -> PoolInner { + let ptr = unsafe { heap::allocate(alloc_len(cap, buf_len), align()) }; + + PoolInner { + ptr: ptr, + next: AtomicPtr::new(ptr::null_mut()), + cap: cap, + buf_len: buf_len, + entry_len: entry_len(buf_len), + } + } +} + +impl Drop for PoolInner { + fn drop(&mut self) { + unsafe { heap::deallocate(self.ptr, alloc_len(self.cap, self.buf_len), align()) } + } +} + +impl Entry { + fn release(&self) { + unsafe { + let inner = &mut *self.inner.get(); + let pool = inner.pool.take() + .expect("entry not associated with a pool"); + + let mut next = pool.inner.next.load(Ordering::Acquire); + + loop { + inner.next = next; + + let actual = pool.inner.next + .compare_and_swap(next, self as *const Entry as *mut Entry, Ordering::AcqRel); + + if actual == next { + break; + } + + next = actual; + } + } + } +} + +impl Mem for Entry { + fn ref_inc(&self) { + // Using a relaxed ordering is alright here, as knowledge of the + // original reference prevents other threads from erroneously deleting + // the object. + // + // As explained in the [Boost documentation][1], Increasing the + // reference counter can always be done with memory_order_relaxed: New + // references to an object can only be formed from an existing + // reference, and passing an existing reference from one thread to + // another must already provide any required synchronization. + // + // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) + let old_size = unsafe { + (*self.inner.get()).refs.fetch_add(1, Ordering::Relaxed) + }; + + // However we need to guard against massive refcounts in case someone + // is `mem::forget`ing Arcs. If we don't do this the count can overflow + // and users will use-after free. We racily saturate to `isize::MAX` on + // the assumption that there aren't ~2 billion threads incrementing + // the reference count at once. This branch will never be taken in + // any realistic program. + // + // We abort because such a program is incredibly degenerate, and we + // don't care to support it. + if old_size > MAX_REFCOUNT { + panic!("too many refs"); + } + } + + fn ref_dec(&self) { + unsafe { + let prev = (*self.inner.get()).refs.fetch_sub(1, Ordering::Release); + + if prev != 1 { + return; + } + } + + atomic::fence(Ordering::Acquire); + self.release(); + } +} + +// TODO: is there a better way to do this? +unsafe impl Send for Entry {} +unsafe impl Sync for Entry {} + +fn alloc_len(cap: usize, buf_len: usize) -> usize { + cap * entry_len(buf_len) +} + +fn entry_len(bytes_len: usize) -> usize { + let len = bytes_len + + mem::size_of::<Entry>() + + mem::size_of::<&Mem>() + + mem::size_of::<usize>(); + + if len & (align() - 1) == 0 { + len + } else { + (len & !align()) + align() + } +} + +fn align() -> usize { + mem::size_of::<usize>() +} diff --git a/test/test.rs b/test/test.rs index 77fcd57..9fc3f42 100644 --- a/test/test.rs +++ b/test/test.rs @@ -2,12 +2,14 @@ use rand::random; extern crate bytes; extern crate rand; +extern crate byteorder; mod test_buf; mod test_buf_fill; mod test_buf_take; mod test_byte_buf; mod test_bytes; +mod test_pool; mod test_ring; mod test_rope; mod test_seq_byte_str; diff --git a/test/test_pool.rs b/test/test_pool.rs new file mode 100644 index 0000000..37d7b10 --- /dev/null +++ b/test/test_pool.rs @@ -0,0 +1,85 @@ +use bytes::alloc::Pool; +use bytes::{Buf, MutBuf}; +use rand::{self, Rng}; +use byteorder::{ByteOrder, BigEndian}; + +#[test] +fn test_pool_of_zero_capacity() { + let pool = Pool::with_capacity(0, 0); + assert!(pool.new_byte_buf().is_none()); + + let pool = Pool::with_capacity(0, 1_024); + assert!(pool.new_byte_buf().is_none()); +} + +#[test] +fn test_pool_with_one_capacity() { + let pool = Pool::with_capacity(1, 1024); + + let mut buf = pool.new_byte_buf().unwrap(); + assert!(pool.new_byte_buf().is_none()); + + assert_eq!(1024, buf.remaining()); + + buf.write_slice(b"Hello World"); + let mut buf = buf.flip(); + + let mut dst = vec![]; + + buf.copy_to(&mut dst).unwrap(); + + assert_eq!(&dst[..], b"Hello World"); + + // return the buffer to the pool + drop(buf); + + let _ = pool.new_byte_buf().unwrap(); +} + +#[test] +fn test_pool_stress() { + let pool = Pool::with_capacity(100, 4); + let mut bufs = Vec::with_capacity(100); + let mut rng = rand::thread_rng(); + + let mut s = [0; 4]; + + for i in 0..50_000u32 { + let action: usize = rng.gen(); + + match action % 3 { + 0 if bufs.len() < 100 => { + let mut buf = pool.new_byte_buf().unwrap(); + BigEndian::write_u32(&mut s, i); + buf.write_slice(&s); + bufs.push((i, buf.flip())); + } + 1 if bufs.len() > 0 => { + // drop + let len = bufs.len(); + let _ = bufs.remove(rng.gen::<usize>() % len); + } + 2 if bufs.len() > 0 => { + // read + let len = bufs.len(); + let (i, mut buf) = bufs.remove(rng.gen::<usize>() % len); + buf.mark(); + buf.read_slice(&mut s); + buf.reset(); + let v = BigEndian::read_u32(&s); + assert_eq!(i, v); + bufs.push((i, buf)); + } + 3 if bufs.len() > 0 => { + // write data + let len = bufs.len(); + let (i, buf) = bufs.remove(rng.gen::<usize>() % len); + let mut buf = buf.flip(); + BigEndian::write_u32(&mut s, i); + buf.write_slice(&s); + bufs.push((i, buf.flip())); + } + _ => {} + } + } +} -- GitLab