use super::{Mem, MemRef}; use buf::{ByteBuf, MutByteBuf}; use stable_heap as heap; use std::{mem, ptr, isize, usize}; use std::cell::{Cell, UnsafeCell}; use std::marker::PhantomData; use std::sync::Arc; use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; // TODO: ensure that not Sync pub struct Pool { inner: Arc<PoolInner>, marker: PhantomData<Cell<()>>, } struct PoolInner { ptr: *mut u8, // Pointer to the raw memory next: AtomicPtr<Entry>, cap: usize, // Total number of entries buf_len: usize, // Byte size of each byte buf entry_len: usize, // Byte size of each entry } struct Entry { inner: UnsafeCell<Inner>, } struct Inner { pool: Option<Pool>, refs: AtomicUsize, next: *mut Entry, } const MAX_REFCOUNT: usize = (isize::MAX) as usize; impl Pool { /// Constructs a new `Pool` with with specified capacity such that each /// buffer in the pool has a length of `buf_len`. pub fn with_capacity(cap: usize, mut buf_len: usize) -> Pool { // Ensure that all buffers have a power of 2 size. This enables // optimizations in Buf implementations. buf_len = buf_len.next_power_of_two(); let inner = Arc::new(PoolInner::with_capacity(cap, buf_len)); // Iterate each entry and initialize the memory let mut next = ptr::null_mut(); for i in 0..cap { unsafe { let off = i * inner.entry_len; let ptr = inner.ptr.offset(off as isize); let e = &mut *(ptr as *mut Entry); ptr::write(&mut e.inner as *mut UnsafeCell<Inner>, UnsafeCell::new(Inner { pool: None, refs: AtomicUsize::new(0), next: next, })); next = ptr as *mut Entry; let ptr = ptr.offset(mem::size_of::<Entry>() as isize); ptr::write(ptr as *mut &Mem, e as &Mem); let ptr = ptr.offset(mem::size_of::<&Mem>() as isize); ptr::write(ptr as *mut usize, buf_len); } } // Set the next ptr to the head of the Entry linked list inner.next.store(next, Ordering::Relaxed); Pool { inner: inner, marker: PhantomData, } } /// Returns the number of buffers that the `Pool` holds. pub fn capacity(&self) -> usize { self.inner.cap } /// Returns a new `ByteBuf` backed by a buffer from the pool. If the pool /// is depleted, `None` is returned. pub fn new_byte_buf(&self) -> Option<MutByteBuf> { let len = self.inner.buf_len as u32; self.checkout().map(|mem| { let buf = unsafe { ByteBuf::from_mem_ref(mem, len, 0, len) }; buf.flip() }) } fn checkout(&self) -> Option<MemRef> { unsafe { let mut ptr = self.inner.next.load(Ordering::Acquire); loop { if ptr.is_null() { // The pool is depleted return None; } let inner = &*(*ptr).inner.get(); let next = inner.next; let res = self.inner.next.compare_and_swap(ptr, next, Ordering::AcqRel); if res == ptr { break; } ptr = res; } let inner = &mut *(*ptr).inner.get(); // Unset next pointer & set the pool inner.next = ptr::null_mut(); inner.refs.store(1, Ordering::Relaxed); inner.pool = Some(self.clone()); let ptr = ptr as *mut u8; let ptr = ptr.offset(mem::size_of::<Entry>() as isize); Some(MemRef::new(ptr)) } } fn clone(&self) -> Pool { Pool { inner: self.inner.clone(), marker: PhantomData, } } } impl PoolInner { fn with_capacity(cap: usize, buf_len: usize) -> PoolInner { let ptr = unsafe { heap::allocate(alloc_len(cap, buf_len), align()) }; PoolInner { ptr: ptr, next: AtomicPtr::new(ptr::null_mut()), cap: cap, buf_len: buf_len, entry_len: entry_len(buf_len), } } } impl Drop for PoolInner { fn drop(&mut self) { unsafe { heap::deallocate(self.ptr, alloc_len(self.cap, self.buf_len), align()) } } } impl Entry { fn release(&self) { unsafe { let inner = &mut *self.inner.get(); let pool = inner.pool.take() .expect("entry not associated with a pool"); let mut next = pool.inner.next.load(Ordering::Acquire); loop { inner.next = next; let actual = pool.inner.next .compare_and_swap(next, self as *const Entry as *mut Entry, Ordering::AcqRel); if actual == next { break; } next = actual; } } } } impl Mem for Entry { fn ref_inc(&self) { // Using a relaxed ordering is alright here, as knowledge of the // original reference prevents other threads from erroneously deleting // the object. // // As explained in the [Boost documentation][1], Increasing the // reference counter can always be done with memory_order_relaxed: New // references to an object can only be formed from an existing // reference, and passing an existing reference from one thread to // another must already provide any required synchronization. // // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) let old_size = unsafe { (*self.inner.get()).refs.fetch_add(1, Ordering::Relaxed) }; // However we need to guard against massive refcounts in case someone // is `mem::forget`ing Arcs. If we don't do this the count can overflow // and users will use-after free. We racily saturate to `isize::MAX` on // the assumption that there aren't ~2 billion threads incrementing // the reference count at once. This branch will never be taken in // any realistic program. // // We abort because such a program is incredibly degenerate, and we // don't care to support it. if old_size > MAX_REFCOUNT { panic!("too many refs"); } } fn ref_dec(&self) { unsafe { let prev = (*self.inner.get()).refs.fetch_sub(1, Ordering::Release); if prev != 1 { return; } } atomic::fence(Ordering::Acquire); self.release(); } } // TODO: is there a better way to do this? unsafe impl Send for Entry {} unsafe impl Sync for Entry {} fn alloc_len(cap: usize, buf_len: usize) -> usize { cap * entry_len(buf_len) } fn entry_len(bytes_len: usize) -> usize { let len = bytes_len + mem::size_of::<Entry>() + mem::size_of::<&Mem>() + mem::size_of::<usize>(); if len & (align() - 1) == 0 { len } else { (len & !align()) + align() } } fn align() -> usize { mem::size_of::<usize>() }