use super::{Mem, MemRef};
use buf::{ByteBuf, MutByteBuf};
use stable_heap as heap;
use std::{mem, ptr, isize, usize};
use std::cell::{Cell, UnsafeCell};
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};

// TODO: ensure that `Pool` is not `Sync`

/// A pool of fixed-size byte buffers backed by a single allocation.
pub struct Pool {
    inner: Arc<PoolInner>,
    marker: PhantomData<Cell<()>>,
}

struct PoolInner {
    ptr: *mut u8,           // Pointer to the raw memory
    next: AtomicPtr<Entry>, // Head of the free entry list
    cap: usize,             // Total number of entries
    buf_len: usize,         // Byte size of each byte buf
    entry_len: usize,       // Byte size of each entry
}

struct Entry {
    inner: UnsafeCell<Inner>,
}

struct Inner {
    pool: Option<Pool>,
    refs: AtomicUsize,
    next: *mut Entry,
}

const MAX_REFCOUNT: usize = (isize::MAX) as usize;
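
// Each entry in the pool's single allocation is laid out as:
//
//   [ Entry | &Mem trait object | buf_len: usize | buffer bytes ... ]
//
// rounded up to a multiple of `align()`. `checkout` hands out a `MemRef`
// pointing just past the `Entry` header, so the trait object and length
// immediately precede the buffer data.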
impl Pool {
    /// Constructs a new `Pool` with the specified capacity such that each
    /// buffer in the pool has a length of `buf_len`.
    pub fn with_capacity(cap: usize, mut buf_len: usize) -> Pool {
        // Ensure that all buffers have a power of 2 size. This enables
        // optimizations in Buf implementations.
        buf_len = buf_len.next_power_of_two();

        let inner = Arc::new(PoolInner::with_capacity(cap, buf_len));

        // Iterate each entry and initialize the memory
        let mut next = ptr::null_mut();

        for i in 0..cap {
            unsafe {
                let off = i * inner.entry_len;
                let ptr = inner.ptr.offset(off as isize);
                let e = &mut *(ptr as *mut Entry);

                // Write the entry header, linking it to the previously
                // initialized entry
                ptr::write(&mut e.inner as *mut UnsafeCell<Inner>, UnsafeCell::new(Inner {
                    pool: None,
                    refs: AtomicUsize::new(0),
                    next: next,
                }));

                next = ptr as *mut Entry;

                // Write the `&Mem` trait object immediately after the entry
                // header, followed by the buffer length
                let ptr = ptr.offset(mem::size_of::<Entry>() as isize);
                ptr::write(ptr as *mut &Mem, e as &Mem);

                let ptr = ptr.offset(mem::size_of::<&Mem>() as isize);
                ptr::write(ptr as *mut usize, buf_len);
            }
        }

        // Set the next ptr to the head of the Entry linked list
        inner.next.store(next, Ordering::Relaxed);

        Pool {
            inner: inner,
            marker: PhantomData,
        }
    }

    /// Returns the number of buffers that the `Pool` holds.
    pub fn capacity(&self) -> usize {
        self.inner.cap
    }

    /// Returns a new `ByteBuf` backed by a buffer from the pool. If the pool
    /// is depleted, `None` is returned.
    pub fn new_byte_buf(&self) -> Option<MutByteBuf> {
        let len = self.inner.buf_len as u32;

        self.checkout().map(|mem| {
            let buf = unsafe { ByteBuf::from_mem_ref(mem, len, 0, len) };
            buf.flip()
        })
    }

    fn checkout(&self) -> Option<MemRef> {
        unsafe {
            // Pop an entry off the free list with a CAS loop
            let mut ptr = self.inner.next.load(Ordering::Acquire);

            loop {
                if ptr.is_null() {
                    // The pool is depleted
                    return None;
                }

                let inner = &*(*ptr).inner.get();
                let next = inner.next;

                let res = self.inner.next.compare_and_swap(ptr, next, Ordering::AcqRel);

                if res == ptr {
                    break;
                }

                ptr = res;
            }

            let inner = &mut *(*ptr).inner.get();

            // Unset next pointer & set the pool
            inner.next = ptr::null_mut();
            inner.refs.store(1, Ordering::Relaxed);
            inner.pool = Some(self.clone());

            // Hand out the memory starting just past the entry header
            let ptr = ptr as *mut u8;
            let ptr = ptr.offset(mem::size_of::<Entry>() as isize);

            Some(MemRef::new(ptr))
        }
    }

    fn clone(&self) -> Pool {
        Pool {
            inner: self.inner.clone(),
            marker: PhantomData,
        }
    }
}
impl PoolInner {
    fn with_capacity(cap: usize, buf_len: usize) -> PoolInner {
        let ptr = unsafe { heap::allocate(alloc_len(cap, buf_len), align()) };

        PoolInner {
            ptr: ptr,
            next: AtomicPtr::new(ptr::null_mut()),
            cap: cap,
            buf_len: buf_len,
            entry_len: entry_len(buf_len),
        }
    }
}

impl Drop for PoolInner {
    fn drop(&mut self) {
        unsafe { heap::deallocate(self.ptr, alloc_len(self.cap, self.buf_len), align()) }
    }
}
impl Entry {
    fn release(&self) {
        unsafe {
            let inner = &mut *self.inner.get();

            let pool = inner.pool.take()
                .expect("entry not associated with a pool");

            // Push the entry back onto the head of the pool's free list with
            // a CAS loop
            let mut next = pool.inner.next.load(Ordering::Acquire);

            loop {
                inner.next = next;

                let actual = pool.inner.next
                    .compare_and_swap(next, self as *const Entry as *mut Entry, Ordering::AcqRel);

                if actual == next {
                    break;
                }

                next = actual;
            }
        }
    }
}
impl Mem for Entry {
    fn ref_inc(&self) {
        // Using a relaxed ordering is alright here, as knowledge of the
        // original reference prevents other threads from erroneously deleting
        // the object.
        //
        // As explained in the [Boost documentation][1], increasing the
        // reference counter can always be done with memory_order_relaxed: new
        // references to an object can only be formed from an existing
        // reference, and passing an existing reference from one thread to
        // another must already provide any required synchronization.
        //
        // [1]: www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html
        let old_size = unsafe {
            (*self.inner.get()).refs.fetch_add(1, Ordering::Relaxed)
        };

        // However, we need to guard against massive refcounts in case someone
        // is `mem::forget`ing refs. If we don't do this, the count can
        // overflow and users will use-after-free. We racily saturate to
        // `isize::MAX` on the assumption that there aren't ~2 billion threads
        // incrementing the reference count at once. This branch will never be
        // taken in any realistic program.
        //
        // We panic because such a program is incredibly degenerate, and we
        // don't care to support it.
        if old_size > MAX_REFCOUNT {
            panic!("too many refs");
        }
    }

    fn ref_dec(&self) {
        unsafe {
            // Release the reference. Only the thread dropping the final
            // reference continues on to return the entry to the pool.
            let prev = (*self.inner.get()).refs.fetch_sub(1, Ordering::Release);

            if prev != 1 {
                return;
            }
        }

        // Synchronize with all previous decrements before reusing the entry
        atomic::fence(Ordering::Acquire);
        self.release();
    }
}
// TODO: is there a better way to do this?
unsafe impl Send for Entry {}
unsafe impl Sync for Entry {}

fn alloc_len(cap: usize, buf_len: usize) -> usize {
    cap * entry_len(buf_len)
}

fn entry_len(bytes_len: usize) -> usize {
    let len = bytes_len +
        mem::size_of::<Entry>() +
        mem::size_of::<&Mem>() +
        mem::size_of::<usize>();

    // Round the entry size up to the next multiple of `align()` so that each
    // entry in the pool starts at an aligned address
    if len & (align() - 1) == 0 {
        len
    } else {
        (len & !(align() - 1)) + align()
    }
}

fn align() -> usize {
    mem::size_of::<usize>()
}
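
// A minimal, hypothetical usage sketch of the public API above, written as a
// unit test. It only exercises what this module itself defines
// (`with_capacity`, `capacity`, and `new_byte_buf`); the module and test
// names are illustrative, not part of the original code.
#[cfg(test)]
mod pool_sketch {
    use super::Pool;

    #[test]
    fn hands_out_cap_buffers_then_depletes() {
        // The requested buffer length is rounded up to a power of two
        // (100 -> 128), but the number of buffers stays at `cap`.
        let pool = Pool::with_capacity(2, 100);
        assert_eq!(2, pool.capacity());

        // The pool can hand out `capacity()` buffers...
        let first = pool.new_byte_buf().expect("first buffer");
        let second = pool.new_byte_buf().expect("second buffer");

        // ...after which `new_byte_buf` reports depletion with `None`.
        assert!(pool.new_byte_buf().is_none());

        drop(first);
        drop(second);
    }
}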