From 122fc52c4a8075344adffa63c62da6d7f9108676 Mon Sep 17 00:00:00 2001
From: Nadav Har'El <nyh@cloudius-systems.com>
Date: Tue, 12 Mar 2013 11:23:45 +0200
Subject: [PATCH] Clean up include/lockfree/mutex.hh, removing unhelpful
 comments (read the paper...) but leaving necessary discussion not present
 in the paper. Also fix various errors.

---
 include/lockfree/mutex.hh | 263 +++++++++++++++-----------------------
 1 file changed, 102 insertions(+), 161 deletions(-)

diff --git a/include/lockfree/mutex.hh b/include/lockfree/mutex.hh
index 610b2ab69..0089fdbfe 100644
--- a/include/lockfree/mutex.hh
+++ b/include/lockfree/mutex.hh
@@ -1,33 +1,65 @@
 #ifndef MUTEX_HH
 #define MUTEX_HH
-
-#include <atomic>
-
-#include <sched.hh>
-#include <lockfree/queue-mpsc.hh>
-
 // A lock-free mutex implementation, based on the combination of two basic
 // techniques:
 // 1. Our lock-free multi-producer single-consumer queue technique
 //    (see lockfree/queue-mpsc.hh)
-// 2. The "responsibility hand-off" protocol described in the 2007 paper
+// 2. The "responsibility hand-off" (RHO) protocol described in the 2007 paper
+//    "Blocking without Locking or LFTHREADS: A lock-free thread library"
+//    by Anders Gidenstam and Marina Papatriantafilou.
+//
+// The operation and correctness of the RHO protocol are discussed in the
+// aforementioned G&P 2007 paper, so we will avoid lengthy comments about it
+// below, except where we differ from G&P.
+//
+// One especially important issue that we do need to justify is:
+// Our lockfree queue implementation assumes that there cannot be two
+// concurrent pop()s. We claim that this is true in the RHO protocol because:
+// 1. We have pop() calls at two places:
+//    (A) In unlock(), after decrementing count, and outside a handoff
+//        (handoff==0).
+//    (B) In lock(), after picking up a handoff.
+// 2. We can't have two threads at (A) at the same time, because one thread
+//    at (A) means another thread was just in lock() (because count>0),
+//    but currently running lock()s cannot complete (and get to unlock() and
+//    to A) until somebody wakes them up (and this is what we're trying to
+//    show is impossible), and new lock()s will likewise wait because the
+//    waiting lock() is keeping count>0.
+// 3. While one lock() is at (B), we cannot have another thread at (A) or (B):
+//    This is because in (B) we only pop() after picking up a handoff, so
+//    other lock()s cannot reach (B) (they did not pick up the handoff, we
+//    did), and unlock() cannot be at (A) because it only reaches (A) before
+//    making the handoff or after taking it back - and we know it did
+//    neither, because we took the handoff.
+//
+// Another difference between our implementation and G&P's is the content of
+// the handoff token. G&P use the processor ID, but remark that it is not
+// enough because of the ABA problem (it is possible that while a CPU running
+// lock() is paused, another one finishes unlock(), then succeeds in another
+// lock(), and then a different unlock() comes along with its own unrelated
+// handoff), and suggest adding a per-processor sequence number. Instead, we
+// just use a per-mutex sequence number. As long as one CPU does not pause
+// for a long enough duration for our (currently 32-bit) sequence number to
+// wrap, we won't have a problem. A per-mutex sequence number is slower than
+// a per-cpu one, but I doubt this will make a practical difference.
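+//
+// To make the token scheme concrete, here is a minimal standalone sketch of
+// why a fresh nonzero per-mutex sequence number defeats ABA on the handoff
+// compare-exchange. This is an illustration only, not part of this header:
+// handoff_token_demo, offer() and take() are hypothetical names, and the
+// real protocol does much more around these two operations.
+//
+//     #include <atomic>
+//     #include <cassert>
+//
+//     struct handoff_token_demo {
+//         std::atomic<unsigned int> handoff {0}; // 0 means "no handoff"
+//         unsigned int sequence = 0;             // bumped by each unlock()
+//         unsigned int offer() {                 // unlock() side
+//             if (++sequence == 0) ++sequence;   // skip 0, it means "none"
+//             handoff.store(sequence);
+//             return sequence;
+//         }
+//         bool take(unsigned int seen) {         // lock()/try_lock() side
+//             return handoff.compare_exchange_strong(seen, 0U);
+//         }
+//     };
+//
+//     int main() {
+//         handoff_token_demo d;
+//         unsigned int t1 = d.offer();
+//         assert(d.take(t1));  // CAS on the current token succeeds
+//         unsigned int t2 = d.offer();
+//         assert(!d.take(t1)); // a stale token can never be taken again
+//         assert(d.take(t2));
+//     }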
+
+#include <atomic>
+
+#include <sched.hh>
+#include <lockfree/queue-mpsc.hh>
+
+namespace lockfree {
 
 class mutex {
 private:
-    // counts the number of threads holding the lock or waiting for it.
     std::atomic<int> count;
+    queue_mpsc<sched::thread *> waitqueue;
+    std::atomic<unsigned int> handoff;
+    unsigned int sequence;
     // "owner" and "depth" are needed for implementing a recursive mutex
-    unsigned int depth;
     std::atomic<sched::thread *> owner;
-    //std::atomic_int active; // NYH try for resolving race condition
-    queue_mpsc<sched::thread *> waitqueue;
-    // responsiblity hand-off protocol (see comments below)
-    std::atomic<sched::thread *> handoff;
-    std::atomic<unsigned int> token; // TODO: are 4 bytes enough for the sensible duration of cpu pauses?
+    unsigned int depth;
 public:
-    mutex() : count(0), depth(0), owner(nullptr), handoff(nullptr), token(0) { }
+    mutex() : count(0), handoff(0), sequence(0), owner(nullptr), depth(0) { }
     ~mutex() { assert(count==0); }
     void lock()
@@ -47,88 +79,32 @@ public:
         // a recursive mutex so it's possible the lock holder is us - in which
         // case we just increment depth instead of waiting.
         if (owner.load() == current) {
-            --count; // undo the increment of count (but still leaving it > 0)
+            --count;
             ++depth;
             return;
         }
-        // If we're here still here the lock is owned by a different thread,
-        // (or we're racing with another lock() which is likely to take the
-        // lock). Put this thread in a lock-free waiting queue, so it will
-        // eventually be woken when another thread releases the lock.
-
-        // NOTE: the linked-list item "waiter" is allocated on the stack and
-        // then used on the wait queue's linked list, but we only exit this
-        // function (making waiter invalid) when we know that waiter was
-        // removed from the wait list.
+        // If we're still here, the lock is owned by a different thread.
+        // Put this thread in a waiting queue, so it will eventually be woken
+        // when another thread releases the lock.
+        sched::wait_guard wait_guard(current); // mark the thread "waiting" now.
         linked_item<sched::thread *> waiter(current);
-
-        // Set current thread to "waiting" state before putting it in the wait
-        // queue, so that wake() is allowed on it. After doing prepare_wait()
-        // do not return from this function without calling current->wait()!
-        current->prepare_wait();
         waitqueue.push(&waiter);
-        sched::thread *old_handoff = handoff.load();
-        // In the "Responsibility Hand-Off protocol" (G&P, 2007) the unlock()
-        // offers a lock() the responsibility of of the mutex (namely, to wake
-        // some waiter up), in three steps (listed in increasing time):
-        //  1. unlock() offers the lock by setting "handoff" to some
-        //     non-null token.
-        //  2. a lock() verifies that it has anybody to wake (the queue is
-        //     not empty). Often it will be itself.
-        //  3. the lock() now atomically verifies that the same handoff is
-        //     still in progress as it was during the queue check), and if
-        //     it is takes the handoff (makes it null).
+        // The "Responsibility Hand-Off" protocol, where a lock() picks up
+        // from a concurrent unlock() the responsibility of waking somebody:
+        auto old_handoff = handoff.load();
         if (old_handoff) {
-            // TODO: clean up or remove this explanation
-            // "Handoff" protocol (inspired by Gidenstam & Papatriantafilou, 2007):
-            // A concurrent unlock() wants to wake up a thread waiting on this
-            // mutex, but couldn't find us because it checked the wait queue before
-            // we pushed ourselves. The unlocker can't busy-wait for a thread to
-            // appear on the wait queue as this can cause it to lock up if a lock()
-            // attempt is paused. Instead it flags a "handoff", and one of the
-            // waiting threads may pick up the handoff (by atomically zeroing
-            // it) and wake somebody (itself or some other waiter that turned up).
-            //
-            // They key point is that after signaling the handoff, the unlocker
-            // does not need to busy-wait - if it knows (from "count") there is
-            // a concurrent locker, but sees an empty waitqueue, then it knows
-            // the locker is before the waitqueue.push() call above, so it will
-            // surely see the handoff flag later.
-            if (!waitqueue.isempty()){
-                // Typically waitqueue isn't empty because this thread is on
-                // it, but it is also possible that some past unlock() already
-                // found this thread on the wait list and woke it.
-                if (std::atomic_compare_exchange_strong(&handoff, &old_handoff, (sched::thread *)nullptr)) {
-                    // We picked up the handoff, so now it is our duty to wake
-                    // some waiting thread (could be this thread, or another
-                    // thread). At this point (count>0, handoff=null) so the
-                    // mutex is considered locked by us so if this cpu pauses now
-                    // nothing bad happens except this thread keeping the lock.
-                    //
-                    // Above, we checked waitqueue was not empty, and
-                    // since then nobody else had the opportunity to pop() it
-                    // because:
-                    // 1. unlock() only calls pop() outside the handoff attempt
-                    //    (while handoff=null) but the CAS above verified that
-                    //    the handoff attempt did not complete (if it did, the
-                    //    token would change).
-                    // 2. another lock() only calls pop() after taking the
-                    //    handoff, and here we took the handoff (and there
-                    //    cannot be another unlock, and another handoff,
-                    //    until we wake some lock() attempt)
-                    // The fact nobody else pop()ed is important for two
-                    // reasons - 1. we know the queue is still not empty,
-                    // and 2. the following pop() cannot be concurrent
-                    // with another pop() (which our queue implementation
-                    // does not support).
-                    sched::thread *other_thread = waitqueue.pop();
-                    assert(other_thread);
+            if (!waitqueue.isempty()){
+                if (std::atomic_compare_exchange_strong(&handoff, &old_handoff, 0U)) {
+                    // Note the explanation above about no concurrent pop()s also
+                    // explains why we can be sure waitqueue is still not empty.
+                    auto thread = waitqueue.pop();
+                    assert(thread);
                     assert(depth==0);
                     depth = 1;
-                    owner.store(other_thread);
-                    other_thread->wake();
+                    owner.store(thread);
+                    thread->wake();
                 }
             }
         }
@@ -141,125 +117,89 @@
             current->wait(); // reschedule
             // TODO: can spurious wakes happen? Maybe this loop isn't needed.
         }
-        // TODO: do we need to call current->stop_wait() or at least preempt_enabled??
     }
     bool try_lock(){
         sched::thread *current = sched::thread::current();
-
         int zero = 0;
         if (std::atomic_compare_exchange_strong(&count, &zero, 1)) {
-            // Uncontended case (no other thread is holding the lock, and no
-            // concurrent lock() attempts). We got the lock.
+            // Uncontended case. We got the lock.
             owner.store(current);
             assert(depth==0);
             depth = 1;
             return true;
         }
-        // If we're here the mutex was already locked, but we're implementing
-        // a recursive mutex so it's possible the lock holder is us - in which
-        // case we don't need to increment depth instead of waiting.
+        // We're implementing a recursive mutex - lock may still succeed if
+        // this thread is the one holding it.
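+        // (Illustration, not part of the original patch: with a recursive
+        // mutex, the holder may nest acquisitions, e.g.
+        //     m.lock(); m.lock();     // same thread: depth becomes 2
+        //     m.unlock(); m.unlock(); // both calls needed to release
+        // and a try_lock() by the holder likewise succeeds, bumping depth.)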
         if (owner.load() == current) {
-            --count; // undo the increment of count (but still leaving it > 0)
             ++depth;
             return true;
         }
-        // If we're here still here the lock is owned by a different thread,
-        // or we're racing with another lock() which is likely to take the
-        // lock. We're almost ready to give up (return false), but the last
-        // chance is if we can accept a handoff - and if we do, we got the lock.
-        sched::thread *old_handoff = handoff.load();
-        if(!old_handoff)
-            return false;
-        if (std::atomic_compare_exchange_strong(&handoff, &old_handoff, (sched::thread *)nullptr)) {
-            // we got the lock!
+        // The lock is taken, and we're almost ready to give up (return
+        // false), but the last chance is if we can accept a handoff - and if
+        // we do, we got the lock.
+        auto old_handoff = handoff.load();
+        if (old_handoff &&
+            std::atomic_compare_exchange_strong(&handoff, &old_handoff, 0U)) {
             ++count;
             owner.store(current);
            assert(depth==0);
             depth = 1;
             return true;
         }
-
+        return false;
     }
+
     void unlock(){
         sched::thread *current = sched::thread::current();
-        // TODO: decide what to do (exception?) if unlock() is called when we're not the
-        // owner.
-        // It is fine to check if owner is this thread without race
-        // conditions, because no other thread can make it become this thread
-        // (or make it stop being this thread)
+
+        // Some special treatment for recursive mutex:
         if (owner.load() != current) {
+            // TODO: Anything more sensible to do? exception?
+            debug("lockfree::mutex.unlock() of non-locked mutex");
             return;
         }
         assert(depth!=0);
-        if (depth > 1) {
-            // It's fine to --depth without locking or atomicity, as only
-            // this thread can possibly change the depth.
-            --depth;
-            return;
-        }
-
-
-        // If we're here, depth=1 and we need to release the lock.
+        --depth;
+        if (depth > 0)
+            return; // recursive mutex still locked.
         owner.store(nullptr);
-        depth = 0;
-        if (std::atomic_fetch_add(&count, -1) == 1) {
-            // No concurrent lock() until we got to count=0, so we can
-            // return now. This is the easy case :-)
+
+        // If there is no waiting lock(), we're done. This is the easy case :-)
+        if (std::atomic_fetch_add(&count, -1) == 1)
             return;
-        }
-        // If we're still here, we noticed that there at least one concurrent
-        // lock(). It is possible it put itself on the waitqueue, and if so we
-        // need to wake it. It is also possible it didn't yet manage to put
-        // itself on the waitqueue in which case we'll tell it or another
-        // thread to take the mutex, using the "handoff" protocol.
-        // Each iteration of the handoff protocol is an attempt to communicate
-        // with an ongoing lock() to get it to take over the mutex. If more
-        // than one iteration is needed, we know that at least
+        // Otherwise there is at least one concurrent lock(). Awaken one if
+        // it's waiting on the waitqueue; otherwise use the RHO protocol to
+        // make the lock() responsible for waking someone up.
        // TODO: it's not completely clear to me why more than two
        // iterations can ever be needed. If the loop continues it means
        // another lock() queued itself - but if it did, wouldn't the next
        // iteration just pop and return?
        while(true) {
-            // See discussion in lock() on why we can never have more than
-            // one concurrent pop() being called (as our waitqueue
-            // implementation assumes). It's important to also note that
-            // we can't have two pop() of two unlock() running at the
-            // same time (e.g., consider that while this code is stuck
-            // here, a lock() and unlock() will succeed on another processor).
-            // The reason is that we have a lock() running concurrently
-            // and it will not finish until being woken, and we won't do
-            // that before doing pop() below
-            sched::thread *other_thread = waitqueue.pop();
-            if (other_thread) {
-                // The thread we'll wake up will expect the mutex's owner to be
-                // set to it.
+            auto thread = waitqueue.pop();
+            if (thread) {
                 depth = 1;
-                owner.store(other_thread);
-                other_thread->wake();
+                owner.store(thread);
+                thread->wake();
                 return;
             }
             // Some concurrent lock() is in progress (we know this because of
             // count) but it hasn't yet put itself on the wait queue.
-            sched::thread *ourhandoff = token++;
-            handoff = ourhandoff;
-            if (waitqueue.isempty()) {
-                // If the queue is empty, we know one concurrent lock()
-                // is in it code before adding itself to the queue, so it
-                // will later see the handoff flag we set here
-                // (see comment in lock()).
+            if (++sequence == 0U) ++sequence; // pick a number, but not 0
+            auto ourhandoff = sequence;
+            handoff.store(ourhandoff);
+            // If the queue is empty, the concurrent lock() has not yet added
+            // itself, and will therefore definitely find our handoff later.
+            if (waitqueue.isempty())
                 return;
-            }
-            // Something already appeared on the queue, let's try to take
-            // the handoff ourselves.
-            if (!std::atomic_compare_exchange_strong(&handoff, &ourhandoff, (sched::thread *)nullptr)) {
-                // somebody else already took the handoff. That's fine - they will
-                // be resonposible for the mutex now.
+            // A thread already appeared on the queue; let's try to take the
+            // handoff ourselves and awaken it. If somebody else already took
+            // the handoff, great, we're done - they are responsible now.
+            if (!std::atomic_compare_exchange_strong(&handoff, &ourhandoff, 0U))
                 return;
-            }
         }
     }
 };
@@ -272,4 +212,5 @@ auto with_lock(Lock& lock, Func func) -> decltype(func())
     return func();
 }
 
+}
 #endif
-- 
GitLab
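For context, a minimal sketch of how the with_lock helper at the end of the
header is meant to be used. lockfree::mutex itself depends on OSv's scheduler
(sched.hh), so the sketch substitutes std::mutex - any type with the same
lock()/unlock() shape fits - and the lock_guard body of with_lock below is an
assumption, since the patch shows only the helper's tail, not its body.

    #include <iostream>
    #include <mutex>
    #include <thread>

    // Assumed shape of the helper: run func() with the lock held and
    // forward its return value.
    template <class Lock, class Func>
    auto with_lock(Lock& lock, Func func) -> decltype(func())
    {
        std::lock_guard<Lock> guard(lock);
        return func();
    }

    int main()
    {
        std::mutex m; // stand-in for lockfree::mutex in this sketch
        int counter = 0;
        auto work = [&] {
            for (int i = 0; i < 100000; i++)
                with_lock(m, [&] { return ++counter; });
        };
        std::thread a(work), b(work);
        a.join();
        b.join();
        std::cout << counter << std::endl; // always 200000 under correct locking
        return 0;
    }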