diff --git a/bsd/sys/kern/uipc_sockbuf.cc b/bsd/sys/kern/uipc_sockbuf.cc
index f812e210aa4d5d3399176a106bacc055b07350a7..8b76262d33d3531120fb985d865db89795352157 100644
--- a/bsd/sys/kern/uipc_sockbuf.cc
+++ b/bsd/sys/kern/uipc_sockbuf.cc
@@ -32,6 +32,8 @@
 #include <sys/cdefs.h>
 
 #include <osv/poll.h>
+#include <drivers/clock.hh>
+#include <osv/signal.hh>
 
 #include <bsd/porting/netport.h>
 #include <bsd/porting/rwlock.h>
@@ -122,8 +124,19 @@ sbwait(struct sockbuf *sb)
     SOCKBUF_LOCK_ASSERT(sb);
 
     sb->sb_flags |= SB_WAIT;
-    return (msleep(&sb->sb_cc, &sb->sb_mtx, 0, "sbwait",
-        sb->sb_timeo));
+    sched::timer tmr(*sched::thread::current());
+    if (sb->sb_timeo) {
+        tmr.set(clock::get()->time() + ticks2ns(sb->sb_timeo));
+    }
+    signal_catcher sc;
+    sched::thread::wait_for(sb->sb_mtx._mutex, sb->sb_cc_wq, tmr, sc);
+    if (sc.interrupted()) {
+        return EINTR;
+    }
+    if (tmr.expired()) {
+        return EWOULDBLOCK;
+    }
+    return 0;
 }
 
 int
@@ -193,7 +206,7 @@ sowakeup(struct socket *so, struct sockbuf *sb)
 
     if (sb->sb_flags & SB_WAIT) {
         sb->sb_flags &= ~SB_WAIT;
-        wakeup(&sb->sb_cc);
+        sb->sb_cc_wq.wake_all(sb->sb_mtx._mutex);
     }
     if (sb->sb_upcall != NULL) {
         ret = sb->sb_upcall(so, sb->sb_upcallarg, M_DONTWAIT);
diff --git a/bsd/sys/sys/sockbuf.h b/bsd/sys/sys/sockbuf.h
index 9fb096915a6da25473270cca61ea8b96ffa3b666..37b458b33bc93c08cb181c7e3cac8fba0b9034d7 100644
--- a/bsd/sys/sys/sockbuf.h
+++ b/bsd/sys/sys/sockbuf.h
@@ -38,6 +38,7 @@
 #include <bsd/porting/netport.h>
 #include <bsd/porting/sync_stub.h>
 #include <bsd/porting/rwlock.h>
+#include <osv/waitqueue.hh>
 
 #define SB_MAX (2*1024*1024)    /* default for max chars in sockbuf */
 
@@ -80,7 +81,6 @@ struct xsockbuf {
  * Variables for socket buffering.
  */
 struct sockbuf {
-    int test_fn();
     struct mtx sb_mtx;          /* sockbuf lock */
     struct rwlock sb_rwlock;    /* prevent I/O interlacing */
     short   sb_state;           /* (c/d) socket state on sockbuf */
@@ -92,6 +92,7 @@ struct sockbuf {
     struct mbuf *sb_sndptr;     /* (c/d) pointer into mbuf chain */
     u_int   sb_sndptroff;       /* (c/d) byte offset of ptr into chain */
     u_int   sb_cc;              /* (c/d) actual chars in buffer */
+    waitqueue sb_cc_wq;         /* waiting on sb_cc change */
     u_int   sb_hiwat;           /* (c/d) max actual char count */
     u_int   sb_mbcnt;           /* (c/d) chars of mbufs used */
     u_int   sb_mcnt;            /* (c/d) number of mbufs in buffer */
diff --git a/build.mk b/build.mk
index a39e25baef189188aea00d0ffe1d39cc78abdaa0..db4a79fcf95d290ed4b1a4e20122b504aee8ab75 100644
--- a/build.mk
+++ b/build.mk
@@ -150,6 +150,7 @@ boost-tests := tests/tst-rename.so \
     tests/tst-promise.so \
     tests/tst-dlfcn.so \
     tests/tst-stat.so
+boost-tests += tests/tst-wait-for.so
 
 java_tests := tests/hello/Hello.class
 
@@ -583,6 +584,7 @@ objects += core/dhcp.o
 objects += core/run.o
 objects += core/shutdown.o
 objects += core/version.o
+objects += core/waitqueue.o
 
 include $(src)/fs/build.mk
 include $(src)/libc/build.mk
diff --git a/core/sched.cc b/core/sched.cc
index 0b17b91301555ff1762a5ca2bf16ee0109924441..d90dbabfe9b5ef1d7f5e7ace7a4634970cf69f89 100644
--- a/core/sched.cc
+++ b/core/sched.cc
@@ -704,12 +704,19 @@ void thread::destroy()
 }
 
 // Must be called under rcu_read_lock
-void thread::wake_impl(detached_state* st)
+//
+// allowed_initial_states_mask *must* contain status::waiting, and
+// *may* contain status::sending_lock (for waitqueue wait morphing).
+// It will transition from one of the allowed initial states to the
+// waking state.
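+//
+// For example (see wake_with() and wake_with_from_mutex() later in this
+// patch): a plain wake() passes just (1 << waiting), while the mutex code
+// also passes (1 << sending_lock) so it can finish a wait-morphing wakeup.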
+void thread::wake_impl(detached_state* st, unsigned allowed_initial_states_mask)
 {
     status old_status = status::waiting;
     trace_sched_wake(st->t);
-    if (!st->st.compare_exchange_strong(old_status, status::waking)) {
-        return;
+    while (!st->st.compare_exchange_weak(old_status, status::waking)) {
+        if (!((1 << unsigned(old_status)) & allowed_initial_states_mask)) {
+            return;
+        }
     }
     auto tcpu = st->_cpu;
     WITH_LOCK(preempt_lock_in_rcu) {
@@ -736,6 +743,24 @@ void thread::wake()
     }
 }
 
+void thread::wake_lock(mutex* mtx, wait_record* wr)
+{
+    // must be called with mtx held
+    WITH_LOCK(rcu_read_lock) {
+        auto st = _detached_state.get();
+        // We want to send_lock() to this thread, but we want to be sure we're the only
+        // ones doing it, and that it doesn't wake up while we do
+        auto expected = status::waiting;
+        if (!st->st.compare_exchange_strong(expected, status::sending_lock, std::memory_order_relaxed)) {
+            // let the thread acquire the lock itself
+            return;
+        }
+        st->lock_sent = true;
+        mtx->send_lock(wr);
+        // since we're in status::sending_lock, no one can wake us except mutex::unlock
+    }
+}
+
 void thread::main()
 {
     _func();
@@ -772,7 +797,7 @@ void thread::stop_wait()
         return;
     }
     preempt_enable();
-    while (st.load() == status::waking) {
+    while (st.load() == status::waking || st.load() == status::sending_lock) {
         schedule();
     }
     assert(st.load() == status::running);
diff --git a/core/waitqueue.cc b/core/waitqueue.cc
new file mode 100644
index 0000000000000000000000000000000000000000..9f4803aacab06aab17ddf70977fabd5861e59f71
--- /dev/null
+++ b/core/waitqueue.cc
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ *
+ * This work is open source software, licensed under the terms of the
+ * BSD license as described in the LICENSE file in the top-level directory.
+ */
+
+#include <osv/waitqueue.hh>
+#include <osv/trace.hh>
+#include <osv/wait_record.hh>
+
+TRACEPOINT(trace_waitqueue_wait, "%p", waitqueue *);
+TRACEPOINT(trace_waitqueue_wake_one, "%p", waitqueue *);
+TRACEPOINT(trace_waitqueue_wake_all, "%p", waitqueue *);
+
+namespace sched {
+
+void wait_object<waitqueue>::arm()
+{
+    auto& fifo = _wq._waiters_fifo;
+    if (!fifo.oldest) {
+        fifo.oldest = &_wr;
+    } else {
+        fifo.newest->next = &_wr;
+    }
+    fifo.newest = &_wr;
+}
+
+void wait_object<waitqueue>::disarm()
+{
+    auto& fifo = _wq._waiters_fifo;
+    if (_wr.woken()) {
+        return;
+    }
+    // wr is still in the linked list, so remove it:
+    wait_record** pnext = &fifo.oldest;
+    wait_record* newest = nullptr;
+    while (*pnext) {
+        if (&_wr == *pnext) {
+            *pnext = _wr.next;
+            if (!*pnext) {
+                // we were the newest entry; our predecessor is now newest
+                fifo.newest = newest;
+            }
+            break;
+        }
+        newest = *pnext;
+        pnext = &(*pnext)->next;
+    }
+}
+
+}
+
+void waitqueue::wait(mutex& mtx)
+{
+    trace_waitqueue_wait(this);
+    sched::thread::wait_for(mtx, *this);
+}
+
+void waitqueue::wake_one(mutex& mtx)
+{
+    trace_waitqueue_wake_one(this);
+    wait_record *wr = _waiters_fifo.oldest;
+    if (wr) {
+        _waiters_fifo.oldest = wr->next;
+        if (wr->next == nullptr) {
+            _waiters_fifo.newest = nullptr;
+        }
+        // Rather than wake the waiter here (wr->wake()) and have it wait
+        // again for the mutex, we do "wait morphing" - have it continue to
+        // sleep until the mutex becomes available.
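+        // The waiter moves from waiting to sending_lock instead of waking;
+        // mutex::unlock() later completes the wakeup, so the waiter already
+        // owns the mutex when it resumes and never contends for it.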
+        wr->wake_lock(&mtx);
+    }
+}
+
+void waitqueue::wake_all(mutex& mtx)
+{
+    trace_waitqueue_wake_all(this);
+    wait_record *wr = _waiters_fifo.oldest;
+    _waiters_fifo.oldest = _waiters_fifo.newest = nullptr;
+    while (wr) {
+        auto next_wr = wr->next; // need to save - *wr invalid after wake
+        // FIXME: splice the entire chain at once?
+        wr->wake_lock(&mtx);
+        wr = next_wr;
+    }
+}
+
diff --git a/include/osv/signal.hh b/include/osv/signal.hh
new file mode 100644
index 0000000000000000000000000000000000000000..f89a506c2f483c1a9fbc6dd6e902e8370daeb825
--- /dev/null
+++ b/include/osv/signal.hh
@@ -0,0 +1,37 @@
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ *
+ * This work is open source software, licensed under the terms of the
+ * BSD license as described in the LICENSE file in the top-level directory.
+ */
+
+#ifndef OSV_SIGNAL_HH_
+#define OSV_SIGNAL_HH_
+
+#include <osv/error.h>
+#include <sched.hh>
+
+class signal_catcher {
+public:
+    signal_catcher() { sched::thread::current()->interrupted(false); }
+    error result() { return interrupted() ? error(EINTR) : error(); }
+    bool interrupted() { return sched::thread::current()->interrupted(); }
+    void wait() { sched::thread::wait_for(*this); }
+private:
+    friend class sched::wait_object<signal_catcher>;
+};
+
+namespace sched {
+
+template <>
+class wait_object<signal_catcher> {
+public:
+    wait_object(signal_catcher& sc, mutex* mtx = nullptr) {}
+    void arm() {}
+    void disarm() {}
+    bool poll() const { return sched::thread::current()->interrupted(); }
+};
+
+}
+
+#endif /* OSV_SIGNAL_HH_ */
diff --git a/include/osv/wait_record.hh b/include/osv/wait_record.hh
index 3afad69a5549b04027e0f70fa2d4b7369fb5dc69..000c3bc2dcb79787f67144292bd7069c97618930 100644
--- a/include/osv/wait_record.hh
+++ b/include/osv/wait_record.hh
@@ -26,14 +26,16 @@
 // mechanism (e.g., see Event objects in Python and in Microsoft Windows),
 // except that waiter is limited to a single waiting thread.
 
+namespace lockfree { struct mutex; }
+
 class waiter {
-private:
+protected:
     sched::thread *t;
 public:
     explicit waiter(sched::thread *t) : t(t) { };
 
     inline void wake() {
-        t->wake_with([&] { t = nullptr; });
+        t->wake_with_from_mutex([&] { t = nullptr; });
     }
 
     inline void wait() const {
@@ -80,6 +82,8 @@ public:
 struct wait_record : public waiter {
     struct wait_record *next;
     explicit wait_record(sched::thread *t) : waiter(t), next(nullptr) { };
+    using mutex = lockfree::mutex;
+    void wake_lock(mutex* mtx) { t->wake_lock(mtx, this); }
 };
 
 #endif /* INCLUDED_OSV_WAIT_RECORD */
diff --git a/include/osv/waitqueue.hh b/include/osv/waitqueue.hh
new file mode 100644
index 0000000000000000000000000000000000000000..799e12be0c4f8744734c698fe4a9f377cba06cfc
--- /dev/null
+++ b/include/osv/waitqueue.hh
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ *
+ * This work is open source software, licensed under the terms of the
+ * BSD license as described in the LICENSE file in the top-level directory.
+ */
+
+#ifndef WAITQUEUE_HH_
+#define WAITQUEUE_HH_
+
+// A waitqueue is similar to a condition variable, but relies on the
+// user supplied mutex for internal locking.
+
+#include <sys/cdefs.h>
+#include <sched.hh>
+#include <osv/wait_record.hh>
+
+/**
+ * An efficient synchronization point for threads.
+ *
+ * A waitqueue is similar to a condition variable, except that
+ * it depends on an external mutex for its own synchronization
+ * (hence that mutex must be passed to wake_one() and wake_all()
+ * in addition to wait()).
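+ *
+ * A minimal usage sketch (illustrative only; assumes a 'ready' flag
+ * protected by 'mtx'; compare test_waitqueue_1 in tests/tst-wait-for.cc
+ * later in this patch):
+ *
+ *     // waiter, with mtx locked:
+ *     while (!ready) {
+ *         wq.wait(mtx);     // drops mtx while asleep, re-takes it on wakeup
+ *     }
+ *
+ *     // waker, with mtx locked:
+ *     ready = true;
+ *     wq.wake_one(mtx);     // wakeup completes once mtx is released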
+ *
+ * A waitqueue is suitable for use with wait_for(), so you can
+ * wait for a timeout or signal, or any other event, concurrently
+ * with the waitqueue.
+ */
+class waitqueue {
+private:
+    struct {
+        // A FIFO queue of waiters - a linked list from oldest (next in line
+        // to be woken) towards newest. The wait records themselves are held
+        // on the stack of the waiting thread - so no dynamic memory
+        // allocation is needed for this list.
+        struct wait_record *oldest = {};
+        struct wait_record *newest = {};
+    } _waiters_fifo;
+public:
+    /**
+     * Wait on the wait queue
+     *
+     * Wait to be woken (with wake_one() or wake_all()).
+     *
+     * It is assumed that wait() is called with the given mutex locked.
+     * This mutex is unlocked during the wait, and re-locked before wait()
+     * returns.
+     */
+    void wait(mutex& mtx);
+    /**
+     * Wake one thread waiting on the waitqueue
+     *
+     * Wake one of the threads currently waiting on the waitqueue,
+     * or do nothing if there is no thread waiting.
+     *
+     * The thread is not awakened immediately; it will only wake after mtx
+     * is released.
+     *
+     * wake_one() must be called with the mutex held.
+     */
+    void wake_one(mutex& mtx);
+    /**
+     * Wake all threads waiting on the waitqueue
+     *
+     * Wake all of the threads currently waiting on the waitqueue,
+     * or do nothing if there is no thread waiting.
+     *
+     * The threads are not awakened immediately; they will only wake after mtx
+     * is released (one by one).
+     *
+     * wake_all() must be called with the mutex held.
+     */
+    void wake_all(mutex& mtx);
+private:
+    void arm(mutex& mtx);
+    bool poll() const;
+    class waiter;
+    friend class sched::wait_object<waitqueue>;
+};
+
+namespace sched {
+
+template <>
+class wait_object<waitqueue> {
+public:
+    wait_object(waitqueue& wq, mutex* mtx)
+        : _wq(wq), _mtx(*mtx), _wr(sched::thread::current()) {}
+    bool poll() const { return _wr.woken(); }
+    void arm();
+    void disarm();
+private:
+    waitqueue& _wq;
+    mutex& _mtx;
+    wait_record _wr;
+};
+
+}
+
+#endif /* WAITQUEUE_HH_ */
diff --git a/include/sched.hh b/include/sched.hh
index a38f76e0592a7932f71a75758a00c0ae3c8dea78..cc5c0bb33afaf94c8f52cc3e3feb44126da8fc86 100644
--- a/include/sched.hh
+++ b/include/sched.hh
@@ -55,6 +55,7 @@ class timer;
 class timer_list;
 class cpu_mask;
 class thread_runtime_compare;
+template <typename T> class wait_object;
 
 void schedule();
 
@@ -187,6 +188,20 @@ protected:
 class timer : public timer_base {
 public:
     explicit timer(thread& t);
+private:
+    class waiter;
+    friend class wait_object<timer>;
+};
+
+template <>
+class wait_object<timer> {
+public:
+    explicit wait_object(timer& tmr, mutex* mtx = nullptr) : _tmr(tmr) {}
+    bool poll() const { return _tmr.expired(); }
+    void arm() {}
+    void disarm() {}
+private:
+    timer& _tmr;
 };
 
 // thread_runtime is used to maintain the scheduler's view of the thread's
@@ -334,11 +349,30 @@ public:
     static void wait_until(mutex_t& mtx, Pred pred);
     template <class Pred>
     static void wait_until(mutex_t* mtx, Pred pred);
+
+    // Wait for any of a number of waitable objects to be signalled.
+    // Waitable objects include: waitqueues, timers, predicates,
+    // and more. If supplied, the mutex object is unlocked while waiting.
+    template <typename... waitable>
+    static void wait_for(waitable&&... waitables);
+    template <typename... waitable>
+    static void wait_for(mutex& mtx, waitable&&... waitables);
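+    //
+    // For example (illustrative; this is the shape sbwait() uses earlier
+    // in this patch):
+    //
+    //     sched::thread::wait_for(mtx, wq, tmr, sc);
+    //     if (sc.interrupted()) { /* signal */ }
+    //     else if (tmr.expired()) { /* timeout */ }
+    //     else { /* the waitqueue was woken */ }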
+
     void wake();
+    // wake up after acquiring mtx
+    //
+    // mtx must be locked, and wr must be a free wait_record that will
+    // survive until the thread wakes up. wake_lock() will not cause
+    // the thread to wake up immediately; only when it becomes possible
+    // for it to take the mutex.
+    void wake_lock(mutex* mtx, wait_record* wr);
     bool interrupted();
     void interrupted(bool f);
     template <class Action>
     inline void wake_with(Action action);
+    // for mutex internal use
+    template <class Action>
+    inline void wake_with_from_mutex(Action action);
     static void sleep_until(s64 abstime);
     static void yield();
     static void exit() __attribute__((__noreturn__));
@@ -377,7 +411,8 @@ public:
      */
     float priority() const;
 private:
-    static void wake_impl(detached_state* st);
+    static void wake_impl(detached_state* st,
+            unsigned allowed_initial_states_mask = 1 << unsigned(status::waiting));
     void main();
     void switch_to();
     void switch_to_first();
@@ -388,9 +423,13 @@ private:
     void setup_tcb();
     void free_tcb();
     void complete() __attribute__((__noreturn__));
+    template <class Action>
+    inline void do_wake_with(Action action, unsigned allowed_initial_states_mask);
     template <class IntrStrategy, class Mutex, class Pred>
     static void do_wait_until(Mutex& mtx, Pred pred);
-    struct dummy_lock {};
+    template <typename Mutex, typename... wait_object>
+    static void do_wait_for(Mutex& mtx, wait_object&&... wait_objects);
+    struct dummy_lock { void receive_lock() {} };
     friend void acquire(dummy_lock&) {}
     friend void release(dummy_lock&) {}
     friend void start_early_threads();
@@ -405,11 +444,42 @@ private:
     std::function<void ()> _func;
     thread_state _state;
     thread_control_block* _tcb;
+
+    // State machine transition matrix
+    //
+    //    Initial       Next         Async?   Event              Notes
+    //
+    //    unstarted     waiting      sync     start()            followed by wake()
+    //    unstarted     prestarted   sync     start()            before scheduler startup
+    //
+    //    prestarted    unstarted    sync     scheduler startup  followed by start()
+    //
+    //    waiting       waking       async    wake()
+    //    waiting       running      sync     wait_until cancelled (predicate became true before context switch)
+    //    waiting       sending_lock async    wake_lock()        used for ensuring the thread does not wake
+    //                                                           up while we call receive_lock()
+    //
+    //    sending_lock  waking       async    mutex::unlock()
+    //
+    //    running       waiting      sync     prepare_wait()
+    //    running       queued       sync     context switch
+    //    running       terminating  sync     destroy()          thread function completion
+    //
+    //    queued        running      sync     context switch
+    //
+    //    waking        queued       async    scheduler poll of incoming thread wakeup queue
+    //    waking        running      sync     thread pulls self out of incoming wakeup queue
+    //
+    //    terminating   terminated   async    post context switch
+    //
+    // wake() on any state except waiting is discarded.
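+    //
+    // For example, a wait-morphing wakeup takes the path
+    //    running -> waiting -> sending_lock -> waking -> queued -> running
+    // where the sending_lock -> waking step is performed by mutex::unlock().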
+
     enum class status {
         invalid,
         prestarted,
         unstarted,
         waiting,
+        sending_lock,
         running,
         queued,
         waking, // between waiting and queued
@@ -424,6 +494,7 @@ private:
         explicit detached_state(thread* t) : t(t) {}
         thread* t;
         cpu* _cpu;
+        bool lock_sent = false;   // send_lock() was called for us
         std::atomic<status> st = { status::unstarted };
     };
     std::unique_ptr<detached_state> _detached_state;
@@ -570,6 +641,31 @@ bool preemptable() __attribute__((no_instrument_function));
 
 thread* current();
 
+// wait_for() support for predicates
+//
+
+template <typename Pred>
+class predicate_wait_object {
+public:
+    predicate_wait_object(Pred pred, mutex* mtx = nullptr) : _pred(pred) {}
+    void arm() {}
+    void disarm() {}
+    bool poll() { return _pred(); }
+private:
+    Pred _pred;
+};
+
+
+// only instantiate wait_object<Pred> if Pred is indeed a predicate
+template <typename Pred>
+class wait_object
+    : public std::enable_if<std::is_same<bool, decltype((*static_cast<Pred*>(nullptr))())>::value,
+                            predicate_wait_object<Pred>>::type
+{
+    using predicate_wait_object<Pred>::predicate_wait_object;
+    // all contents from predicate_wait_object<>
+};
+
 class wait_guard {
 public:
     wait_guard(thread* t) : _t(t) { t->prepare_wait(); }
@@ -696,6 +792,121 @@ inline void thread::interrupted(bool f)
     }
 }
 
+// thread::wait_for() accepts an optional mutex, followed by a
+// number of waitable objects.
+//
+// a waitable object's protocol is as follows:
+//
+// Given a waitable object 'wa', the class wait_object<waitable> defines
+// the waiting protocol using instance methods:
+//
+//   wait_object(wa, mtx) - initialization; if the mutex is not required
+//                          for waiting, it can be optional
+//   poll() - check whether wa has finished waiting
+//   arm() - prepare for waiting; typically registering wa on some list
+//   disarm() - called after waiting is complete
+//
+// all of these are called with the mutex held, if supplied
+
+
+// wait_object<T> must be specialized for the particular waitable.
+template <typename T>
+class wait_object;
+
+template <typename... wait_object>
+void arm(wait_object&... objs);
+
+template <>
+inline
+void arm()
+{
+}
+
+template <typename wait_object_first, typename... wait_object_rest>
+inline
+void arm(wait_object_first& first, wait_object_rest&... rest)
+{
+    first.arm();
+    arm(rest...);
+}
+
+template <typename... wait_object>
+bool poll(wait_object&... objs);

+template <>
+inline
+bool poll()
+{
+    return false;
+}
+
+template <typename wait_object_first, typename... wait_object_rest>
+inline
+bool poll(wait_object_first& first, wait_object_rest&... rest)
+{
+    return first.poll() || poll(rest...);
+}
+
+template <typename... wait_object>
+void disarm(wait_object&... objs);
+
+template <>
+inline
+void disarm()
+{
+}
+
+template <typename wait_object_first, typename... wait_object_rest>
+inline
+void disarm(wait_object_first& first, wait_object_rest&... rest)
+{
+    disarm(rest...);
+    first.disarm();
+}
+
+template <typename Mutex, typename... wait_object>
+inline
+void thread::do_wait_for(Mutex& mtx, wait_object&&... wait_objects)
+{
+    if (poll(wait_objects...)) {
+        return;
+    }
+    arm(wait_objects...);
+    // must duplicate do_wait_until since gcc has a bug capturing parameter packs
+    thread* me = current();
+    while (true) {
+        {
+            wait_guard waiter(me);
+            if (poll(wait_objects...)) {
+                break;
+            }
+            release(mtx);
+            me->wait();
+        }
+        if (me->_detached_state->lock_sent) {
+            me->_detached_state->lock_sent = false;
+            mtx.receive_lock();
+        } else {
+            acquire(mtx);
+        }
+    }
+    disarm(wait_objects...);
+}
+
+template <typename... waitable>
+inline
+void thread::wait_for(mutex& mtx, waitable&&... waitables)
+{
+    do_wait_for(mtx, wait_object<typename std::remove_reference<waitable>::type>(waitables, &mtx)...);
+}
+
+template <typename... waitable>
+inline
+void thread::wait_for(waitable&&... waitables)
+{
+    dummy_lock mtx;
+    do_wait_for(mtx, wait_object<typename std::remove_reference<waitable>::type>(waitables)...);
+}
+
 // About wake_with():
 //
 // Consider one thread doing:
@@ -717,15 +928,30 @@ inline void thread::interrupted(bool f)
 
 template <class Action>
 inline
-void thread::wake_with(Action action)
+void thread::do_wake_with(Action action, unsigned allowed_initial_states_mask)
 {
     WITH_LOCK(osv::rcu_read_lock) {
         auto ds = _detached_state.get();
         action();
-        wake_impl(ds);
+        wake_impl(ds, allowed_initial_states_mask);
     }
 }
 
+template <class Action>
+inline
+void thread::wake_with(Action action)
+{
+    return do_wake_with(action, (1 << unsigned(status::waiting)));
+}
+
+template <class Action>
+inline
+void thread::wake_with_from_mutex(Action action)
+{
+    return do_wake_with(action, (1 << unsigned(status::waiting))
+                              | (1 << unsigned(status::sending_lock)));
+}
+
 extern cpu __thread* current_cpu;
 
 inline cpu* thread::tcpu() const
diff --git a/tests/tst-wait-for.cc b/tests/tst-wait-for.cc
new file mode 100644
index 0000000000000000000000000000000000000000..813749f081f355149d341219af38f09912441029
--- /dev/null
+++ b/tests/tst-wait-for.cc
@@ -0,0 +1,121 @@
+/*
+ * Copyright (C) 2014 Cloudius Systems, Ltd.
+ *
+ * This work is open source software, licensed under the terms of the
+ * BSD license as described in the LICENSE file in the top-level directory.
+ */
+
+
+#define BOOST_TEST_MODULE tst-wait-for
+
+#include <boost/test/unit_test.hpp>
+#include <sched.hh>
+#include <osv/waitqueue.hh>
+#include <drivers/clock.hh>
+#include <cstdlib>
+
+BOOST_AUTO_TEST_CASE(test_wait_for_one_timer)
+{
+    auto now = clock::get()->time();
+    sched::timer tmr(*sched::thread::current());
+    tmr.set(now + 1_s);
+    sched::thread::wait_for(tmr);
+    auto later = clock::get()->time();
+    BOOST_REQUIRE(std::abs(later - (now + 1_s)) < 20_ms);
+    BOOST_REQUIRE(tmr.expired());
+}
+
+BOOST_AUTO_TEST_CASE(test_wait_for_two_timers)
+{
+    auto now = clock::get()->time();
+    sched::timer tmr1(*sched::thread::current());
+    sched::timer tmr2(*sched::thread::current());
+    tmr1.set(now + 2_s);
+    tmr2.set(now + 1_s);
+    sched::thread::wait_for(tmr1, tmr2);
+    BOOST_REQUIRE(!tmr1.expired() && tmr2.expired());
+    tmr2.cancel();
+    sched::thread::wait_for(tmr1, tmr2);
+    BOOST_REQUIRE(tmr1.expired() && !tmr2.expired());
+}
+
+BOOST_AUTO_TEST_CASE(test_waitqueue_1)
+{
+    waitqueue wq;
+    mutex mtx;
+    int counter = 0;
+    WITH_LOCK(mtx) {
+        sched::thread waker([&] {
+            WITH_LOCK(mtx) {
+                ++counter;
+                wq.wake_one(mtx);
+            }
+        });
+        waker.start();
+        wq.wait(mtx);
+        waker.join();
+    }
+    BOOST_REQUIRE(counter == 1);
+}
+
+BOOST_AUTO_TEST_CASE(test_waitqueue_2)
+{
+    waitqueue wq;
+    mutex mtx;
+    int counter = 0;
+    sched::timer tmr(*sched::thread::current());
+    WITH_LOCK(mtx) {
+        tmr.set(clock::get()->time() + 500_ms);
+        sched::thread waker([&] {
+            sched::thread::sleep_until(clock::get()->time() + 1_s);
+            WITH_LOCK(mtx) {
+                ++counter;
+                wq.wake_one(mtx);
+            }
+        });
+        waker.start();
+        // timer wait
+        sched::thread::wait_for(mtx, wq, tmr);
+        BOOST_REQUIRE(tmr.expired());
+        BOOST_REQUIRE(counter == 0);
+        // null wait
+        sched::thread::wait_for(mtx, wq, tmr);
+        BOOST_REQUIRE(tmr.expired());
+        BOOST_REQUIRE(counter == 0);
+        // wait for wq
+        tmr.cancel();
+        sched::thread::wait_for(mtx, wq, tmr);
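+        // by now the waker has taken mtx, bumped counter and woken us via
+        // wait morphing, so we hold mtx again when wait_for() returns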
+        BOOST_REQUIRE(counter == 1);
+        waker.join();
+    }
+}
+
+BOOST_AUTO_TEST_CASE(test_wait_for_predicate)
+{
+    std::atomic<bool> x = { false };
+    auto sleeper = sched::thread::current();
+    sched::thread waker([&] {
+        sched::thread::sleep_until(nanotime() + 1_s);
+        x.store(true);
+        sleeper->wake();
+    });
+    waker.start();
+    // send some spurious wakeups for fun
+    sched::thread false_waker([&] {
+        for (auto i = 0; i < 100; ++i) {
+            sched::thread::sleep_until(nanotime() + 100_ms);
+            sleeper->wake();
+        }
+    });
+    // must start the spurious waker too, or the join() below never returns
+    false_waker.start();
+    sched::timer tmr(*sched::thread::current());
+    tmr.set(nanotime() + 500_ms);
+    sched::thread::wait_for(tmr, [&] { return x.load(); });
+    BOOST_REQUIRE(tmr.expired());
+    BOOST_REQUIRE(!x.load());
+    tmr.cancel();
+    sched::thread::wait_for(tmr, [&] { return x.load(); });
+    BOOST_REQUIRE(!tmr.expired());
+    BOOST_REQUIRE(x.load());
+    waker.join();
+    false_waker.join();
+}
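+
+// A combined-wait sketch (illustrative only, not part of the test suite;
+// assumes a waitqueue 'wq' and mutex 'mtx' like the tests above, plus
+// <osv/signal.hh>) - the same pattern sbwait() uses earlier in this patch:
+//
+//     WITH_LOCK(mtx) {
+//         sched::timer tmr(*sched::thread::current());
+//         tmr.set(clock::get()->time() + 1_s);
+//         signal_catcher sc;
+//         sched::thread::wait_for(mtx, wq, tmr, sc);
+//         if (sc.interrupted()) { /* e.g. return EINTR */ }
+//         else if (tmr.expired()) { /* e.g. return EWOULDBLOCK */ }
+//     }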