Skip to content
Snippets Groups Projects
Commit c5dbdcc8 authored by Guy Zana's avatar Guy Zana
Browse files

bsd: rewrite callout mechanism to avoid a race

the old implementation used threads for dispatching callouts, each callout
owned a thread and it suffered from a race where a callout structure could have
been deleted before the callout thread even begun.

the current implementation is dispatching all callouts in a single callout
dispatcher thread, it maintains an ordered list of callouts to achieve that.

this patch solve a crash with the TCPDownloadFile test, that now proceeds.
parent 4a63055e
No related branches found
No related tags found
No related merge requests found
......@@ -3,6 +3,7 @@
extern "C" {
#include <sys/time.h>
#include <bsd/porting/callout.h>
#include <bsd/porting/netport.h>
#include <bsd/porting/networking.h>
#include <bsd/porting/route.h>
......@@ -41,6 +42,9 @@ void net_init(void)
{
debug("Initializing network stack...\n");
// Initialize callouts
init_callouts();
/* Random */
struct timeval tv;
bsd_srandom(tv.tv_sec ^ tv.tv_usec);
......
......@@ -38,27 +38,17 @@
#ifndef _SYS__CALLOUT_H
#define _SYS__CALLOUT_H
#define CALLOUT_LOCK(c) (mutex_lock(&(c)->c_callout_mtx))
#define CALLOUT_UNLOCK(c) (mutex_unlock(&(c)->c_callout_mtx))
#include <osv/mutex.h>
struct callout {
/* OSv thread */
void *thread;
/* OSv waiter thread for drain (drain) */
void *waiter_thread;
/* State of this entry */
int c_flags;
/* OSv: Mark to stop waiting */
volatile int c_stopped;
/* OSv: Mark to reschedule currently running callout */
volatile int c_reschedule;
uint64_t c_ticks;
/* Time when callout will be dispatched, both in ticks and in ns */
uint64_t c_time;
uint64_t c_to_ns;
/* MP lock to callout data */
mutex_t c_callout_mtx;
/* Callout Handler */
void (*c_fn)(void *);
void* c_arg;
......
#include <mutex>
#include <set>
#include "drivers/clock.hh"
#include "debug.hh"
#include "sched.hh"
......@@ -9,26 +11,86 @@ extern "C" {
#include <bsd/porting/sync_stub.h>
}
#define cdbg(...) logger::instance()->wrt("callout", logger_debug, __VA_ARGS__)
#define cdbg(...) tprintf("callout", logger_debug, __VA_ARGS__)
int _callout_stop_safe_locked(struct callout *c, int safe);
sched::thread* callout_get_thread(struct callout *c)
{
return reinterpret_cast<sched::thread*>(c->thread);
}
namespace callouts {
void callout_set_thread(struct callout *c, sched::thread* t)
{
c->thread = reinterpret_cast<void*>(t);
struct callout_compare {
bool operator() (callout *a, callout *b) const {
if (a->c_to_ns == b->c_to_ns) {
return (a < b);
}
return (a->c_to_ns < b->c_to_ns);
}
};
// We manage callouts in a single ordered set
std::set<callout *, callout_compare> _callouts;
// Both global data and callout structure are protected using this lock
mutex _callout_mutex;
void lock(void) { _callout_mutex.lock(); }
void unlock(void) { _callout_mutex.unlock(); }
// The callout dispatcher thread
sched::thread *_callout_dispatcher = nullptr;
bool _have_work = false;
void print_callouts(void)
{
cdbg("Showing Callouts:");
for (auto i: _callouts) {
cdbg(" 0x%lx expires=%lu", (uint64_t)i, i->c_to_ns);
}
}
void add_callout(callout *c)
{
_callouts.insert(c);
}
void remove_callout(callout *c)
{
_callouts.erase(c);
}
callout *get_one(void)
{
if (_callouts.empty()) {
return (nullptr);
}
return (*_callouts.begin());
}
// FIXME: optimize this function
bool have_callout(callout *c)
{
for (auto i: _callouts) {
if (i == c) {
return (true);
}
}
return (false);
}
// wakes the dispatcher
void wake_dispatcher(void)
{
_have_work = true;
_callout_dispatcher->wake();
}
}
sched::thread* callout_get_waiter(struct callout *c)
static sched::thread* callout_get_waiter(struct callout *c)
{
return reinterpret_cast<sched::thread*>(c->waiter_thread);
}
void callout_set_waiter(struct callout *c, sched::thread* t)
static void callout_set_waiter(struct callout *c, sched::thread* t)
{
if (t != NULL)
assert(c->waiter_thread == NULL);
......@@ -36,69 +98,100 @@ void callout_set_waiter(struct callout *c, sched::thread* t)
c->waiter_thread = reinterpret_cast<void*>(t);
}
void _callout_wrapper(struct callout *c)
static void _callout_thread(void)
{
sched::thread* current = sched::thread::current();
callouts::lock();
CALLOUT_LOCK(c);
while (true) {
cdbg("*C* C=0x%lx _callout_wrapper()",
(uint64_t)c, c->c_stopped);
if (c->c_stopped) {
cdbg("*C* C=0x%lx _callout_wrapper() stopped 1", (uint64_t)c);
goto completed;
}
// Wait for work
sched::thread::wait_until(callouts::_callout_mutex, [] {
return (callouts::get_one() != nullptr);
});
assert(c->c_flags & (CALLOUT_ACTIVE | CALLOUT_PENDING));
// get the first callout with the earliest time
callout *c = callouts::get_one();
do {
assert(c->c_flags & (CALLOUT_ACTIVE | CALLOUT_PENDING));
//////////////////////
// Wait for timeout //
//////////////////////
c->c_reschedule = 0;
c->c_flags |= CALLOUT_PENDING;
cdbg("*C* C=0x%lx _callout_wrapper() pending", (uint64_t)c);
uint64_t cur = clock::get()->time();
if (cur < c->c_to_ns+TMILISECOND) {
bool expired = true;
if (cur < c->c_to_ns-TMILISECOND) {
sched::timer t(*sched::thread::current());
t.set(c->c_to_ns);
sched::thread::wait_until(c->c_callout_mtx, [&] {
return ( (t.expired()) || (c->c_stopped) );
cdbg("*C* C=0x%lx _callout_thread() waiting...", (uint64_t)c);
sched::thread::wait_until(callouts::_callout_mutex, [&] {
return ( (t.expired()) || (callouts::_have_work));
});
}
if (c->c_stopped == 1) {
cdbg("*C* C=0x%lx _callout_wrapper() stopped 2", (uint64_t)c);
goto completed;
callouts::_have_work = false;
expired = t.expired();
}
struct mtx* c_mtx = c->c_mtx;
struct rwlock* c_rwlock = c->c_rwlock;
bool return_unlocked = ((c->c_flags & CALLOUT_RETURNUNLOCKED) == 0);
if (!expired) {
cdbg("*C* C=0x%lx _callout_thread() retry", (uint64_t)c);
continue;
}
///////////////
// Dispatch! //
///////////////
auto fn = c->c_fn;
auto arg = c->c_arg;
struct mtx* c_mtx = c->c_mtx;
struct rwlock* c_rwlock = c->c_rwlock;
bool return_unlocked = ((c->c_flags & CALLOUT_RETURNUNLOCKED) == 0);
if (c_rwlock)
rw_wlock(c_rwlock);
if (c_mtx)
mtx_lock(c_mtx);
c->c_flags &= ~CALLOUT_PENDING;
c->c_flags |= CALLOUT_DISPATCHING;
CALLOUT_UNLOCK(c);
// Callout handler
c->c_fn(c->c_arg);
cdbg("*C* C=0x%lx _callout_thread() dispatching", (uint64_t)c);
callouts::unlock();
CALLOUT_LOCK(c);
// Callout handler
fn(arg);
callouts::lock();
sched::thread* waiter = nullptr;
//
// note: after the handler have been invoked the callout structure
// can look much differently, the handler may reschedule the callout
// or even freed it.
//
// if the callout is in the set it means that it hasn't been freed
// by the user
//
// reset || drain || !stop
if (callouts::have_callout(c)) {
cdbg("*C* C=0x%lx _callout_thread() done dispatching", (uint64_t)c);
waiter = callout_get_waiter(c);
callout_set_waiter(c, NULL);
// if the callout hadn't been reschedule, remove it
if ( ((c->c_flags & CALLOUT_PENDING) == 0) || (waiter) ) {
cdbg("*C* C=0x%lx _callout_thread() removing callout", (uint64_t)c);
c->c_flags |= CALLOUT_COMPLETED;
callouts::remove_callout(c);
}
} else {
cdbg("*C* C=NULL _callout_thread() done dispatching");
}
// FIXME: should we do this in case the caller called callout_stop?
if (return_unlocked) {
if (c_rwlock)
rw_wunlock(c_rwlock);
......@@ -106,172 +199,99 @@ void _callout_wrapper(struct callout *c)
mtx_unlock(c_mtx);
}
// Check if we are still the owner of this callout
// (we don't if callout_stop() was called and we were dispatching)
sched::thread* detached = callout_get_thread(c);
if (detached != current) {
c->c_stopped = 0;
cdbg("*C* C=0x%lx _callout_wrapper() lost ownership to 0x%lx",
(uint64_t)c, (uint64_t)detached);
CALLOUT_UNLOCK(c);
return;
}
c->c_flags &= ~CALLOUT_DISPATCHING;
} while (c->c_reschedule && !c->c_stopped);
completed:
cdbg("*C* C=0x%lx _callout_wrapper() completed",
(uint64_t)c, c->c_stopped);
cdbg("*C* C=0x%lx _callout_thread() completed", (uint64_t)c);
c->c_stopped = 0;
c->c_reschedule = 0;
c->c_flags &= ~(CALLOUT_PENDING | CALLOUT_DISPATCHING);
c->c_flags |= CALLOUT_COMPLETED;
sched::thread* waiter = callout_get_waiter(c);
callout_set_waiter(c, NULL);
if (waiter) {
cdbg("*C* C=0x%lx _callout_wrapper() waking 0x%lx",
(uint64_t)c, waiter);
waiter->wake();
// if we have a waiter then the callout structure must be valid
if (waiter) {
cdbg("*C* C=0x%lx _callout_thread() waking 0x%lx",
(uint64_t)c, waiter);
waiter->wake();
}
}
CALLOUT_UNLOCK(c);
}
int callout_reset_on(struct callout *c, u64 to_ticks, void (*fn)(void *),
void *arg, int ignore_cpu)
{
sched::thread *orig_thread = callout_get_thread(c);
sched::thread *current_thread = sched::thread::current();
u64 cur = clock::get()->time();
int cur_ticks = ns2ticks(cur);
int result = 0;
CALLOUT_LOCK(c);
callouts::lock();
cdbg("*C* C=0x%lx callout_reset_on() to_ticks=%lu fn=0x%lx arg=0x%lx",
(uint64_t)c, to_ticks, fn, arg);
if ((c->c_flags & CALLOUT_DISPATCHING) &&
(orig_thread == current_thread) &&
(c->c_stopped == 1)) {
cdbg("*C* C=0x%lx callout_reset_on() callout stopped", (uint64_t)c);
CALLOUT_UNLOCK(c);
return result;
}
// If callout is scheduled
if ( (c->c_flags & CALLOUT_PENDING) ||
((c->c_flags & CALLOUT_DISPATCHING) &&
(orig_thread != current_thread)) ) {
_callout_stop_safe_locked(c, 1);
result = 1;
}
void (*old_fn)(void *) = c->c_fn;
void* old_arg = c->c_arg;
result = _callout_stop_safe_locked(c, 0);
// Reset the callout
c->c_stopped = 0;
c->c_reschedule = 0;
c->c_time = cur_ticks + to_ticks;
c->c_to_ns = cur + ticks2ns(to_ticks);
c->c_ticks = to_ticks;
c->c_time = cur_ticks + to_ticks; // for freebsd compatibility
c->c_to_ns = cur + ticks2ns(to_ticks); // this is what we use
c->c_fn = fn;
c->c_arg = arg;
// Check for recursive callouts
if ((c->c_flags & CALLOUT_DISPATCHING) &&
(orig_thread == current_thread)) {
assert(old_fn == fn);
assert(old_arg == arg);
c->c_reschedule = 1;
cdbg("*C* C=0x%lx callout_reset_on() rescheduling...", (uint64_t)c);
CALLOUT_UNLOCK(c);
return 1;
}
sched::thread::attr a;
a.detached = true;
sched::thread* callout_thread = new sched::thread([=] {
_callout_wrapper(c);
}, a);
callout_set_thread(c, callout_thread);
c->c_flags &= ~(CALLOUT_COMPLETED | CALLOUT_DISPATCHING);
c->c_flags |= (CALLOUT_PENDING | CALLOUT_ACTIVE);
CALLOUT_UNLOCK(c);
callout_thread->start();
callouts::add_callout(c);
callouts::wake_dispatcher();
callouts::unlock();
return result;
}
/*
* callout_stop() and callout_drain()
*/
int _callout_stop_safe_locked(struct callout *c, int safe)
// callout_stop() and callout_drain()
int _callout_stop_safe_locked(struct callout *c, int is_drain)
{
sched::thread* callout_thread = callout_get_thread(c);
cdbg("*C* C=0x%lx _callout_stop_safe_locked() c->thread=0x%lx c->flags=0x%04x",
(uint64_t)c, (uint64_t)callout_thread, c->c_flags);
if (!callout_thread) {
c->c_flags &= ~(CALLOUT_ACTIVE | CALLOUT_PENDING);
return 0;
}
int result = 0;
cdbg("*C* C=0x%lx _callout_stop_safe_locked() c->flags=0x%04x is_drain=%d",
(uint64_t)c, c->c_flags, is_drain);
c->c_stopped = 1;
c->c_reschedule = 0;
if ((is_drain) &&
(sched::thread::current() != callouts::_callout_dispatcher) &&
(callout_pending(c) ||
(callout_active(c) && !callout_completed(c))) ) {
if (c->c_flags & CALLOUT_PENDING) {
cdbg("*C* C=0x%lx _callout_stop_safe_locked() waiting...", (uint64_t)c);
assert(callout_thread != sched::thread::current());
// Wait for callout
callout_set_waiter(c, sched::thread::current());
callout_thread->wake();
sched::thread::wait_until(c->c_callout_mtx, [&] {
callouts::wake_dispatcher();
sched::thread::wait_until(callouts::_callout_mutex, [&] {
return (c->c_flags & CALLOUT_COMPLETED);
});
cdbg("*C* C=0x%lx _callout_stop_safe_locked() stopped waiting",
result = 1;
cdbg("*C* C=0x%lx _callout_stop_safe_locked() done waiting",
(uint64_t)c);
}
callouts::remove_callout(c);
// Clear flags
c->c_flags &= ~(CALLOUT_ACTIVE | CALLOUT_PENDING | CALLOUT_COMPLETED);
callout_set_waiter(c, NULL);
callout_set_thread(c, NULL);
return 0;
return (result);
}
int _callout_stop_safe(struct callout *c, int safe)
int _callout_stop_safe(struct callout *c, int is_drain)
{
int result = 0;
CALLOUT_LOCK(c);
result = _callout_stop_safe_locked(c, safe);
CALLOUT_UNLOCK(c);
callouts::lock();
result = _callout_stop_safe_locked(c, is_drain);
callouts::unlock();
return (result);
}
/*
* If mpsafe is zero, the callout should wrap the call to the handler
* In freebsd, this is done using the Giant lock.
*/
void callout_init(struct callout *c, int mpsafe)
{
bzero(c, sizeof *c);
assert(mpsafe != 0);
cdbg("*C* C=0x%lx callout_init()", (uint64_t)c);
mutex_init(&c->c_callout_mtx);
}
void callout_init_rw(struct callout *c, struct rwlock *rw, int flags)
......@@ -291,3 +311,10 @@ void callout_init_mtx(struct callout *c, struct mtx *mtx, int flags)
c->c_mtx = mtx;
c->c_flags = flags;
}
void init_callouts(void)
{
// Start the callout thread
callouts::_callout_dispatcher = new sched::thread(_callout_thread);
callouts::_callout_dispatcher->start();
}
......@@ -42,7 +42,7 @@
#include <bsd/porting/netport.h>
#include <bsd/porting/_callout.h>
#define CALLOUT_SIGNATURE (36456283)
void init_callouts(void);
#define CALLOUT_LOCAL_ALLOC 0x0001 /* was allocated from callfree */
#define CALLOUT_ACTIVE 0x0002 /* callout is currently active */
......@@ -50,12 +50,9 @@
#define CALLOUT_MPSAFE 0x0008 /* callout handler is mp safe */
#define CALLOUT_RETURNUNLOCKED 0x0010 /* handler returns with mtx unlocked */
#define CALLOUT_COMPLETED 0x0020 /* callout thread finished */
#define CALLOUT_DISPATCHING 0x0040 /* callout handler currently is running */
struct lock_object;
extern int ncallout;
#define callout_active(c) ((c)->c_flags & CALLOUT_ACTIVE)
#define callout_deactivate(c) ((c)->c_flags &= ~CALLOUT_ACTIVE)
#define callout_drain(c) _callout_stop_safe(c, 1)
......@@ -63,6 +60,7 @@ void callout_init(struct callout *, int);
void callout_init_mtx(struct callout *c, struct mtx *lock, int flags);
void callout_init_rw(struct callout *c, struct rwlock *rw, int flags);
#define callout_pending(c) ((c)->c_flags & CALLOUT_PENDING)
#define callout_completed(c) ((c)->c_flags & CALLOUT_COMPLETED)
int callout_reset_on(struct callout *, u64, void (*)(void *), void *, int);
#define callout_reset(c, on_tick, fn, arg) \
callout_reset_on((c), (on_tick), (fn), (arg), 0)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment