#include "sched.hh"
#include <list>
#include "mutex.hh"
#include <mutex>
#include "debug.hh"
#include "align.hh"
#include "drivers/clock.hh"
namespace sched {
std::vector<cpu*> cpus;
unsigned __thread preempt_counter = CONF_preempt ? 0 : 1;
inter_processor_interrupt wakeup_ipi{[] {}};
}
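// arch-switch.hh is pulled in between the two namespace blocks; it presumably
// supplies the low-level context-switch helpers (e.g. thread::switch_to())
// that the scheduler code below relies on.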
#include "arch-switch.hh"
namespace sched {
const thread::attr idle_thread_attr{{}, true};
cpu::cpu()
: idle_thread([this] { idle(); }, idle_thread_attr)
{
idle_thread._cpu = this;
}
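// Core scheduling path: cpu::schedule() is the voluntary entry point, and
// reschedule_from_interrupt() is also reached from the preemption path. Both
// pick the next queued thread, mark it running, and switch to it, saving and
// restoring FPU state only when preempting.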
void cpu::schedule(bool yield)
{
    with_lock(irq_lock, [this] {
        reschedule_from_interrupt(false);
    });
}
void cpu::reschedule_from_interrupt(bool preempt)
{
handle_incoming_wakeups();
thread* p = thread::current();
if (p->_status == thread::status::running) {
p->_status.store(thread::status::queued);
runqueue.push_back(*p);
}
auto ni = runqueue.begin();
auto n = &*ni;
runqueue.erase(ni);
assert(n->_status.load() == thread::status::queued);
n->_status.store(thread::status::running);
    if (n != thread::current()) {
        if (preempt) {
            asm volatile("clts");
            p->_fpu.save();
        }
        n->switch_to();
        if (preempt) {
            p->_fpu.restore();
        }
    }
}
void cpu::do_idle()
{
do {
// spin for a bit before halting
for (unsigned ctr = 0; ctr < 100; ++ctr) {
// FIXME: can we pull threads from loaded cpus?
handle_incoming_wakeups();
if (!runqueue.empty()) {
return;
}
}
std::unique_lock<irq_lock_type> guard(irq_lock);
handle_incoming_wakeups();
if (!runqueue.empty()) {
return;
}
guard.release();
        arch::wait_for_interrupt(); // re-enables interrupts while halting, which releases irq_lock
        handle_incoming_wakeups();
} while (runqueue.empty());
}
void cpu::idle()
{
while (true) {
do_idle();
// FIXME: we don't have an idle priority class yet. so
// FIXME: yield when we're done and let the scheduler pick
// FIXME: someone else
thread::yield();
}
}
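// Wakeups from other cpus arrive through the per-source-cpu incoming_wakeups
// queues plus a summary bit in incoming_wakeups_mask; this drains any pending
// entries into the local runqueue and marks the threads queued.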
void cpu::handle_incoming_wakeups()
{
    cpu_set queues_with_wakes{incoming_wakeups_mask.fetch_clear()};
for (auto i : queues_with_wakes) {
incoming_wakeup_queue q;
incoming_wakeups[i].copy_and_clear(q);
        while (!q.empty()) {
            // drain the local copy; q is private to this cpu
            auto& t = q.front();
            q.pop_front_nonatomic();
            t._status.store(thread::status::queued);
            runqueue.push_back(t);
        }
    }
}
void cpu::init_on_cpu()
{
    arch.init_on_cpu();
}
unsigned cpu::load()
{
return runqueue.size();
}
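// Intended to run in its own per-cpu thread: roughly every 100ms it finds the
// least loaded cpu and, if that isn't this cpu, migrates one unpinned thread
// from the back of the runqueue to it, notifying the target with a wakeup IPI.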
void cpu::load_balance()
{
timer tmr(*thread::current());
while (true) {
        tmr.set(clock::get()->time() + 100000000); // every 100ms, assuming a nanosecond clock
thread::wait_until([&] { return tmr.expired(); });
if (runqueue.empty()) {
continue;
}
auto min = *std::min_element(cpus.begin(), cpus.end(),
[](cpu* c1, cpu* c2) { return c1->load() < c2->load(); });
if (min == this) {
continue;
}
with_lock(irq_lock, [this, min] {
auto i = std::find_if(runqueue.rbegin(), runqueue.rend(),
[](thread& t) { return !t._attr.pinned; });
            if (i == runqueue.rend()) {
                return; // nothing migratable: every queued thread is pinned
            }
auto& mig = *i;
            runqueue.erase(std::prev(i.base())); // a reverse_iterator's base() points one past the element
// we won't race with wake(), since we're not thread::waiting
assert(mig._status.load() == thread::status::queued);
mig._status.store(thread::status::waking);
mig._cpu = min;
min->incoming_wakeups[id].push_front(mig);
min->incoming_wakeups_mask.set(id);
// FIXME: avoid if the cpu is alive and if the priority does not
// FIXME: warrant an interruption
            wakeup_ipi.send(min);
        });
    }
}
void schedule(bool yield)
{
cpu::current()->schedule(yield);
}
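// Voluntary yield: requeue the current thread at the back of its runqueue and
// reschedule without the preemption (FPU save/restore) path.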
void thread::yield()
{
    auto t = current();
    // FIXME: drive by IPI
    t->_cpu->handle_incoming_wakeups();
    // FIXME: what about other cpus?
    if (t->_cpu->runqueue.empty()) {
        return;
    }
    with_lock(irq_lock, [t] {
        t->_cpu->runqueue.push_back(*t);
        assert(t->_status.load() == status::running);
        t->_status.store(status::queued);
        t->_cpu->reschedule_from_interrupt(false);
    });
}
thread::stack_info::stack_info()
    : begin(nullptr), size(0), deleter(nullptr)
{
}
thread::stack_info::stack_info(void* _begin, size_t _size)
: begin(_begin), size(_size), deleter(nullptr)
{
auto end = align_down(begin + size, 16);
size = static_cast<char*>(end) - static_cast<char*>(begin);
}
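// Global registry of all threads, linked through the intrusive
// _thread_list_link hook and protected by thread_list_mutex.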
mutex thread_list_mutex;
typedef bi::list<thread,
bi::member_hook<thread,
bi::list_member_hook<>,
&thread::_thread_list_link>
> thread_list_type;
thread_list_type thread_list;
thread::thread(std::function<void ()> func, attr attr, bool main)
    : _func(func)
    , _attr(attr)
{
    with_lock(thread_list_mutex, [this] {
        thread_list.push_back(*this);
    });
}
thread::~thread()
{
    with_lock(thread_list_mutex, [this] {
        thread_list.erase(thread_list.iterator_to(*this));
    });
    if (_attr.stack.deleter) {
        _attr.stack.deleter(_attr.stack.begin);
    }
}
void thread::start()
{
    _cpu = current()->tcpu();
assert(_status == status::unstarted);
_status.store(status::waiting);
wake();
}
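// Sleep/wakeup protocol on _status: a thread about to block goes
// running -> waiting (prepare_wait); wake() moves waiting -> waking and hands
// the thread to its cpu's incoming-wakeups queue, which later makes it
// queued and then running again. A failed compare-exchange in wake() means
// the thread wasn't waiting, so there is nothing to do.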
void thread::prepare_wait()
{
    assert(_status.load() == status::running);
    _status.store(status::waiting);
}
void thread::wake()
{
    status old_status = status::waiting;
    if (!_status.compare_exchange_strong(old_status, status::waking)) {
        return;
    }
    unsigned c = cpu::current()->id;
_cpu->incoming_wakeups[c].push_front(*this);
_cpu->incoming_wakeups_mask.set(c);
// FIXME: avoid if the cpu is alive and if the priority does not
// FIXME: warrant an interruption
if (_cpu != current()->tcpu()) {
wakeup_ipi.send(_cpu);
} else if (arch::irq_enabled() && !preempt_counter) {
_cpu->schedule();
// We'll also reschedule at the end of an interrupt if needed
    }
}
void thread::main()
{
_func();
}
thread* thread::current()
{
return sched::s_current;
}
void thread::wait()
{
    schedule(true);
}
void thread::sleep_until(u64 abstime)
{
timer t(*current());
t.set(abstime);
wait_until([&] { return t.expired(); });
}
void thread::stop_wait()
{
    status old_status = status::waiting;
if (_status.compare_exchange_strong(old_status, status::running)) {
        return;
    }
while (_status.load() == status::waking) {
schedule(true);
}
}
void thread::complete()
{
    _status.store(status::terminated);
    if (_joiner) {
_joiner->wake();
}
while (true) {
schedule();
}
}
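// Timers follow their owning thread: when a thread is switched out its pending
// timers are pulled from the per-cpu timer list and re-inserted when it runs
// again (presumably from the context-switch path in arch-switch.hh);
// _timers_need_reload makes the two operations idempotent.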
void thread::suspend_timers()
{
if (_timers_need_reload) {
return;
}
    _timers_need_reload = true;
    _cpu->timers.suspend(_active_timers);
}
void thread::resume_timers()
{
if (!_timers_need_reload) {
return;
}
    _timers_need_reload = false;
    _cpu->timers.resume(_active_timers);
}
void thread::join()
{
_joiner = current();
    wait_until([this] { return _status.load() == status::terminated; });
}
thread::stack_info thread::get_stack_info()
{
    return _attr.stack;
}
unsigned long thread::id()
{
return _id;
}
void preempt_disable()
{
++preempt_counter;
}
void preempt_enable()
{
--preempt_counter;
// FIXME: may need to schedule() here if a high prio thread is waiting
}
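// Preemption hook: presumably called from the interrupt path to force a
// reschedule of the current cpu.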
void preempt()
{
    sched::cpu::current()->reschedule_from_interrupt(true);
}
timer_list::callback_dispatch::callback_dispatch()
{
clock_event->set_callback(this);
}
void timer_list::fired()
{
auto now = clock::get()->time();
auto i = _list.begin();
while (i != _list.end() && i->_time < now) {
        auto j = i++;
        j->expire();
        _list.erase(j);
}
if (!_list.empty()) {
clock_event->set(_list.begin()->_time);
    }
}
// call with irq disabled
void timer_list::suspend(bi::list<timer>& timers)
{
for (auto& t : timers) {
assert(!t._expired);
_list.erase(_list.iterator_to(t));
}
}
// call with irq disabled
void timer_list::resume(bi::list<timer>& timers)
{
bool rearm = false;
for (auto& t : timers) {
assert(!t._expired);
auto i = _list.insert(t).first;
rearm |= i == _list.begin();
}
if (rearm) {
clock_event->set(_list.begin()->_time);
}
}
void timer_list::callback_dispatch::fired()
{
cpu::current()->timers.fired();
}
timer_list::callback_dispatch timer_list::_dispatch;
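// A timer is armed with set(): it is inserted into both its thread's
// _active_timers and the owning cpu's timer_list, and the clock event is
// reprogrammed only when the new timer becomes the earliest one. expire()
// marks it, unlinks it from the thread, and wakes the thread; cancel() undoes
// an unexpired set().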
timer::timer(thread& t)
: _t(t)
, _expired()
{
}
timer::~timer()
{
cancel();
}
void timer::expire()
{
_expired = true;
_t._active_timers.erase(_t._active_timers.iterator_to(*this));
_t.wake();
}
void timer::set(u64 time)
{
    _time = time;
    with_lock(irq_lock, [=] {
        auto& timers = _t._cpu->timers;
        timers._list.insert(*this);
        _t._active_timers.push_back(*this);
        if (timers._list.iterator_to(*this) == timers._list.begin()) {
            clock_event->set(time);
        }
    });
}
void timer::cancel()
{
    if (_expired) {
        return;
    }
    _t._active_timers.erase(_t._active_timers.iterator_to(*this));
    _t._cpu->timers._list.erase(_t._cpu->timers._list.iterator_to(*this));
    // even if we remove the first timer, allow it to expire rather than
    // reprogramming the timer
}
bool timer::expired() const
{
return _expired;
}
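// Strict weak ordering for the timer list: order by expiry time, breaking ties
// by address so distinct timers never compare equivalent in the ordered set.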
bool operator<(const timer& t1, const timer& t2)
{
if (t1._time < t2._time) {
return true;
} else if (t1._time == t2._time) {
return &t1 < &t2;
} else {
return false;
}
}
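// Detached threads cannot be join()ed by their creator, so each one hands
// itself to this reaper when its function returns; the reaper thread then
// join()s and deletes the zombies under _mtx.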
class detached_thread::reaper {
public:
reaper();
void reap();
void add_zombie(detached_thread* z);
private:
mutex _mtx;
std::list<detached_thread*> _zombies;
thread _thread;
};
detached_thread::reaper::reaper()
: _mtx{}, _zombies{}, _thread([=] { reap(); })
{
}
void detached_thread::reaper::reap()
{
while (true) {
with_lock(_mtx, [=] {
wait_until(_mtx, [=] { return !_zombies.empty(); });
while (!_zombies.empty()) {
auto z = _zombies.front();
_zombies.pop_front();
                z->join();
                delete z;
}
});
}
}
void detached_thread::reaper::add_zombie(detached_thread* z)
{
with_lock(_mtx, [=] {
_zombies.push_back(z);
_thread.wake();
});
}
detached_thread::detached_thread(std::function<void ()> f)
: thread([=] { f(); _s_reaper->add_zombie(this); })
{
}
detached_thread::~detached_thread()
{
}
detached_thread::reaper *detached_thread::_s_reaper;
void init_detached_threads_reaper()
{
detached_thread::_s_reaper = new detached_thread::reaper;
}
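// Bootstrap: wrap the continuation in the first ("main") thread, point it at
// the boot cpu, and presumably switch to it to start scheduling proper.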
void init(elf::tls_data tls_data, std::function<void ()> cont)
{
thread::attr attr;
attr.stack = { new char[4096*10], 4096*10 };
thread t{cont, attr, true};
t._cpu = smp_initial_find_current_cpu();