Commit 122fc52c authored by Nadav Har'El

Clean up include/lockfree/mutex.hh, removing unhelpful comments (read the
paper...) but leaving necessary discussion not present in the paper.
And also fix various errors.
parent 90aa9da8
#ifndef MUTEX_HH
#define MUTEX_HH
// A lock-free mutex implementation, based on the combination of two basic
// techniques:
// 1. Our lock-free multi-producer single-consumer queue technique
// (see lockfree/queue-mpsc.hh)
// 2. The "responsibility hand-off" protocol described in the 2007 paper
// 2. The "responsibility hand-off" (RHO) protocol described in the 2007 paper
// "Blocking without Locking or LFTHREADS: A lock-free thread library"
// by Anders Gidenstam and Marina Papatriantafilou.
//
// The operation and correctness of the RHO protocol is discussed in the
// aforementioned G&P 2007 paper, so we will avoid lengthy comments about it
// below, except where we differ from G&P.
//
// One especially important issue that we do need to justify is:
// Our lockfree queue implementation assumes that there cannot be two
// concurrent pop()s. We claim that this is true in the RHO protocol because:
// 1. We have pop() calls at two places:
// (A) In unlock(), after decrementing count and outside a handoff (=null)
// (B) in lock(), after picking up a handoff.
// 2. We can't have two threads at (A) at the same time, because one thread
// at (A) means another thread was just in lock() (because count>0),
// but currently running lock()s cannot complete (and get to unlock() and A)
// until somebody wakes them (and this is exactly what we're trying to show
// is impossible), and new lock()s will likewise wait because the waiting
// lock() is keeping count>0.
// 3. While one lock() is at (B), we cannot have another thread at (A) or (B):
// This is because in (B) we only pop() after picking up a handoff, so other
// lock()s cannot reach (B) (they did not pick up the handoff, we did), and
// unlock() cannot be at (A) because it only reaches (A) before making the
// handoff or after taking it back - and we know it did neither, because we
// took the handoff.
//
// Another difference between our implementation and G&P is the content of the
// handoff token. G&P use the processor ID, but remark that it is not enough
// because of the ABA problem (it is possible that while a CPU running lock()
// is paused, another one finishes unlock(), and then succeeds in another
// lock() and then comes a different unlock() with its unrelated handoff), and
// suggest adding a per-processor sequence number. Instead, we just use a
// per-mutex sequence number. As long as one CPU does not pause for a long
// enough duration for our (currently 32-bit) sequence number to wrap, we
// won't have a problem. A per-mutex sequence number is slower than a per-cpu
// one, but I doubt this will make a practical difference.
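//
// For reference, here is a minimal sketch of the queue interface that the
// code below relies on. These are NOT the actual declarations - those live
// in lockfree/queue-mpsc.hh - just an illustration of the operations used
// in this file:
//
//     template <class T> struct linked_item {
//         linked_item(T value);            // wraps one value; in this file it
//                                          // lives on the waiting thread's stack
//     };
//     template <class T> class queue_mpsc {
//     public:
//         void push(linked_item<T> *item); // lock-free, safe for many
//                                          // concurrent pushers
//         T pop();                         // single consumer only; pop() of an
//                                          // empty queue returns nullptr here
//         bool isempty();
//     };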
#include <atomic>
#include <sched.hh>
#include <lockfree/queue-mpsc.hh>
namespace lockfree {
class mutex {
private:
// counts the number of threads holding the lock or waiting for it.
std::atomic<int> count;
// a lock-free queue of the threads waiting for the lock
queue_mpsc<sched::thread *> waitqueue;
// responsibility hand-off (RHO) protocol token (see discussion above)
std::atomic<unsigned int> handoff;
unsigned int sequence;
// "owner" and "depth" are needed for implementing a recursive mutex
unsigned int depth;
std::atomic<sched::thread *> owner;
public:
mutex() : count(0), depth(0), owner(nullptr), handoff(0U), sequence(0) { }
~mutex() { assert(count==0); }
void lock()
@@ -47,88 +79,32 @@ public:
// a recursive mutex so it's possible the lock holder is us - in which
// case we just increment the depth instead of waiting.
if (owner.load() == current) {
--count; // undo the increment of count (but still leaving it > 0)
++depth;
return;
}
// If we're still here, the lock is owned by a different thread. Put this
// thread in a waiting queue, so it will eventually be woken when another
// thread releases the lock.
// NOTE: the linked-list item "waiter" is allocated on the stack and
// then used on the wait queue's linked list, but we only exit this
// function (making waiter invalid) when we know that waiter was
// removed from the wait list.
sched::wait_guard wait_guard(current); // mark the thread "waiting" now.
linked_item<sched::thread *> waiter(current);
waitqueue.push(&waiter);
// The "Responsibility Hand-Off" protocol where a lock() picks from
// a concurrent unlock() the responsibility of waking somebody up:
auto old_handoff = handoff.load();
if (old_handoff) {
if (!waitqueue.isempty()){
if (std::atomic_compare_exchange_strong(&handoff, &old_handoff, 0U)) {
// Note the explanation above about no concurrent pop()s also
// explains why we can be sure waitqueue is still not empty.
auto thread = waitqueue.pop();
assert(thread);
assert(depth==0);
depth = 1;
owner.store(thread);
thread->wake();
}
}
}
@@ -141,125 +117,89 @@ public:
current->wait(); // reschedule
// TODO: can spurious wakes happen? Maybe this loop isn't needed.
}
// TODO: do we need to call current->stop_wait() or at least preempt_enabled??
}
bool try_lock(){
sched::thread *current = sched::thread::current();
int zero = 0;
if (std::atomic_compare_exchange_strong(&count, &zero, 1)) {
// Uncontended case. We got the lock.
owner.store(current);
assert(depth==0);
depth = 1;
return true;
}
// We're implementing a recursive mutex - lock may still succeed if
// this thread is the one holding it.
if (owner.load() == current) {
--count; // undo the increment of count (but still leaving it > 0)
++depth;
return true;
}
// The lock is taken, and we're almost ready to give up (return
// false), but the last chance is if we can accept a handoff - and if
// we do, we got the lock.
auto old_handoff = handoff.load();
if (old_handoff &&
std::atomic_compare_exchange_strong(&handoff, &old_handoff, 0U)) {
++count;
owner.store(current);
assert(depth==0);
depth = 1;
return true;
}
return false;
}
void unlock(){
sched::thread *current = sched::thread::current();
// It is fine to check if owner is this thread without race
// conditions, because no other thread can make it become this thread
// (or make it stop being this thread).
// Some special treatment for recursive mutex:
if (owner.load() != current) {
// TODO: Anything more sensible to do? exception?
debug("lockfree::mutex.unlock() of non-locked mutex");
return;
}
assert(depth!=0);
// It's fine to --depth without locking or atomicity, as only
// this thread can possibly change the depth.
--depth;
if (depth > 0)
return; // recursive mutex still locked.
owner.store(nullptr);
// If there is no waiting lock(), we're done. This is the easy case :-)
if (std::atomic_fetch_add(&count, -1) == 1)
return;
// Otherwise there is at least one concurrent lock(). Awaken one if
// it's waiting on the waitqueue, otherwise use the RHO protocol to
// make the lock() responsible for waking someone up.
// TODO: it's not completely clear to me why more than two
// iterations can ever be needed. If the loop continues it means
// another lock() queued itself - but if it did, wouldn't the next
// iteration just pop and return?
while(true) {
// The thread we'll wake up will expect the mutex's owner to be
// set to it.
auto thread = waitqueue.pop();
if (thread) {
depth = 1;
owner.store(thread);
thread->wake();
return;
}
// Some concurrent lock() is in progress (we know this because of
// count) but it hasn't yet put itself on the wait queue.
if (++sequence == 0U) ++sequence; // pick a number, but not 0
auto ourhandoff = sequence;
handoff.store(ourhandoff);
// If the queue is empty, the concurrent lock() has not yet added itself
// to it, and will therefore definitely see our handoff later (see the
// comment in lock()).
if (waitqueue.isempty())
return;
// A thread already appeared on the queue, let's try to take the
// handoff ourselves and awaken it. If somebody else already took
// the handoff, great, we're done - they are responsible now.
if (!std::atomic_compare_exchange_strong(&handoff, &ourhandoff, 0U))
return;
}
}
};
@@ -272,4 +212,5 @@ auto with_lock(Lock& lock, Func func) -> decltype(func())
return func();
}
}
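//
// Example usage (an illustrative sketch, not part of this header): protecting
// a shared counter with lockfree::mutex, either by calling lock()/unlock()
// directly or via the with_lock() helper above. The names "counter_mutex",
// "counter" and the two functions are invented for the example, and the code
// assumes it runs in a context where sched::thread::current() is valid:
//
//     lockfree::mutex counter_mutex;
//     int counter = 0;
//
//     void increment_directly()
//     {
//         counter_mutex.lock();
//         ++counter;
//         counter_mutex.unlock();
//     }
//
//     void increment_with_helper()
//     {
//         lockfree::with_lock(counter_mutex, [] { ++counter; });
//     }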
#endif