Commit bca7bbbe authored by Pawel Dziepak, committed by Pekka Enberg

sched: reimplement incoming_wakeup_queue


This patch reimplements lockless_queue (which is used as incoming_wakeup_queue)
so that it needs neither exchange nor compare_exchange operations.

The idea is to use a linked list, but to interleave the actual objects stored
in the queue with helper objects (lockless_queue_helper), each of which is just
a pointer to the next element. Each object in the queue owns the helper that
precedes it (and they are dequeued together), while the last helper, which does
not precede any object, is owned by the queue itself.
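
Schematically, the list alternates helpers and stored objects. The sketch
below is illustrative only; the names come from the diff further down:

    //  _head -> helper0 -> elem1 -> helper1 -> elem2 -> helper2 <- _tail
    //
    // elem1 owns helper0 and elem2 owns helper1 (each helper leaves the
    // queue together with its object); helper2, the tail, is owned by
    // the queue itself.
    template <class T>
    struct lockless_queue_helper {
        std::atomic<T*> _next;  // object following this helper, or nullptr
    };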

When a new object is enqueued, it takes ownership of the last helper in the
queue in exchange for the helper it owned before, which now becomes the new
tail of the list.
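
Concretely, this is the helper swap performed by the new push_back() in the
diff below; the comments are added here for illustration:

    template <class T, lockless_queue_link<T> T::*link>
    void lockless_queue<T, link>::push_back(T& elem)
    {
        // The helper the element owned so far becomes the new tail...
        lockless_queue_helper<T>* helper = (elem.*link)._helper;
        (elem.*link)._next = helper;
        // ...and in exchange the element takes ownership of the old tail
        // helper, which now precedes it in the list.
        (elem.*link)._helper = _tail;
        helper->_next.store(nullptr, std::memory_order_relaxed);
        // Storing the object pointer with release semantics is the single
        // write that publishes the element to the consumer.
        _tail->_next.store(&elem, std::memory_order_release);
        _tail = helper;
    }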

Unlike the original implementation, this version of lockless_queue strictly
requires that there be no more than one concurrent producer and no more than
one concurrent consumer.
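
For illustration, a minimal single-producer/single-consumer usage sketch;
the element type, function names, and header path are hypothetical, and only
the queue API comes from this patch:

    #include "lockless-queue.hh"

    struct item {
        lockless_queue_link<item> link;
        int value;
    };

    lockless_queue<item, &item::link> q;

    // Called by at most one producer thread at a time.
    void produce(item& it)
    {
        q.push_back(it);
    }

    // Called by at most one consumer thread at a time.
    item* consume()
    {
        if (q.empty()) {
            return nullptr;
        }
        item& it = q.front();
        q.pop_front();
        return &it;
    }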

The results of tests/misc-ctxs on my test machine are as follows (the values
are medians of five runs):

before:
colocated: 332 ns
apart: 590 ns

after:
colocated: 313 ns
apart: 558 ns

Reviewed-by: Nadav Har'El <nyh@cloudius-systems.com>
Signed-off-by: Pawel Dziepak <pdziepak@quarnos.org>
Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
parent e5ef01e3
@@ -381,13 +381,12 @@ void cpu::handle_incoming_wakeups()
         return;
     }
     for (auto i : queues_with_wakes) {
-        incoming_wakeup_queue q;
-        incoming_wakeups[i].copy_and_clear(q);
-        while (!q.empty()) {
-            auto& t = q.front();
-            q.pop_front_nonatomic();
-            irq_save_lock_type irq_lock;
-            WITH_LOCK(irq_lock) {
+        irq_save_lock_type irq_lock;
+        WITH_LOCK(irq_lock) {
+            auto& q = incoming_wakeups[i];
+            while (!q.empty()) {
+                auto& t = q.front();
+                q.pop_front();
                 if (&t == thread::current()) {
                     // Special case of current thread being woken before
                     // having a chance to be scheduled out.
@@ -463,7 +462,7 @@ void cpu::load_balance()
         mig._runtime.export_runtime();
         mig.remote_thread_local_var(::percpu_base) = min->percpu_base;
         mig.remote_thread_local_var(current_cpu) = min;
-        min->incoming_wakeups[id].push_front(mig);
+        min->incoming_wakeups[id].push_back(mig);
         min->incoming_wakeups_mask.set(id);
         // FIXME: avoid if the cpu is alive and if the priority does not
         // FIXME: warrant an interruption
@@ -762,7 +761,10 @@ void thread::wake_impl(detached_state* st, unsigned allowed_initial_states_mask)
     unsigned c = cpu::current()->id;
     // we can now use st->t here, since the thread cannot terminate while
     // it's waking, but not afterwards, when it may be running
-    tcpu->incoming_wakeups[c].push_front(*st->t);
+    irq_save_lock_type irq_lock;
+    WITH_LOCK(irq_lock) {
+        tcpu->incoming_wakeups[c].push_back(*st->t);
+    }
     if (!tcpu->incoming_wakeups_mask.test_all_and_set(c)) {
         // FIXME: avoid if the cpu is alive and if the priority does not
         // FIXME: warrant an interruption
@@ -10,6 +10,9 @@
 #include <atomic>

+#include <arch.hh>
+
+template <class T> struct lockless_queue_helper;
 template <class T> class lockless_queue_link;
 template <class T, lockless_queue_link<T> T::*link> class lockless_queue;
@@ -19,62 +22,86 @@ template <class T, lockless_queue_link<T> T::*link>
 class lockless_queue {
 public:
     lockless_queue();
+    ~lockless_queue();
     bool empty() const;
-    void copy_and_clear(lockless_queue& to);
-    void push_front(T& elem);
+    void push_back(T& elem);
     T& front();
-    void pop_front_nonatomic();
+    void pop_front();
 private:
-    std::atomic<T*> _head;
+    lockless_queue_helper<T>* _head CACHELINE_ALIGNED;
+    lockless_queue_helper<T>* _tail CACHELINE_ALIGNED;
 };

+template <class T>
+struct lockless_queue_helper {
+    std::atomic<T*> _next;
+};
+
 template <class T>
 class lockless_queue_link
 {
 public:
-    T* _next;
+    lockless_queue_link();
+    ~lockless_queue_link();
+
+    lockless_queue_helper<T>* _helper;
+    lockless_queue_helper<T>* _next;
 };

-template <class T, lockless_queue_link<T> T::*link>
-lockless_queue<T, link>::lockless_queue()
-    : _head(nullptr)
+template <class T>
+lockless_queue_link<T>::lockless_queue_link()
+    : _helper(new lockless_queue_helper<T>)
 {
 }

+template <class T>
+lockless_queue_link<T>::~lockless_queue_link()
+{
+    delete _helper;
+}
+
 template <class T, lockless_queue_link<T> T::*link>
-void lockless_queue<T, link>::copy_and_clear(lockless_queue& to)
+lockless_queue<T, link>::lockless_queue()
+    : _head(new lockless_queue_helper<T>)
+    , _tail(_head)
 {
-    to._head = _head.exchange(nullptr);
+    _head->_next.store(nullptr, std::memory_order_relaxed);
 }

 template <class T, lockless_queue_link<T> T::*link>
-void lockless_queue<T, link>::push_front(T& elem)
+lockless_queue<T, link>::~lockless_queue()
 {
-    // no ABA, since we are sole producer
-    T* old;
-    do {
-        old = _head.load();
-        (elem.*link)._next = old;
-    } while (!_head.compare_exchange_weak(old, &elem));
+    delete _tail;
 }

 template <class T, lockless_queue_link<T> T::*link>
 bool lockless_queue<T, link>::empty() const
 {
-    return !_head.load();
+    return !_head->_next.load(std::memory_order_relaxed);
 }

+template <class T, lockless_queue_link<T> T::*link>
+void lockless_queue<T, link>::push_back(T& elem)
+{
+    lockless_queue_helper<T>* helper = (elem.*link)._helper;
+
+    (elem.*link)._next = helper;
+    (elem.*link)._helper = _tail;
+
+    helper->_next.store(nullptr, std::memory_order_relaxed);
+    _tail->_next.store(&elem, std::memory_order_release);
+    _tail = helper;
+}
+
 template <class T, lockless_queue_link<T> T::*link>
 T& lockless_queue<T, link>::front()
 {
-    return *_head.load();
+    return *_head->_next.load(std::memory_order_consume);
 }

 template <class T, lockless_queue_link<T> T::*link>
-void lockless_queue<T, link>::pop_front_nonatomic()
+void lockless_queue<T, link>::pop_front()
 {
-    _head = (_head.load()->*link)._next;
+    auto elem = _head->_next.load(std::memory_order_consume);
+    _head = (elem->*link)._next;
 }

 #endif /* LOCKLESS_QUEUE_HH_ */