diff --git a/src/alloc/heap.rs b/src/alloc/heap.rs
index 1bd3f70050d06e8cbce74896018359971f2d5f01..7dad574135f40cdc31cec48e7dbadf6c3b3945ce 100644
--- a/src/alloc/heap.rs
+++ b/src/alloc/heap.rs
@@ -1,111 +1,9 @@
-use alloc::{Mem, MemRef};
-use stable_heap as heap;
-use std::{mem, ptr, isize, usize};
-use std::sync::atomic::{self, AtomicUsize, Ordering};
+use alloc::{MemRef};
+use std::sync::Arc;
 
-const MAX_ALLOC_SIZE: usize = usize::MAX;
-const MAX_REFCOUNT: usize = (isize::MAX) as usize;
+pub unsafe fn allocate(len: usize) -> MemRef {
+    let mut v = Vec::with_capacity(len);
+    v.set_len(len);
 
-/// Tracks a heap allocation and stores the atomic ref counter
-struct Allocation {
-    refs: AtomicUsize,
-}
-
-pub fn allocate(len: usize) -> MemRef {
-    // Make sure that the allocation is within the permitted range
-    if len > MAX_ALLOC_SIZE {
-        return MemRef::none();
-    }
-
-    unsafe {
-        let mut ptr = heap::allocate(alloc_len(len), align());
-        let mut off = 0;
-
-        ptr::write(ptr as *mut Allocation, Allocation::new());
-
-        off += mem::size_of::<Allocation>();
-        ptr::write(ptr.offset(off as isize) as *mut &Mem, &*(ptr as *const Allocation));
-
-        off += mem::size_of::<&Mem>();
-        ptr::write(ptr.offset(off as isize) as *mut usize, len);
-
-        ptr = ptr.offset(mem::size_of::<Allocation>() as isize);
-
-        MemRef::new(ptr)
-    }
-}
-
-fn deallocate(ptr: *mut u8) {
-    unsafe {
-        let off = mem::size_of::<Allocation>() + mem::size_of::<&Mem>();
-        let len = ptr::read(ptr.offset(off as isize) as *const usize);
-
-        heap::deallocate(ptr, alloc_len(len), align());
-    }
-}
-
-impl Allocation {
-    fn new() -> Allocation {
-        Allocation {
-            refs: AtomicUsize::new(1),
-        }
-    }
-}
-
-impl Mem for Allocation {
-    fn ref_inc(&self) {
-        // Using a relaxed ordering is alright here, as knowledge of the
-        // original reference prevents other threads from erroneously deleting
-        // the object.
-        //
-        // As explained in the [Boost documentation][1], Increasing the
-        // reference counter can always be done with memory_order_relaxed: New
-        // references to an object can only be formed from an existing
-        // reference, and passing an existing reference from one thread to
-        // another must already provide any required synchronization.
-        //
-        // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
-        let old_size = self.refs.fetch_add(1, Ordering::Relaxed);
-
-        // However we need to guard against massive refcounts in case someone
-        // is `mem::forget`ing Arcs. If we don't do this the count can overflow
-        // and users will use-after free. We racily saturate to `isize::MAX` on
-        // the assumption that there aren't ~2 billion threads incrementing
-        // the reference count at once. This branch will never be taken in
-        // any realistic program.
-        //
-        // We abort because such a program is incredibly degenerate, and we
-        // don't care to support it.
-        if old_size > MAX_REFCOUNT {
-            panic!("too many refs");
-        }
-    }
-
-    fn ref_dec(&self) {
-        if self.refs.fetch_sub(1, Ordering::Release) != 1 {
-            return;
-        }
-
-        atomic::fence(Ordering::Acquire);
-        deallocate(self as *const Allocation as *const u8 as *mut u8);
-    }
-}
-
-#[inline]
-fn alloc_len(bytes_len: usize) -> usize {
-    let len = bytes_len +
-              mem::size_of::<Allocation>() +
-              mem::size_of::<&Mem>() +
-              mem::size_of::<usize>();
-
-    if len & (align() - 1) == 0 {
-        len
-    } else {
-        (len & !align()) + align()
-    }
-}
-
-#[inline]
-fn align() -> usize {
-    mem::size_of::<usize>()
+    MemRef::new(Arc::new(v))
 }
diff --git a/src/alloc/mod.rs b/src/alloc/mod.rs
index e562930edb6a34caaeedb2d2692acd7eda09af07..ec8dbb2e51b30b23a21dd8287720e891b6e1f7ad 100644
--- a/src/alloc/mod.rs
+++ b/src/alloc/mod.rs
@@ -1,122 +1,58 @@
 mod heap;
-mod pool;
 
-pub use self::pool::Pool;
-
-use std::{mem, ptr};
-
-/// Ref-counted segment of memory
-pub trait Mem: Send + Sync {
-    /// Increment the ref count
-    fn ref_inc(&self);
-
-    /// Decrement the ref count
-    fn ref_dec(&self);
-}
+use std::sync::Arc;
 
 pub struct MemRef {
-    // Pointer to the memory
-    // Layout:
-    // - &Mem
-    // - usize (len)
-    // - u8... bytes
-    ptr: *mut u8,
+    mem: Arc<Vec<u8>>,
 }
 
 /// Allocate a segment of memory and return a `MemRef`.
-pub fn heap(len: usize) -> MemRef {
+pub unsafe fn heap(len: usize) -> MemRef {
     heap::allocate(len)
 }
 
 impl MemRef {
     #[inline]
-    pub unsafe fn new(ptr: *mut u8) -> MemRef {
-        MemRef { ptr: ptr }
-    }
-
-    #[inline]
-    pub fn none() -> MemRef {
-        MemRef { ptr: ptr::null_mut() }
-    }
-
-    #[inline]
-    pub fn is_none(&self) -> bool {
-        self.ptr.is_null()
+    pub unsafe fn new(mem: Arc<Vec<u8>>) -> MemRef {
+        MemRef { mem: mem }
     }
 
     #[inline]
     pub fn len(&self) -> usize {
-        unsafe { *self.len_ptr() }
+        self.mem.len()
     }
 
     #[inline]
     pub unsafe fn bytes(&self) -> &[u8] {
-        use std::slice;
-        slice::from_raw_parts(self.bytes_ptr(), self.len())
+        &*self.mem
     }
 
     #[inline]
     pub unsafe fn bytes_slice(&self, start: usize, end: usize) -> &[u8] {
         use std::slice;
-        let ptr = self.bytes_ptr().offset(start as isize);
+        let ptr = self.mem.as_ptr().offset(start as isize);
         slice::from_raw_parts(ptr, end - start)
     }
 
     #[inline]
     pub unsafe fn mut_bytes(&mut self) -> &mut [u8] {
         use std::slice;
-        slice::from_raw_parts_mut(self.bytes_ptr(), self.len())
+        let len = self.mem.len();
+        slice::from_raw_parts_mut(self.mem.as_ptr() as *mut u8, len)
     }
 
     /// Unsafe, unchecked access to the bytes
     #[inline]
     pub unsafe fn mut_bytes_slice(&mut self, start: usize, end: usize) -> &mut [u8] {
         use std::slice;
-        let ptr = self.bytes_ptr().offset(start as isize);
-        slice::from_raw_parts_mut(ptr, end - start)
-    }
-
-    #[inline]
-    fn mem(&self) -> &Mem {
-        unsafe {
-            *(self.ptr as *const &Mem)
-        }
-    }
-
-    #[inline]
-    unsafe fn len_ptr(&self) -> *mut usize {
-        let off = mem::size_of::<&Mem>();
-        self.ptr.offset(off as isize) as *mut usize
-    }
-
-    #[inline]
-    unsafe fn bytes_ptr(&self) -> *mut u8 {
-        let off = mem::size_of::<&Mem>() + mem::size_of::<usize>();
-        self.ptr.offset(off as isize)
+        let ptr = self.mem.as_ptr().offset(start as isize);
+        slice::from_raw_parts_mut(ptr as *mut u8, end - start)
     }
 }
 
 impl Clone for MemRef {
     #[inline]
     fn clone(&self) -> MemRef {
-        if self.is_none() {
-            return MemRef::none();
-        }
-
-        self.mem().ref_inc();
-        MemRef { ptr: self.ptr }
-    }
-}
-
-impl Drop for MemRef {
-    fn drop(&mut self) {
-        if self.is_none() {
-            return;
-        }
-
-        self.mem().ref_dec();
+        MemRef { mem: self.mem.clone() }
     }
 }
-
-unsafe impl Send for MemRef { }
-unsafe impl Sync for MemRef { }
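
Reviewer note, not part of the patch: the two files above replace the hand-rolled
`Allocation` header, the `Mem` trait, and the manual `ref_inc`/`ref_dec` calls with
a plain `Arc<Vec<u8>>`, so `MemRef::clone` and the now-deleted `Drop` impl fall out
of the standard library's atomic reference counting. A minimal, self-contained
sketch of the behavior the new `MemRef` relies on (the `main` wrapper is
illustrative only, not crate API):

    use std::sync::Arc;

    fn main() {
        // One heap allocation; each clone just bumps the atomic ref count,
        // which is what `Allocation::ref_inc` used to do by hand.
        let mem: Arc<Vec<u8>> = Arc::new(vec![0u8; 4096]);
        let alias = Arc::clone(&mem);
        assert_eq!(Arc::strong_count(&mem), 2);

        // Dropping a clone decrements the count; the final drop frees the
        // Vec, replacing the manual `ref_dec` + `deallocate` pair.
        drop(alias);
        assert_eq!(Arc::strong_count(&mem), 1);
    }

The deleted `MAX_REFCOUNT` guard is not lost either: `Arc` performs the same
saturating overflow check internally and aborts rather than letting the count wrap.
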
diff --git a/src/buf/append.rs b/src/buf/append.rs
index b706ae37034e4ffc37ff4c9e7070b7210a7c577e..2da7a822b5af93f25432a662dc7536f1056a39a8 100644
--- a/src/buf/append.rs
+++ b/src/buf/append.rs
@@ -16,32 +16,14 @@ pub struct AppendBuf {
 
 impl AppendBuf {
     pub fn with_capacity(mut capacity: u32) -> AppendBuf {
-        // Handle 0 capacity case
-        if capacity == 0 {
-            return AppendBuf::none();
-        }
-
         // Round the capacity to the closest power of 2
         capacity = capacity.next_power_of_two();
 
-        // Allocate the memory
-        let mem = alloc::heap(capacity as usize);
-
-        // If the allocation failed, return a blank buf
-        if mem.is_none() {
-            return AppendBuf::none();
-        }
-
-        unsafe { AppendBuf::from_mem_ref(mem, capacity, 0) }
-    }
+        unsafe {
+            // Allocate the memory
+            let mem = alloc::heap(capacity as usize);
 
-    /// Returns an AppendBuf with no capacity
-    pub fn none() -> AppendBuf {
-        AppendBuf {
-            mem: alloc::MemRef::none(),
-            rd: Cell::new(0),
-            wr: 0,
-            cap: 0,
+            AppendBuf::from_mem_ref(mem, capacity, 0)
         }
     }
 
@@ -73,6 +55,7 @@ impl AppendBuf {
     pub fn shift(&self, n: usize) -> Bytes {
         let ret = self.slice(0, n);
         self.rd.set(self.rd.get() + ret.len() as u32);
+        assert!(self.rd.get() <= self.wr, "buffer overflow");
         ret
     }
 
@@ -82,13 +65,15 @@ impl AppendBuf {
     }
 
     pub fn slice(&self, begin: usize, end: usize) -> Bytes {
-        let rd = self.rd.get() as usize;
-        let wr = self.wr as usize;
+        // TODO: Fix overflow potential
+
+        let rd = self.rd.get();
+        let wr = self.wr;
 
-        assert!(begin <= end && end <= wr - rd, "invalid range");
+        let begin = begin as u32 + rd;
+        let end = end as u32 + rd;
 
-        let begin = (begin + rd) as u32;
-        let end = (end + rd) as u32;
+        assert!(begin <= end && end <= wr, "invalid range");
 
         unsafe { Bytes::from_mem_ref(self.mem.clone(), begin, end - begin) }
     }
@@ -111,7 +96,7 @@ impl MutBuf for AppendBuf {
         self.wr += cnt as u32;
 
         if self.wr > self.cap {
-            self.wr = self.cap;
+            panic!("buffer overflow");
         }
     }
diff --git a/src/buf/block.rs b/src/buf/block.rs
index ff5edb4510916c4fb88ff503e8ff7fd5f42bab82..086c90e530c6b3fb2d251b4c075dce4526e54fe7 100644
--- a/src/buf/block.rs
+++ b/src/buf/block.rs
@@ -1,7 +1,7 @@
 #![allow(warnings)]
 
 use {Buf, MutBuf, AppendBuf, Bytes};
-use alloc::{self, Pool};
+use alloc::{self, /* Pool */};
 use std::{cmp, ptr, slice};
 use std::io::Cursor;
 use std::rc::Rc;
@@ -19,9 +19,9 @@ pub struct BlockBuf {
     new_block: NewBlock,
 }
 
-pub enum NewBlock {
+enum NewBlock {
     Heap(usize),
-    Pool(Rc<Pool>),
+    // Pool(Rc<Pool>),
 }
 
 pub struct BlockBufCursor<'a> {
@@ -37,9 +37,11 @@ pub struct BlockBufCursor<'a> {
 //
 impl BlockBuf {
     /// Create BlockBuf
-    pub fn new(max_blocks: usize, new_block: NewBlock) -> BlockBuf {
+    pub fn new(max_blocks: usize, block_size: usize) -> BlockBuf {
         assert!(max_blocks > 1, "at least 2 blocks required");
 
+        let new_block = NewBlock::Heap(block_size);
+
         BlockBuf {
             len: 0,
             cap: max_blocks * new_block.block_size(),
@@ -51,7 +53,7 @@ impl BlockBuf {
     /// Returns the number of buffered bytes
     #[inline]
     pub fn len(&self) -> usize {
-        debug_assert!(self.len == self.blocks.iter().map(|b| b.len()).fold(0, |a, b| a+b));
+        debug_assert_eq!(self.len, self.blocks.iter().map(|b| b.len()).fold(0, |a, b| a+b));
         self.len
     }
 
@@ -83,9 +85,39 @@ impl BlockBuf {
     /// # Panics
     ///
     /// Panics if `n` is greater than the number of buffered bytes.
-    pub fn shift(&mut self, mut n: usize) -> Bytes {
+    #[inline]
+    pub fn shift(&mut self, n: usize) -> Bytes {
         trace!("BlockBuf::shift; n={}", n);
 
+        // Fast path
+        match self.blocks.len() {
+            0 => {
+                assert!(n == 0, "buffer overflow");
+                Bytes::empty()
+            }
+            1 => {
+                let (ret, pop) = {
+                    let block = self.blocks.front().expect("unexpected state");
+
+                    let ret = block.shift(n);
+                    self.len -= n;
+
+                    (ret, self.len == 0 && !MutBuf::has_remaining(block))
+                };
+
+                if pop {
+                    let _ = self.blocks.pop_front();
+                }
+
+                ret
+            }
+            _ => {
+                self.shift_multi(n)
+            }
+        }
+    }
+
+    fn shift_multi(&mut self, mut n: usize) -> Bytes {
         let mut ret: Option<Bytes> = None;
 
         while n > 0 {
@@ -96,11 +128,15 @@ impl BlockBuf {
             let (segment, pop) = {
                 let block = self.blocks.front().expect("unexpected state");
-                let segment_n = cmp::min(n, block.len());
+
+                let block_len = block.len();
+                let segment_n = cmp::min(n, block_len);
 
                 n -= segment_n;
                 self.len -= segment_n;
 
-                (block.shift(segment_n), !MutBuf::has_remaining(block))
+                let pop = block_len == segment_n && !MutBuf::has_remaining(block);
+
+                (block.shift(segment_n), pop)
             };
 
             if pop {
@@ -108,13 +144,15 @@ impl BlockBuf {
             }
 
             ret = Some(match ret.take() {
-                Some(curr) => curr.concat(&segment),
+                Some(curr) => {
+                    curr.concat(&segment)
+                }
                 None => segment,
             });
         }
 
-        ret.unwrap_or(Bytes::empty())
+        ret.unwrap_or_else(|| Bytes::empty())
     }
 
     /// Drop the first `n` buffered bytes
@@ -146,6 +184,10 @@ impl BlockBuf {
         }
     }
 
+    pub fn is_compact(&self) -> bool {
+        self.blocks.len() <= 1
+    }
+
     /// Moves all buffered bytes into a single block.
     ///
     /// # Panics
@@ -208,6 +250,19 @@ impl BlockBuf {
     fn have_buffered_data(&self) -> bool {
         self.len() > 0
     }
+
+    #[inline]
+    fn needs_alloc(&self) -> bool {
+        if let Some(buf) = self.blocks.back() {
+            // If the last block still has remaining capacity, no new
+            // allocation is needed.
+            if MutBuf::has_remaining(buf) {
+                return false;
+            }
+        }
+
+        true
+    }
 }
 
 impl MutBuf for BlockBuf {
@@ -236,18 +291,9 @@ impl MutBuf for BlockBuf {
         }
     }
 
+    #[inline]
     unsafe fn mut_bytes(&mut self) -> &mut [u8] {
-        let mut need_alloc = true;
-
-        if let Some(buf) = self.blocks.back() {
-            // `unallocated_blocks` is checked here because if further blocks
-            // cannot be allocated, an empty slice should be returned.
-            if MutBuf::has_remaining(buf) {
-                need_alloc = false
-            }
-        }
-
-        if need_alloc {
+        if self.needs_alloc() {
             if self.blocks.len() != self.blocks.capacity() {
                 self.allocate_block()
             }
@@ -261,7 +307,7 @@
 
 impl Default for BlockBuf {
     fn default() -> BlockBuf {
-        BlockBuf::new(16, NewBlock::Heap(8_192))
+        BlockBuf::new(16, 8_192)
     }
 }
@@ -307,7 +353,7 @@ impl NewBlock {
     fn block_size(&self) -> usize {
         match *self {
             NewBlock::Heap(size) => size,
-            NewBlock::Pool(ref pool) => pool.buffer_len(),
+            // NewBlock::Pool(ref pool) => pool.buffer_len(),
         }
     }
 
@@ -315,7 +361,7 @@
     fn new_block(&self) -> Option<AppendBuf> {
         match *self {
             NewBlock::Heap(size) => Some(AppendBuf::with_capacity(size as u32)),
-            NewBlock::Pool(ref pool) => pool.new_append_buf(),
+            // NewBlock::Pool(ref pool) => pool.new_append_buf(),
         }
     }
 }
diff --git a/src/buf/byte.rs b/src/buf/byte.rs
index 543b0d7aba8e831aa0f253273e0cda135c3cc898..1331e789a2aaf817490543e59c61db4f0dcafaf8 100644
--- a/src/buf/byte.rs
+++ b/src/buf/byte.rs
@@ -32,16 +32,6 @@ impl ByteBuf {
         MutByteBuf { buf: ByteBuf::new(capacity as u32) }
     }
 
-    pub fn none() -> ByteBuf {
-        ByteBuf {
-            mem: alloc::MemRef::none(),
-            cap: 0,
-            pos: 0,
-            lim: 0,
-            mark: None,
-        }
-    }
-
     pub unsafe fn from_mem_ref(mem: alloc::MemRef, cap: u32, pos: u32, lim: u32) -> ByteBuf {
         debug_assert!(pos <= lim && lim <= cap, "invalid arguments; cap={}; pos={}; lim={}", cap, pos, lim);
 
@@ -55,28 +45,20 @@ impl ByteBuf {
     }
 
     fn new(mut capacity: u32) -> ByteBuf {
-        // Handle 0 capacity case
-        if capacity == 0 {
-            return ByteBuf::none();
-        }
-
         // Round the capacity to the closest power of 2
         capacity = capacity.next_power_of_two();
 
-        // Allocate the memory
-        let mem = alloc::heap(capacity as usize);
-
-        // If the allocation failed, return a blank buf
-        if mem.is_none() {
-            return ByteBuf::none();
-        }
-
-        ByteBuf {
-            mem: mem,
-            cap: capacity,
-            pos: 0,
-            lim: capacity,
-            mark: None,
+        unsafe {
+            // Allocate the memory
+            let mem = alloc::heap(capacity as usize);
+
+            ByteBuf {
+                mem: mem,
+                cap: capacity,
+                pos: 0,
+                lim: capacity,
+                mark: None,
+            }
         }
     }
diff --git a/src/buf/ring.rs b/src/buf/ring.rs
index 2deec29178509c258fec21a33778f1fed702bf07..3c2cac420f3f21dfa79d10e523784f12caffa54d 100644
--- a/src/buf/ring.rs
+++ b/src/buf/ring.rs
@@ -23,28 +23,19 @@ pub struct RingBuf {
 
 impl RingBuf {
     /// Allocates a new `RingBuf` with the specified capacity.
     pub fn new(mut capacity: usize) -> RingBuf {
-        // Handle the 0 length buffer case
-        if capacity == 0 {
-            return RingBuf {
-                ptr: alloc::MemRef::none(),
-                cap: 0,
-                pos: 0,
-                len: 0,
-                mark: Mark::NoMark,
-            }
-        }
-
         // Round to the next power of 2 for better alignment
         capacity = capacity.next_power_of_two();
 
-        let mem = alloc::heap(capacity as usize);
+        unsafe {
+            let mem = alloc::heap(capacity as usize);
 
-        RingBuf {
-            ptr: mem,
-            cap: capacity,
-            pos: 0,
-            len: 0,
-            mark: Mark::NoMark,
+            RingBuf {
+                ptr: mem,
+                cap: capacity,
+                pos: 0,
+                len: 0,
+                mark: Mark::NoMark,
+            }
         }
     }
diff --git a/src/bytes/mod.rs b/src/bytes/mod.rs
index 30daabed7036b6143a32c8b38e79e57765791e2a..fd6bdcbf42665fffe6d177b2c73629f22a4e053f 100644
--- a/src/bytes/mod.rs
+++ b/src/bytes/mod.rs
@@ -9,6 +9,7 @@ use self::small::Small;
 use self::rope::{Rope, RopeBuf};
 use std::{cmp, fmt, ops};
 use std::io::Cursor;
+use std::sync::Arc;
 
 #[derive(Clone)]
 pub struct Bytes {
@@ -19,7 +20,7 @@ pub struct Bytes {
 enum Kind {
     Seq(Seq),
     Small(Small),
-    Rope(Rope),
+    Rope(Arc<Rope>),
 }
 
 pub struct BytesBuf<'a> {
@@ -41,8 +42,9 @@ impl Bytes {
     ///
     /// This function is unsafe as there are no guarantees that the given
     /// arguments are valid.
+    #[inline]
     pub unsafe fn from_mem_ref(mem: alloc::MemRef, pos: u32, len: u32) -> Bytes {
-        Small::from_slice(&mem.bytes()[pos as usize .. pos as usize + len as usize])
+        Small::from_slice(&mem.bytes_slice(pos as usize, pos as usize + len as usize))
             .map(|b| Bytes { kind: Kind::Small(b) })
             .unwrap_or_else(|| {
                 let seq = Seq::from_mem_ref(mem, pos, len);
@@ -110,7 +112,7 @@ impl Bytes {
         }
     }
 
-    fn into_rope(self) -> Result<Rope, Bytes> {
+    fn into_rope(self) -> Result<Arc<Rope>, Bytes> {
         match self.kind {
             Kind::Rope(r) => Ok(r),
             _ => Err(self),
diff --git a/src/bytes/rope.rs b/src/bytes/rope.rs
index 9688dd1819f8dd8478355a55e16bf00a1e844777..4d9a20e1680cd593a4d404ece677ed45e11632da 100644
--- a/src/bytes/rope.rs
+++ b/src/bytes/rope.rs
@@ -151,7 +151,7 @@ impl Rope {
                     // the same as the given left tree.
                     let new_right = concat_bytes(&left.right, &right, len);
 
-                    return Rope::new(left.left, new_right).into_bytes();
+                    return Rope::new(left.left.clone(), new_right).into_bytes();
                 }
 
                 if left.left.depth() > left.right.depth() && left.depth > right.depth() {
@@ -161,12 +161,12 @@ impl Rope {
                    // the node on the RHS. This is yet another optimization
                    // for building the string by repeatedly concatenating on the
                    // right.
-                    let new_right = Rope::new(left.right, right);
+                    let new_right = Rope::new(left.right.clone(), right);
 
-                    return Rope::new(left.left, new_right).into_bytes();
+                    return Rope::new(left.left.clone(), new_right).into_bytes();
                 }
 
-                left.into_bytes()
+                Bytes { kind: super::Kind::Rope(left) }
             }
             Err(left) => left,
         };
@@ -234,7 +234,7 @@ impl Rope {
     fn into_bytes(self) -> Bytes {
         use super::Kind;
 
-        Bytes { kind: Kind::Rope(self) }
+        Bytes { kind: Kind::Rope(Arc::new(self)) }
     }
 }
@@ -302,7 +302,7 @@ impl From<Bytes> for Node {
         match src.kind {
             Kind::Seq(b) => Node::Seq(b),
             Kind::Small(b) => Node::Small(b),
-            Kind::Rope(b) => Node::Rope(Arc::new(b)),
+            Kind::Rope(b) => Node::Rope(b),
         }
     }
 }
@@ -619,15 +619,15 @@ impl Partial {
     }
 
     fn unwrap_rope(self) -> Rope {
-        match self {
+        let arc = match self {
             Partial::Bytes(v) => v.into_rope().ok().expect("unexpected state calling `Partial::unwrap_rope()`"),
-            Partial::Node(Node::Rope(v)) => {
-                match Arc::try_unwrap(v) {
-                    Ok(v) => v,
-                    Err(v) => (*v).clone(),
-                }
-            }
+            Partial::Node(Node::Rope(v)) => v,
             _ => panic!("unexpected state calling `Partial::unwrap_rope()`"),
+        };
+
+        match Arc::try_unwrap(arc) {
+            Ok(v) => v,
+            Err(v) => (*v).clone(),
         }
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 460533f0855b5039c280a27781a18a42f1e63f0a..af42d4ac9e722f1fb071dc585806a27e4ae568f6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,7 +13,7 @@ pub mod alloc;
 
 pub use buf::{Buf, MutBuf, Source, Sink, ReadExt, WriteExt, Fmt};
 pub use buf::append::AppendBuf;
-pub use buf::block::{BlockBuf, NewBlock, BlockBufCursor};
+pub use buf::block::{BlockBuf, BlockBufCursor};
 pub use buf::byte::{ByteBuf, MutByteBuf};
 pub use buf::ring::RingBuf;
 pub use buf::take::Take;
diff --git a/test/test.rs b/test/test.rs
index 76624ebcbb604f037eebaf9fe31abfd3058aa17a..c2c078e0111d078bc5755fbcc74548dec49e12e0 100644
--- a/test/test.rs
+++ b/test/test.rs
@@ -18,7 +18,7 @@ mod test_seq;
 mod test_small;
 
 // == Pool
-mod test_pool;
+// mod test_pool;
 
 fn gen_bytes(n: usize) -> Vec<u8> {
     (0..n).map(|_| random()).collect()
diff --git a/test/test_append.rs b/test/test_append.rs
index 1b7d5887fa9362a21de9f00db5b5a9df9d295a6f..3e537db55cc05378c937765136886f77993f9860 100644
--- a/test/test_append.rs
+++ b/test/test_append.rs
@@ -1,5 +1,4 @@
 use bytes::{Buf, MutBuf, AppendBuf};
-use bytes::alloc::Pool;
 
 #[test]
 pub fn test_initial_buf_empty() {
@@ -29,8 +28,10 @@ pub fn test_initial_buf_empty() {
     }
 }
 
+/*
 #[test]
 pub fn test_append_buf_from_pool() {
+    use bytes::alloc::Pool;
     let pool = Pool::with_capacity(2, 256);
 
     // Run in a loop a bunch in hope that if there is a memory issue, it will
@@ -58,3 +59,4 @@ pub fn test_append_buf_from_pool() {
         assert_eq!(dst, b"hello world");
     }
 }
+*/
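
Reviewer note, not part of the patch: two soundness caveats survive the
simplification. `heap::allocate` still hands out uninitialized memory through
`Vec::set_len`, which is why it and `alloc::heap` are now `unsafe fn`, and
`MemRef::mut_bytes` casts the `Arc`'s shared pointer to `*mut u8`, so mutation is
only sound while the caller knows no reader overlaps the written range. A stricter
shape, sketched here as a hypothetical free function rather than the patch's
actual API, would refuse mutable access once the buffer is shared:

    use std::sync::Arc;

    // Sketch: hand out `&mut [u8]` only while the Arc is uniquely owned,
    // instead of unconditionally casting `as_ptr()` to `*mut u8`.
    fn mut_bytes(mem: &mut Arc<Vec<u8>>) -> Option<&mut [u8]> {
        Arc::get_mut(mem).map(|v| &mut v[..])
    }

    fn main() {
        let mut mem = Arc::new(vec![0u8; 8]);
        assert!(mut_bytes(&mut mem).is_some()); // unique: mutation allowed

        let alias = Arc::clone(&mem);
        assert!(mut_bytes(&mut mem).is_none()); // aliased: mutation refused
        drop(alias);
    }

The patch cannot use `Arc::get_mut` directly because `AppendBuf` keeps writing
into a block that outstanding `Bytes` slices already reference; instead it relies
on the `rd`/`wr` cursors (now backed by the new "buffer overflow" assertions) to
keep reads and writes on disjoint ranges.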