Newer
Older
#include "sched.hh"
#include <list>
#include "mutex.hh"
#include <mutex>
#include "debug.hh"
#include "align.hh"
#include "drivers/clock.hh"
std::vector<cpu*> cpus;
thread __thread * s_current;
elf::tls_data tls;
// currently the scheduler will poll right after an interrupt, so no
// need to do anything.
inter_processor_interrupt wakeup_ipi{[] {}};
}
#include "arch-switch.hh"
namespace sched {
if (p->_status.load() == thread::status::running && !yield) {
with_lock(irq_lock, [this] {
auto n = &runqueue.front();
assert(n->_status.load() == thread::status::queued
|| n->_status.load() == thread::status::running);
n->_status.store(thread::status::running);
if (n != thread::current()) {
n->switch_to();
}
void cpu::idle()
{
do {
// spin for a bit before halting
for (unsigned ctr = 0; ctr < 100; ++ctr) {
// FIXME: can we pull threads from loaded cpus?
handle_incoming_wakeups();
if (!runqueue.empty()) {
return;
}
}
std::unique_lock<irq_lock_type> guard(irq_lock);
handle_incoming_wakeups();
if (!runqueue.empty()) {
return;
}
guard.release();
arch::wait_for_interrupt();
handle_incoming_wakeups(); // auto releases irq_lock
} while (runqueue.empty());
}
cpu_set queues_with_wakes{incoming_wakeups_mask.fetch_clear()};
for (auto i : queues_with_wakes) {
incoming_wakeup_queue q;
incoming_wakeups[i].copy_and_clear(q);
while (!q.empty()) {
with_lock(irq_lock, [&] {
t.resume_timers();
});
void cpu::init_on_cpu()
{
arch.init_on_cpu();
unsigned cpu::load()
{
return runqueue.size();
}
void cpu::load_balance()
{
timer tmr(*thread::current());
while (true) {
tmr.set(clock::get()->time() + 100000000);
thread::wait_until([&] { return tmr.expired(); });
if (runqueue.empty()) {
continue;
}
auto min = *std::min_element(cpus.begin(), cpus.end(),
[](cpu* c1, cpu* c2) { return c1->load() < c2->load(); });
if (min == this) {
continue;
}
with_lock(irq_lock, [this, min] {
auto i = std::find_if(runqueue.rbegin(), runqueue.rend(),
[](thread& t) { return !t._attr.pinned; });
if (i == runqueue.rend()) {
auto& mig = *i;
runqueue.erase(std::prev(i.base())); // i.base() returns off-by-one
// we won't race with wake(), since we're not thread::waiting
assert(mig._status.load() == thread::status::queued);
mig._status.store(thread::status::waking);
mig._cpu = min;
min->incoming_wakeups[id].push_front(mig);
min->incoming_wakeups_mask.set(id);
// FIXME: avoid if the cpu is alive and if the priority does not
// FIXME: warrant an interruption
wakeup_ipi.send(min);
void schedule(bool yield)
{
cpu::current()->schedule(yield);
}
// FIXME: drive by IPI
t->_cpu->handle_incoming_wakeups();
// FIXME: what about other cpus?
if (t->_cpu->runqueue.empty()) {
with_lock(irq_lock, [t] {
t->_cpu->runqueue.push_back(*t);
});
assert(t->_status.load() == status::running);
t->_status.store(status::queued);
: begin(nullptr), size(0), owned(true)
thread::stack_info::stack_info(void* _begin, size_t _size)
: begin(_begin), size(_size), owned(false)
{
auto end = align_down(begin + size, 16);
size = static_cast<char*>(end) - static_cast<char*>(begin);
}
mutex thread_list_mutex;
typedef bi::list<thread,
bi::member_hook<thread,
bi::list_member_hook<>,
&thread::_thread_list_link>
> thread_list_type;
thread_list_type thread_list;
thread::thread(std::function<void ()> func, attr attr, bool main)
with_lock(thread_list_mutex, [this] {
thread_list.push_back(*this);
_cpu = current()->tcpu(); // inherit creator's cpu
with_lock(irq_lock, [this] {
_cpu->runqueue.push_back(*this);
});
with_lock(thread_list_mutex, [this] {
thread_list.erase(thread_list.iterator_to(*this));
});
debug("thread dtor");
}
void thread::prepare_wait()
{
assert(_status.load() == status::running);
_status.store(status::waiting);
status old_status = status::waiting;
if (!_status.compare_exchange_strong(old_status, status::waking)) {
unsigned c = cpu::current()->id;
_cpu->incoming_wakeups[c].push_front(*this);
_cpu->incoming_wakeups_mask.set(c);
// FIXME: avoid if the cpu is alive and if the priority does not
// FIXME: warrant an interruption
if (_cpu != current()->tcpu()) {
wakeup_ipi.send(_cpu);
}
}
void thread::main()
{
_func();
}
thread* thread::current()
{
return sched::s_current;
}
void thread::wait()
{
void thread::sleep_until(u64 abstime)
{
timer t(*current());
t.set(abstime);
wait_until([&] { return t.expired(); });
}
status old_status = status::waiting;
if (_status.compare_exchange_strong(old_status, status::running)) {
return;
while (_status.load() == status::waking) {
schedule(true);
}
assert(_status.load() == status::running);
if (_joiner) {
_joiner->wake();
}
while (true) {
schedule();
}
}
void thread::suspend_timers()
{
if (_timers_need_reload) {
return;
}
_timers_need_reload = true;
}
void thread::resume_timers()
{
if (!_timers_need_reload) {
return;
}
_timers_need_reload = false;
void thread::join()
{
_joiner = current();
wait_until([this] { return _status.load() == status::terminated; });
thread::stack_info thread::get_stack_info()
{
unsigned long thread::id()
{
return _id;
}
timer_list::callback_dispatch::callback_dispatch()
{
clock_event->set_callback(this);
}
void timer_list::fired()
{
auto now = clock::get()->time();
auto i = _list.begin();
while (i != _list.end() && i->_time < now) {
auto j = i++;
_list.erase(j);
}
if (!_list.empty()) {
clock_event->set(_list.begin()->_time);
}
// call with irq disabled
void timer_list::suspend(bi::list<timer>& timers)
{
for (auto& t : timers) {
assert(!t._expired);
_list.erase(_list.iterator_to(t));
}
}
// call with irq disabled
void timer_list::resume(bi::list<timer>& timers)
{
bool rearm = false;
for (auto& t : timers) {
assert(!t._expired);
auto i = _list.insert(t).first;
rearm |= i == _list.begin();
}
if (rearm) {
clock_event->set(_list.begin()->_time);
}
}
void timer_list::callback_dispatch::fired()
{
cpu::current()->timers.fired();
}
timer_list::callback_dispatch timer_list::_dispatch;
timer::timer(thread& t)
: _t(t)
, _expired()
{
}
timer::~timer()
{
cancel();
}
void timer::expire()
{
_expired = true;
_t._active_timers.erase(_t._active_timers.iterator_to(*this));
_t.wake();
}
_t._active_timers.push_back(*this);
if (timers._list.iterator_to(*this) == timers._list.begin()) {
clock_event->set(time);
}
});
_t._active_timers.erase(_t._active_timers.iterator_to(*this));
_t._cpu->timers._list.erase(_t._cpu->timers._list.iterator_to(*this));
// even if we remove the first timer, allow it to expire rather than
// reprogramming the timer
}
bool timer::expired() const
{
return _expired;
}
bool operator<(const timer& t1, const timer& t2)
{
if (t1._time < t2._time) {
return true;
} else if (t1._time == t2._time) {
return &t1 < &t2;
} else {
return false;
}
}
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
class detached_thread::reaper {
public:
reaper();
void reap();
void add_zombie(detached_thread* z);
private:
mutex _mtx;
std::list<detached_thread*> _zombies;
thread _thread;
};
detached_thread::reaper::reaper()
: _mtx{}, _zombies{}, _thread([=] { reap(); })
{
}
void detached_thread::reaper::reap()
{
while (true) {
wait_until([=] { return !_zombies.empty(); }); // FIXME: locking?
with_lock(_mtx, [=] {
while (!_zombies.empty()) {
auto z = _zombies.front();
_zombies.pop_front();
z->join();
}
});
}
}
void detached_thread::reaper::add_zombie(detached_thread* z)
{
with_lock(_mtx, [=] {
_zombies.push_back(z);
_thread.wake();
});
}
detached_thread::detached_thread(std::function<void ()> f)
: thread([=] { f(); _s_reaper->add_zombie(this); })
{
}
detached_thread::~detached_thread()
{
}
detached_thread::reaper *detached_thread::_s_reaper;
void init_detached_threads_reaper()
{
detached_thread::_s_reaper = new detached_thread::reaper;
}
void init(elf::tls_data tls_data, std::function<void ()> cont)
thread::attr attr;
attr.stack = { new char[4096*10], 4096*10 };
thread t{cont, attr, true};