Skip to content
Snippets Groups Projects
Commit b10992a5 authored by Carl Lerche's avatar Carl Lerche
Browse files

Refactor RingBuf

parent b1dc10e9
No related branches found
No related tags found
No related merge requests found
use {alloc, Buf, MutBuf};
use std::{cmp, fmt};
use {Buf, MutBuf};
use imp::alloc;
use std::fmt;
enum Mark {
NoMark,
At { pos: usize, len: usize },
}
/// Buf backed by a continous chunk of memory. Maintains a read cursor and a
/// write cursor. When reads and writes reach the end of the allocated buffer,
/// wraps around to the start.
/// `RingBuf` is backed by contiguous memory and writes may wrap.
///
/// This type is suited for use cases where reads and writes are intermixed.
pub struct RingBuf {
ptr: alloc::MemRef, // Pointer to the memory
cap: usize, // Capacity of the buffer
pos: usize, // Offset of read cursor
len: usize, // Number of bytes to read
mark: Mark, // Marked read position
/// When writing reaches the end of the memory, writing resume at the beginning
/// of the memory. Writes may never overwrite pending reads.
pub struct RingBuf<T = Box<[u8]>> {
// Contiguous memory
mem: T,
// Current read position
rd: u64,
// Current write position
wr: u64,
// Mask used to convert the cursor to an offset
mask: u64,
}
// TODO: There are most likely many optimizations that can be made
impl RingBuf {
/// Allocates a new `RingBuf` with the specified capacity.
pub fn with_capacity(mut capacity: usize) -> RingBuf {
// Round to the next power of 2 for better alignment
capacity = capacity.next_power_of_two();
unsafe {
let mem = alloc::heap(capacity as usize);
RingBuf {
ptr: mem,
cap: capacity,
pos: 0,
len: 0,
mark: Mark::NoMark,
}
}
}
/// Returns `true` if the buf cannot accept any further writes.
pub fn is_full(&self) -> bool {
self.cap == self.len
pub fn with_capacity(capacity: usize) -> RingBuf {
let mem = unsafe { alloc::with_capacity(capacity) };
RingBuf::new(mem)
}
}
/// Returns `true` if the buf cannot accept any further reads.
pub fn is_empty(&self) -> bool {
self.len == 0
impl<T: AsRef<[u8]>> RingBuf<T> {
/// Creates a new `RingBuf` wrapping the provided slice
pub fn new(mem: T) -> RingBuf<T> {
// Ensure that the memory chunk provided has a length that is a power
// of 2
let len = mem.as_ref().len() as u64;
let mask = len - 1;
assert!(len & mask == 0, "mem length must be power of two");
RingBuf {
mem: mem,
rd: 0,
wr: 0,
mask: mask,
}
}
/// Returns the number of bytes that the buf can hold.
pub fn capacity(&self) -> usize {
self.cap
}
/// Marks the current read location.
///
/// Together with `reset`, this can be used to read from a section of the
/// buffer multiple times. The mark will be cleared if it is overwritten
/// during a write.
pub fn mark(&mut self) {
self.mark = Mark::At { pos: self.pos, len: self.len };
}
/// Resets the read position to the previously marked position.
///
/// Together with `mark`, this can be used to read from a section of the
/// buffer multiple times.
///
/// # Panics
///
/// This method will panic if no mark has been set,
pub fn reset(&mut self){
match self.mark {
Mark::NoMark => panic!("no mark set"),
Mark::At {pos, len} => {
self.pos = pos;
self.len = len;
self.mark = Mark::NoMark;
}
}
self.mem.as_ref().len()
}
/// Resets all internal state to the initial state.
pub fn clear(&mut self) {
self.pos = 0;
self.len = 0;
self.mark = Mark::NoMark;
/// Return the read cursor position
pub fn position(&self) -> u64 {
self.rd
}
/// Returns the number of bytes remaining to read.
fn read_remaining(&self) -> usize {
self.len
/// Set the read cursor position
pub fn set_position(&mut self, position: u64) {
assert!(position <= self.wr && position + self.capacity() as u64 >= self.wr,
"position out of bounds");
self.rd = position;
}
/// Returns the remaining write capacity until which the buf becomes full.
fn write_remaining(&self) -> usize {
self.cap - self.len
}
fn advance_reader(&mut self, mut cnt: usize) {
if self.cap == 0 {
return;
/// Return the number of buffered bytes
pub fn len(&self) -> usize {
if self.wr >= self.capacity() as u64 {
(self.rd - (self.wr - self.capacity() as u64)) as usize
} else {
self.rd as usize
}
cnt = cmp::min(cnt, self.read_remaining());
}
self.pos += cnt;
self.pos %= self.cap;
self.len -= cnt;
/// Returns `true` if the buf cannot accept any further reads.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
fn advance_writer(&mut self, mut cnt: usize) {
cnt = cmp::min(cnt, self.write_remaining());
self.len += cnt;
/// Resets all internal state to the initial state.
pub fn clear(&mut self) {
self.rd = 0;
self.wr = 0;
}
// Adjust the mark to account for bytes written.
if let Mark::At { ref mut len, .. } = self.mark {
*len += cnt;
}
/// Returns the number of bytes remaining to read.
pub fn remaining_read(&self) -> usize {
(self.wr - self.rd) as usize
}
// Clear the mark if we've written past it.
if let Mark::At { len, .. } = self.mark {
if len > self.cap {
self.mark = Mark::NoMark;
}
}
/// Returns the remaining write capacity until which the buf becomes full.
pub fn remaining_write(&self) -> usize {
self.capacity() - self.remaining_read()
}
}
impl fmt::Debug for RingBuf {
impl<T: AsRef<[u8]>> fmt::Debug for RingBuf<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "RingBuf[.. {}]", self.len)
write!(fmt, "RingBuf[.. {}]", self.len())
}
}
impl Buf for RingBuf {
impl<T: AsRef<[u8]>> Buf for RingBuf<T> {
fn remaining(&self) -> usize {
self.read_remaining()
self.remaining_read()
}
fn bytes(&self) -> &[u8] {
let mut to = self.pos + self.len;
if to > self.cap {
to = self.cap
// This comparison must be performed in order to differentiate between
// the at capacity case and the empty case.
if self.wr > self.rd {
let a = (self.rd & self.mask) as usize;
let b = (self.wr & self.mask) as usize;
println!("a={:?}; b={:?}, wr={:?}; rd={:?}", a, b, self.wr, self.rd);
if b > a {
&self.mem.as_ref()[a..b]
} else {
&self.mem.as_ref()[a..]
}
} else {
&[]
}
unsafe { &self.ptr.bytes()[self.pos .. to] }
}
fn advance(&mut self, cnt: usize) {
self.advance_reader(cnt)
assert!(cnt <= self.remaining_read(), "buffer overflow");
self.rd += cnt as u64
}
}
impl MutBuf for RingBuf {
impl<T> MutBuf for RingBuf<T>
where T: AsRef<[u8]> + AsMut<[u8]>,
{
fn remaining(&self) -> usize {
self.write_remaining()
self.remaining_write()
}
unsafe fn advance(&mut self, cnt: usize) {
self.advance_writer(cnt)
assert!(cnt <= self.remaining_write(), "buffer overflow");
self.wr += cnt as u64;
}
unsafe fn mut_bytes(&mut self) -> &mut [u8] {
if self.cap == 0 {
return self.ptr.mut_bytes();
}
let mut from;
let mut to;
let a = (self.wr & self.mask) as usize;
from = self.pos + self.len;
from %= self.cap;
if self.wr > self.rd {
let b = (self.rd & self.mask) as usize;
to = from + <Self as MutBuf>::remaining(&self);
if to >= self.cap {
to = self.cap;
if a >= b {
&mut self.mem.as_mut()[a..]
} else {
&mut self.mem.as_mut()[a..b]
}
} else {
&mut self.mem.as_mut()[a..]
}
&mut self.ptr.mut_bytes()[from..to]
}
}
unsafe impl Send for RingBuf { }
use bytes::{Buf, MutBuf};
use bytes::buf::RingBuf;
#[test]
pub fn test_ring_buf_is_send() {
fn is_send<T: Send>() {}
is_send::<RingBuf>();
}
#[test]
pub fn test_initial_buf_empty() {
let mut buf = RingBuf::with_capacity(16);
......@@ -18,11 +24,11 @@ pub fn test_initial_buf_empty() {
let mut out = [0u8; 3];
buf.mark();
let pos = buf.position();
let bytes_read = buf.copy_to(&mut out[..]);
assert_eq!(bytes_read, 3);
assert_eq!(out, [1, 2, 3]);
buf.reset();
buf.set_position(pos);
let bytes_read = buf.copy_to(&mut out[..]);
assert_eq!(bytes_read, 3);
assert_eq!(out, [1, 2, 3]);
......@@ -43,11 +49,11 @@ fn test_wrapping_write() {
let bytes_written = buf.copy_from(&[23;8][..]);
assert_eq!(bytes_written, 8);
buf.mark();
let pos = buf.position();
let bytes_read = buf.copy_to(&mut out[..]);
assert_eq!(bytes_read, 10);
assert_eq!(out, [42, 42, 23, 23, 23, 23, 23, 23, 23, 23]);
buf.reset();
buf.set_position(pos);
let bytes_read = buf.copy_to(&mut out[..]);
assert_eq!(bytes_read, 10);
assert_eq!(out, [42, 42, 23, 23, 23, 23, 23, 23, 23, 23]);
......@@ -77,10 +83,10 @@ fn test_io_write_and_read() {
fn test_wrap_reset() {
let mut buf = RingBuf::with_capacity(8);
buf.copy_from(&[1, 2, 3, 4, 5, 6, 7][..]);
buf.mark();
let pos = buf.position();
buf.copy_to(&mut [0; 4][..]);
buf.copy_from(&[1, 2, 3, 4][..]);
buf.reset();
buf.set_position(pos);
}
#[test]
......@@ -88,9 +94,9 @@ fn test_wrap_reset() {
fn test_mark_write() {
let mut buf = RingBuf::with_capacity(8);
buf.copy_from(&[1, 2, 3, 4, 5, 6, 7][..]);
buf.mark();
let pos = buf.position();
buf.copy_from(&[8][..]);
buf.reset();
buf.set_position(pos);
let mut buf2 = [0; 8];
buf.copy_to(&mut buf2[..]);
......@@ -104,8 +110,8 @@ fn test_reset_full() {
let mut buf = RingBuf::with_capacity(8);
buf.copy_from(&[1, 2, 3, 4, 5, 6, 7, 8][..]);
assert_eq!(MutBuf::remaining(&buf), 0);
buf.mark();
buf.reset();
let pos = buf.position();
buf.set_position(pos);
assert_eq!(MutBuf::remaining(&buf), 0);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment