From bca7bbbeba2493d4e427bce36b36e3f5db574412 Mon Sep 17 00:00:00 2001
From: Pawel Dziepak <pdziepak@quarnos.org>
Date: Tue, 13 May 2014 17:21:55 +0200
Subject: [PATCH] sched: reimplement incoming_wakeup_queue

This patch implements lockfree_queue (which is used as incoming_wakeup_queue)
so that it doesn't need exchange or compare_exchange operations.

The idea is to use a linked list but interleave actual objects stored in the
queue with helper object (lockless_queue_helper) which are just pointer to the
next element. Each object in the queue owns the helper that precedes it (and
they are dequeued together) while the last helper, which does not precede any
object is owned by the queue itself.

When a new object is enqueued it gains ownership of the last helper in the
queue in exchange of the helper it owned before which now becomes the new
tail of the list.

Unlike the original implementation this version of lockfree_queue really
requires that there is no more than one concurrent producer and no more than
one concurrent consumer.

The results oftests/misc-ctxs on my test machine are as follows (the values
are medians of five runs):

before:
colocated: 332 ns
apart: 590 ns

after:
colocated: 313 ns
apart: 558 ns

Reviewed-by: Nadav Har'El <nyh@cloudius-systems.com>
Signed-off-by: Pawel Dziepak <pdziepak@quarnos.org>
Signed-off-by: Pekka Enberg <penberg@cloudius-systems.com>
---
 core/sched.cc                 | 20 +++++-----
 include/osv/lockless-queue.hh | 69 ++++++++++++++++++++++++-----------
 2 files changed, 59 insertions(+), 30 deletions(-)

diff --git a/core/sched.cc b/core/sched.cc
index 35b4174cb..396efebfd 100644
--- a/core/sched.cc
+++ b/core/sched.cc
@@ -381,13 +381,12 @@ void cpu::handle_incoming_wakeups()
         return;
     }
     for (auto i : queues_with_wakes) {
-        incoming_wakeup_queue q;
-        incoming_wakeups[i].copy_and_clear(q);
-        while (!q.empty()) {
-            auto& t = q.front();
-            q.pop_front_nonatomic();
-            irq_save_lock_type irq_lock;
-            WITH_LOCK(irq_lock) {
+        irq_save_lock_type irq_lock;
+        WITH_LOCK(irq_lock) {
+            auto& q = incoming_wakeups[i];
+            while (!q.empty()) {
+                auto& t = q.front();
+                q.pop_front();
                 if (&t == thread::current()) {
                     // Special case of current thread being woken before
                     // having a chance to be scheduled out.
@@ -463,7 +462,7 @@ void cpu::load_balance()
             mig._runtime.export_runtime();
             mig.remote_thread_local_var(::percpu_base) = min->percpu_base;
             mig.remote_thread_local_var(current_cpu) = min;
-            min->incoming_wakeups[id].push_front(mig);
+            min->incoming_wakeups[id].push_back(mig);
             min->incoming_wakeups_mask.set(id);
             // FIXME: avoid if the cpu is alive and if the priority does not
             // FIXME: warrant an interruption
@@ -762,7 +761,10 @@ void thread::wake_impl(detached_state* st, unsigned allowed_initial_states_mask)
         unsigned c = cpu::current()->id;
         // we can now use st->t here, since the thread cannot terminate while
         // it's waking, but not afterwards, when it may be running
-        tcpu->incoming_wakeups[c].push_front(*st->t);
+        irq_save_lock_type irq_lock;
+        WITH_LOCK(irq_lock) {
+            tcpu->incoming_wakeups[c].push_back(*st->t);
+        }
         if (!tcpu->incoming_wakeups_mask.test_all_and_set(c)) {
             // FIXME: avoid if the cpu is alive and if the priority does not
             // FIXME: warrant an interruption
diff --git a/include/osv/lockless-queue.hh b/include/osv/lockless-queue.hh
index 4f01d96c0..140b53d5a 100644
--- a/include/osv/lockless-queue.hh
+++ b/include/osv/lockless-queue.hh
@@ -10,6 +10,9 @@
 
 #include <atomic>
 
+#include <arch.hh>
+
+template <class T> struct lockless_queue_helper;
 template <class T> class lockless_queue_link;
 template <class T, lockless_queue_link<T> T::*link> class lockless_queue;
 
@@ -19,62 +22,86 @@ template <class T, lockless_queue_link<T> T::*link>
 class lockless_queue {
 public:
     lockless_queue();
+    ~lockless_queue();
     bool empty() const;
-    void copy_and_clear(lockless_queue& to);
-    void push_front(T& elem);
+    void push_back(T& elem);
     T& front();
-    void pop_front_nonatomic();
+    void pop_front();
 private:
-    std::atomic<T*> _head;
+    lockless_queue_helper<T>* _head CACHELINE_ALIGNED;
+    lockless_queue_helper<T>* _tail CACHELINE_ALIGNED;
+};
+
+template <class T>
+struct lockless_queue_helper {
+    std::atomic<T*> _next;
 };
 
 template <class T>
 class lockless_queue_link
 {
 public:
-    T* _next;
+    lockless_queue_link();
+    ~lockless_queue_link();
+
+    lockless_queue_helper<T>* _helper;
+    lockless_queue_helper<T>* _next;
 };
 
-template <class T, lockless_queue_link<T> T::*link>
-lockless_queue<T, link>::lockless_queue()
-    : _head(nullptr)
+template <class T>
+lockless_queue_link<T>::lockless_queue_link()
+    : _helper(new lockless_queue_helper<T>)
 {
+}
 
+template <class T>
+lockless_queue_link<T>::~lockless_queue_link()
+{
+    delete _helper;
 }
 
 template <class T, lockless_queue_link<T> T::*link>
-void lockless_queue<T, link>::copy_and_clear(lockless_queue& to)
+lockless_queue<T, link>::lockless_queue()
+    : _head(new lockless_queue_helper<T>)
+    , _tail(_head)
 {
-    to._head = _head.exchange(nullptr);
+    _head->_next.store(nullptr, std::memory_order_relaxed);
 }
 
 template <class T, lockless_queue_link<T> T::*link>
-void lockless_queue<T, link>::push_front(T& elem)
+lockless_queue<T, link>::~lockless_queue()
 {
-    // no ABA, since we are sole producer
-    T* old;
-    do {
-        old = _head.load();
-        (elem.*link)._next = old;
-    } while (!_head.compare_exchange_weak(old, &elem));
+    delete _tail;
 }
 
 template <class T, lockless_queue_link<T> T::*link>
 bool lockless_queue<T, link>::empty() const
 {
-    return !_head.load();
+    return !_head->_next.load(std::memory_order_relaxed);
+}
+
+template <class T, lockless_queue_link<T> T::*link>
+void lockless_queue<T, link>::push_back(T& elem)
+{
+    lockless_queue_helper<T>* helper = (elem.*link)._helper;
+    (elem.*link)._next = helper;
+    (elem.*link)._helper = _tail;
+    helper->_next.store(nullptr, std::memory_order_relaxed);
+    _tail->_next.store(&elem, std::memory_order_release);
+    _tail = helper;
 }
 
 template <class T, lockless_queue_link<T> T::*link>
 T& lockless_queue<T, link>::front()
 {
-    return *_head.load();
+    return *_head->_next.load(std::memory_order_consume);
 }
 
 template <class T, lockless_queue_link<T> T::*link>
-void lockless_queue<T, link>::pop_front_nonatomic()
+void lockless_queue<T, link>::pop_front()
 {
-    _head = (_head.load()->*link)._next;
+    auto elem = _head->_next.load(std::memory_order_consume);
+    _head = (elem->*link)._next;
 }
 
 #endif /* LOCKLESS_QUEUE_HH_ */
-- 
GitLab