Commit 1c2234f7 authored by Carl Lerche

Initial stab at a buffer pool

parent dc25c756
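A rough sketch of how the new pool is meant to be used, pieced together from the tests added in this commit (the unwraps stand in for handling a depleted pool):

use bytes::alloc::Pool;
use bytes::{Buf, MutBuf};

// A pool of 100 buffers; buffer sizes are rounded up to a power of two.
let pool = Pool::with_capacity(100, 1_024);

// Check out a write-mode buffer; `None` means the pool is depleted.
let mut buf = pool.new_byte_buf().unwrap();
buf.write_slice(b"hello");

// Flip to read mode to consume the bytes ...
let buf = buf.flip();

// ... and drop the buffer to return it to the pool.
drop(buf);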
Cargo.toml
@@ -23,6 +23,7 @@ stable-heap = "0.1.0"
 [dev-dependencies]
 rand = "0.3.5"
+byteorder = "0.5.3"
 [[bench]]
...
 mod heap;
+mod pool;
 pub use self::heap::Heap;
+pub use self::pool::Pool;
...
use super::{Mem, MemRef};
use buf::{ByteBuf, MutByteBuf};
use stable_heap as heap;
use std::{mem, ptr, isize, usize};
use std::cell::{Cell, UnsafeCell};
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
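// Design summary: the pool allocates one contiguous slab and carves it
// into `cap` fixed-size entries. Each entry is laid out as:
//
//     [ Entry header | &Mem trait-object pointer | buf_len | buffer bytes ]
//
// Free entries are threaded through `Inner::next`, forming an intrusive
// lock-free stack (essentially a Treiber stack): `checkout` pops the head
// with a CAS loop and `Entry::release` pushes a returned entry back.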
// TODO: ensure that `Pool` is not `Sync`
pub struct Pool {
inner: Arc<PoolInner>,
marker: PhantomData<Cell<()>>,
}
struct PoolInner {
ptr: *mut u8, // Pointer to the raw memory
next: AtomicPtr<Entry>,
cap: usize, // Total number of entries
buf_len: usize, // Byte size of each byte buf
entry_len: usize, // Byte size of each entry
}
struct Entry {
inner: UnsafeCell<Inner>,
}
struct Inner {
pool: Option<Pool>,
refs: AtomicUsize,
next: *mut Entry,
}
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
impl Pool {
/// Constructs a new `Pool` with the specified capacity such that each
/// buffer in the pool has a length of `buf_len`.
pub fn with_capacity(cap: usize, mut buf_len: usize) -> Pool {
// Ensure that all buffers have a power of 2 size. This enables
// optimizations in Buf implementations.
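// (e.g. a requested buf_len of 1_000 becomes 1_024)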
buf_len = buf_len.next_power_of_two();
let inner = Arc::new(PoolInner::with_capacity(cap, buf_len));
// Iterate each entry and initialize the memory
let mut next = ptr::null_mut();
for i in 0..cap {
unsafe {
let off = i * inner.entry_len;
let ptr = inner.ptr.offset(off as isize);
let e = &mut *(ptr as *mut Entry);
ptr::write(&mut e.inner as *mut UnsafeCell<Inner>, UnsafeCell::new(Inner {
pool: None,
refs: AtomicUsize::new(0),
next: next,
}));
next = ptr as *mut Entry;
let ptr = ptr.offset(mem::size_of::<Entry>() as isize);
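// Immediately after the Entry header, write a &Mem trait-object
// pointer back to the entry, followed by the buffer length;
// `checkout` hands out a pointer to this spot so a MemRef can
// recover both.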
ptr::write(ptr as *mut &Mem, e as &Mem);
let ptr = ptr.offset(mem::size_of::<&Mem>() as isize);
ptr::write(ptr as *mut usize, buf_len);
}
}
// Set the next ptr to the head of the Entry linked list
inner.next.store(next, Ordering::Relaxed);
Pool {
inner: inner,
marker: PhantomData,
}
}
/// Returns the number of buffers that the `Pool` holds.
pub fn capacity(&self) -> usize {
self.inner.cap
}
/// Returns a new `ByteBuf` backed by a buffer from the pool. If the pool
/// is depleted, `None` is returned.
pub fn new_byte_buf(&self) -> Option<MutByteBuf> {
let len = self.inner.buf_len as u32;
self.checkout().map(|mem| {
let buf = unsafe { ByteBuf::from_mem_ref(mem, len, 0, len) };
buf.flip()
})
}
fn checkout(&self) -> Option<MemRef> {
unsafe {
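// Pop the head of the free stack with a CAS loop; on failure,
// `compare_and_swap` returns the freshly observed head, which
// becomes the next candidate.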
let mut ptr = self.inner.next.load(Ordering::Acquire);
loop {
if ptr.is_null() {
// The pool is depleted
return None;
}
let inner = &*(*ptr).inner.get();
let next = inner.next;
let res = self.inner.next.compare_and_swap(ptr, next, Ordering::AcqRel);
if res == ptr {
break;
}
ptr = res;
}
let inner = &mut *(*ptr).inner.get();
// Unset next pointer & set the pool
inner.next = ptr::null_mut();
inner.refs.store(1, Ordering::Relaxed);
inner.pool = Some(self.clone());
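// Hand out a pointer just past the Entry header, i.e. at the
// &Mem trait object written in `with_capacity`.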
let ptr = ptr as *mut u8;
let ptr = ptr.offset(mem::size_of::<Entry>() as isize);
Some(MemRef::new(ptr))
}
}
fn clone(&self) -> Pool {
Pool {
inner: self.inner.clone(),
marker: PhantomData,
}
}
}
impl PoolInner {
fn with_capacity(cap: usize, buf_len: usize) -> PoolInner {
let ptr = unsafe { heap::allocate(alloc_len(cap, buf_len), align()) };
PoolInner {
ptr: ptr,
next: AtomicPtr::new(ptr::null_mut()),
cap: cap,
buf_len: buf_len,
entry_len: entry_len(buf_len),
}
}
}
impl Drop for PoolInner {
fn drop(&mut self) {
unsafe { heap::deallocate(self.ptr, alloc_len(self.cap, self.buf_len), align()) }
}
}
impl Entry {
fn release(&self) {
unsafe {
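// Push this entry back onto the pool's free stack, mirroring the
// CAS pop in `checkout`. Dropping `pool` at the end of this scope
// releases the entry's handle on the Pool.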
let inner = &mut *self.inner.get();
let pool = inner.pool.take()
.expect("entry not associated with a pool");
let mut next = pool.inner.next.load(Ordering::Acquire);
loop {
inner.next = next;
let actual = pool.inner.next
.compare_and_swap(next, self as *const Entry as *mut Entry, Ordering::AcqRel);
if actual == next {
break;
}
next = actual;
}
}
}
}
impl Mem for Entry {
fn ref_inc(&self) {
// Using a relaxed ordering is alright here, as knowledge of the
// original reference prevents other threads from erroneously deleting
// the object.
//
// As explained in the [Boost documentation][1], increasing the
// reference counter can always be done with memory_order_relaxed: new
// references to an object can only be formed from an existing
// reference, and passing an existing reference from one thread to
// another must already provide any required synchronization.
//
// [1]: https://www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html
let old_size = unsafe {
(*self.inner.get()).refs.fetch_add(1, Ordering::Relaxed)
};
// However, we need to guard against massive refcounts in case someone
// is `mem::forget`ing references. If we don't do this, the count can
// overflow and users will use-after-free. We racily saturate to
// `isize::MAX` on the assumption that there aren't ~2 billion threads
// incrementing the reference count at once. This branch will never be
// taken in any realistic program.
//
// We panic because such a program is incredibly degenerate, and we
// don't care to support it.
if old_size > MAX_REFCOUNT {
panic!("too many refs");
}
}
fn ref_dec(&self) {
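// Mirrors the reference-counting pattern used by `Arc`: a Release
// decrement, then an Acquire fence before the entry is recycled, so
// that all writes to the buffer happen-before its reuse.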
unsafe {
let prev = (*self.inner.get()).refs.fetch_sub(1, Ordering::Release);
if prev != 1 {
return;
}
}
atomic::fence(Ordering::Acquire);
self.release();
}
}
// TODO: is there a better way to do this?
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}
fn alloc_len(cap: usize, buf_len: usize) -> usize {
cap * entry_len(buf_len)
}
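// Footprint of a single entry: the buffer storage plus the header
// written in `with_capacity` (the Entry itself, the &Mem fat pointer,
// and the stored buffer length), rounded up to `align()`.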
fn entry_len(bytes_len: usize) -> usize {
let len = bytes_len +
mem::size_of::<Entry>() +
mem::size_of::<&Mem>() +
mem::size_of::<usize>();
// Round up to the next multiple of the alignment
if len & (align() - 1) == 0 {
len
} else {
(len & !(align() - 1)) + align()
}
}
fn align() -> usize {
mem::size_of::<usize>()
}
@@ -2,12 +2,14 @@ use rand::random;
 extern crate bytes;
 extern crate rand;
+extern crate byteorder;
 mod test_buf;
 mod test_buf_fill;
 mod test_buf_take;
 mod test_byte_buf;
 mod test_bytes;
+mod test_pool;
 mod test_ring;
 mod test_rope;
 mod test_seq_byte_str;
...
use bytes::alloc::Pool;
use bytes::{Buf, MutBuf};
use rand::{self, Rng};
use byteorder::{ByteOrder, BigEndian};
#[test]
fn test_pool_of_zero_capacity() {
let pool = Pool::with_capacity(0, 0);
assert!(pool.new_byte_buf().is_none());
let pool = Pool::with_capacity(0, 1_024);
assert!(pool.new_byte_buf().is_none());
}
#[test]
fn test_pool_with_one_capacity() {
let pool = Pool::with_capacity(1, 1024);
let mut buf = pool.new_byte_buf().unwrap();
assert!(pool.new_byte_buf().is_none());
assert_eq!(1024, buf.remaining());
buf.write_slice(b"Hello World");
let mut buf = buf.flip();
let mut dst = vec![];
buf.copy_to(&mut dst).unwrap();
assert_eq!(&dst[..], b"Hello World");
// return the buffer to the pool
drop(buf);
let _ = pool.new_byte_buf().unwrap();
}
#[test]
fn test_pool_stress() {
let pool = Pool::with_capacity(100, 4);
let mut bufs = Vec::with_capacity(100);
let mut rng = rand::thread_rng();
let mut s = [0; 4];
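// Each iteration randomly: checks out and fills a new buffer, drops
// one, reads one back and verifies the stored counter, or flips one
// back to write mode and rewrites it in place.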
for i in 0..50_000u32 {
let action: usize = rng.gen();
match action % 4 {
0 if bufs.len() < 100 => {
let mut buf = pool.new_byte_buf().unwrap();
BigEndian::write_u32(&mut s, i);
buf.write_slice(&s);
bufs.push((i, buf.flip()));
}
1 if bufs.len() > 0 => {
// drop
let len = bufs.len();
let _ = bufs.remove(rng.gen::<usize>() % len);
}
2 if bufs.len() > 0 => {
// read
let len = bufs.len();
let (i, mut buf) = bufs.remove(rng.gen::<usize>() % len);
buf.mark();
buf.read_slice(&mut s);
buf.reset();
let v = BigEndian::read_u32(&s);
assert_eq!(i, v);
bufs.push((i, buf));
}
3 if bufs.len() > 0 => {
// write data
let len = bufs.len();
let (i, buf) = bufs.remove(rng.gen::<usize>() % len);
let mut buf = buf.flip();
BigEndian::write_u32(&mut s, i);
buf.write_slice(&s);
bufs.push((i, buf.flip()));
}
_ => {}
}
}
}