diff --git a/src/imp/buf/ring.rs b/src/imp/buf/ring.rs index 4b80cf2f26a156e9586d71e945e2f9bb161561e4..6e6630ccffe4dec422af7aeca73f14bdb3060dd6 100644 --- a/src/imp/buf/ring.rs +++ b/src/imp/buf/ring.rs @@ -1,187 +1,158 @@ -use {alloc, Buf, MutBuf}; -use std::{cmp, fmt}; +use {Buf, MutBuf}; +use imp::alloc; +use std::fmt; -enum Mark { - NoMark, - At { pos: usize, len: usize }, -} -/// Buf backed by a continous chunk of memory. Maintains a read cursor and a -/// write cursor. When reads and writes reach the end of the allocated buffer, -/// wraps around to the start. +/// `RingBuf` is backed by contiguous memory and writes may wrap. /// -/// This type is suited for use cases where reads and writes are intermixed. -pub struct RingBuf { - ptr: alloc::MemRef, // Pointer to the memory - cap: usize, // Capacity of the buffer - pos: usize, // Offset of read cursor - len: usize, // Number of bytes to read - mark: Mark, // Marked read position +/// When writing reaches the end of the memory, writing resume at the beginning +/// of the memory. Writes may never overwrite pending reads. +pub struct RingBuf<T = Box<[u8]>> { + // Contiguous memory + mem: T, + // Current read position + rd: u64, + // Current write position + wr: u64, + // Mask used to convert the cursor to an offset + mask: u64, } -// TODO: There are most likely many optimizations that can be made impl RingBuf { /// Allocates a new `RingBuf` with the specified capacity. - pub fn with_capacity(mut capacity: usize) -> RingBuf { - // Round to the next power of 2 for better alignment - capacity = capacity.next_power_of_two(); - - unsafe { - let mem = alloc::heap(capacity as usize); - - RingBuf { - ptr: mem, - cap: capacity, - pos: 0, - len: 0, - mark: Mark::NoMark, - } - } - } - - /// Returns `true` if the buf cannot accept any further writes. - pub fn is_full(&self) -> bool { - self.cap == self.len + pub fn with_capacity(capacity: usize) -> RingBuf { + let mem = unsafe { alloc::with_capacity(capacity) }; + RingBuf::new(mem) } +} - /// Returns `true` if the buf cannot accept any further reads. - pub fn is_empty(&self) -> bool { - self.len == 0 +impl<T: AsRef<[u8]>> RingBuf<T> { + /// Creates a new `RingBuf` wrapping the provided slice + pub fn new(mem: T) -> RingBuf<T> { + // Ensure that the memory chunk provided has a length that is a power + // of 2 + let len = mem.as_ref().len() as u64; + let mask = len - 1; + + assert!(len & mask == 0, "mem length must be power of two"); + + RingBuf { + mem: mem, + rd: 0, + wr: 0, + mask: mask, + } } /// Returns the number of bytes that the buf can hold. pub fn capacity(&self) -> usize { - self.cap - } - - /// Marks the current read location. - /// - /// Together with `reset`, this can be used to read from a section of the - /// buffer multiple times. The mark will be cleared if it is overwritten - /// during a write. - pub fn mark(&mut self) { - self.mark = Mark::At { pos: self.pos, len: self.len }; - } - - /// Resets the read position to the previously marked position. - /// - /// Together with `mark`, this can be used to read from a section of the - /// buffer multiple times. - /// - /// # Panics - /// - /// This method will panic if no mark has been set, - pub fn reset(&mut self){ - match self.mark { - Mark::NoMark => panic!("no mark set"), - Mark::At {pos, len} => { - self.pos = pos; - self.len = len; - self.mark = Mark::NoMark; - } - } + self.mem.as_ref().len() } - /// Resets all internal state to the initial state. - pub fn clear(&mut self) { - self.pos = 0; - self.len = 0; - self.mark = Mark::NoMark; + /// Return the read cursor position + pub fn position(&self) -> u64 { + self.rd } - /// Returns the number of bytes remaining to read. - fn read_remaining(&self) -> usize { - self.len + /// Set the read cursor position + pub fn set_position(&mut self, position: u64) { + assert!(position <= self.wr && position + self.capacity() as u64 >= self.wr, + "position out of bounds"); + self.rd = position; } - /// Returns the remaining write capacity until which the buf becomes full. - fn write_remaining(&self) -> usize { - self.cap - self.len - } - - fn advance_reader(&mut self, mut cnt: usize) { - if self.cap == 0 { - return; + /// Return the number of buffered bytes + pub fn len(&self) -> usize { + if self.wr >= self.capacity() as u64 { + (self.rd - (self.wr - self.capacity() as u64)) as usize + } else { + self.rd as usize } - cnt = cmp::min(cnt, self.read_remaining()); + } - self.pos += cnt; - self.pos %= self.cap; - self.len -= cnt; + /// Returns `true` if the buf cannot accept any further reads. + pub fn is_empty(&self) -> bool { + self.len() == 0 } - fn advance_writer(&mut self, mut cnt: usize) { - cnt = cmp::min(cnt, self.write_remaining()); - self.len += cnt; + /// Resets all internal state to the initial state. + pub fn clear(&mut self) { + self.rd = 0; + self.wr = 0; + } - // Adjust the mark to account for bytes written. - if let Mark::At { ref mut len, .. } = self.mark { - *len += cnt; - } + /// Returns the number of bytes remaining to read. + pub fn remaining_read(&self) -> usize { + (self.wr - self.rd) as usize + } - // Clear the mark if we've written past it. - if let Mark::At { len, .. } = self.mark { - if len > self.cap { - self.mark = Mark::NoMark; - } - } + /// Returns the remaining write capacity until which the buf becomes full. + pub fn remaining_write(&self) -> usize { + self.capacity() - self.remaining_read() } } -impl fmt::Debug for RingBuf { +impl<T: AsRef<[u8]>> fmt::Debug for RingBuf<T> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "RingBuf[.. {}]", self.len) + write!(fmt, "RingBuf[.. {}]", self.len()) } } -impl Buf for RingBuf { +impl<T: AsRef<[u8]>> Buf for RingBuf<T> { fn remaining(&self) -> usize { - self.read_remaining() + self.remaining_read() } fn bytes(&self) -> &[u8] { - let mut to = self.pos + self.len; - - if to > self.cap { - to = self.cap + // This comparison must be performed in order to differentiate between + // the at capacity case and the empty case. + if self.wr > self.rd { + let a = (self.rd & self.mask) as usize; + let b = (self.wr & self.mask) as usize; + + println!("a={:?}; b={:?}, wr={:?}; rd={:?}", a, b, self.wr, self.rd); + + if b > a { + &self.mem.as_ref()[a..b] + } else { + &self.mem.as_ref()[a..] + } + } else { + &[] } - - unsafe { &self.ptr.bytes()[self.pos .. to] } } fn advance(&mut self, cnt: usize) { - self.advance_reader(cnt) + assert!(cnt <= self.remaining_read(), "buffer overflow"); + self.rd += cnt as u64 } } -impl MutBuf for RingBuf { - +impl<T> MutBuf for RingBuf<T> + where T: AsRef<[u8]> + AsMut<[u8]>, +{ fn remaining(&self) -> usize { - self.write_remaining() + self.remaining_write() } unsafe fn advance(&mut self, cnt: usize) { - self.advance_writer(cnt) + assert!(cnt <= self.remaining_write(), "buffer overflow"); + self.wr += cnt as u64; } unsafe fn mut_bytes(&mut self) -> &mut [u8] { - if self.cap == 0 { - return self.ptr.mut_bytes(); - } - let mut from; - let mut to; + let a = (self.wr & self.mask) as usize; - from = self.pos + self.len; - from %= self.cap; + if self.wr > self.rd { + let b = (self.rd & self.mask) as usize; - to = from + <Self as MutBuf>::remaining(&self); - - if to >= self.cap { - to = self.cap; + if a >= b { + &mut self.mem.as_mut()[a..] + } else { + &mut self.mem.as_mut()[a..b] + } + } else { + &mut self.mem.as_mut()[a..] } - - &mut self.ptr.mut_bytes()[from..to] } } - -unsafe impl Send for RingBuf { } diff --git a/test/test_ring.rs b/test/test_ring.rs index 135597ac90b844684e99d9ea5a4881a1019b0157..d2dd5dd1be782483d9a9aca80143c0fcaa2ba9df 100644 --- a/test/test_ring.rs +++ b/test/test_ring.rs @@ -1,6 +1,12 @@ use bytes::{Buf, MutBuf}; use bytes::buf::RingBuf; +#[test] +pub fn test_ring_buf_is_send() { + fn is_send<T: Send>() {} + is_send::<RingBuf>(); +} + #[test] pub fn test_initial_buf_empty() { let mut buf = RingBuf::with_capacity(16); @@ -18,11 +24,11 @@ pub fn test_initial_buf_empty() { let mut out = [0u8; 3]; - buf.mark(); + let pos = buf.position(); let bytes_read = buf.copy_to(&mut out[..]); assert_eq!(bytes_read, 3); assert_eq!(out, [1, 2, 3]); - buf.reset(); + buf.set_position(pos); let bytes_read = buf.copy_to(&mut out[..]); assert_eq!(bytes_read, 3); assert_eq!(out, [1, 2, 3]); @@ -43,11 +49,11 @@ fn test_wrapping_write() { let bytes_written = buf.copy_from(&[23;8][..]); assert_eq!(bytes_written, 8); - buf.mark(); + let pos = buf.position(); let bytes_read = buf.copy_to(&mut out[..]); assert_eq!(bytes_read, 10); assert_eq!(out, [42, 42, 23, 23, 23, 23, 23, 23, 23, 23]); - buf.reset(); + buf.set_position(pos); let bytes_read = buf.copy_to(&mut out[..]); assert_eq!(bytes_read, 10); assert_eq!(out, [42, 42, 23, 23, 23, 23, 23, 23, 23, 23]); @@ -77,10 +83,10 @@ fn test_io_write_and_read() { fn test_wrap_reset() { let mut buf = RingBuf::with_capacity(8); buf.copy_from(&[1, 2, 3, 4, 5, 6, 7][..]); - buf.mark(); + let pos = buf.position(); buf.copy_to(&mut [0; 4][..]); buf.copy_from(&[1, 2, 3, 4][..]); - buf.reset(); + buf.set_position(pos); } #[test] @@ -88,9 +94,9 @@ fn test_wrap_reset() { fn test_mark_write() { let mut buf = RingBuf::with_capacity(8); buf.copy_from(&[1, 2, 3, 4, 5, 6, 7][..]); - buf.mark(); + let pos = buf.position(); buf.copy_from(&[8][..]); - buf.reset(); + buf.set_position(pos); let mut buf2 = [0; 8]; buf.copy_to(&mut buf2[..]); @@ -104,8 +110,8 @@ fn test_reset_full() { let mut buf = RingBuf::with_capacity(8); buf.copy_from(&[1, 2, 3, 4, 5, 6, 7, 8][..]); assert_eq!(MutBuf::remaining(&buf), 0); - buf.mark(); - buf.reset(); + let pos = buf.position(); + buf.set_position(pos); assert_eq!(MutBuf::remaining(&buf), 0); }