Skip to content
Snippets Groups Projects
Commit a621c261 authored by Nadav Har'El's avatar Nadav Har'El
Browse files

Started writing tests for the lock-free queue, and fixed a lot of

embarrassing errors ;-)
parent 2b8e9517
No related branches found
No related tags found
No related merge requests found
......@@ -12,24 +12,30 @@
using namespace std;
namespace lockfree {
// TODO: can we use boost::intrusive::list instead of our own?
template <class T>
class linked_item {
public:
T value;
linked_item<T> *next;
linked_item<T>(T val){
value = val;
next = nullptr;
}
};
linked_item<T>() : next(nullptr) { }
linked_item<T>(T val) : value(val), next(nullptr) { }
};
template <class T>
class queue_mpsc {
public:
typedef linked_item<T> LT;
private:
atomic<LT*> pushlist;
LT* poplist;
// currently, pop() when the queue is empty returns this "nothing".
// perhaps we should consider a different error mechanism.
T nothing;
public:
queue_mpsc<T>(T nothing) : pushlist(nullptr), poplist(nullptr), nothing(nothing) { }
inline void push(LT* item)
{
// We set up item to be the new head of the pushlist, pointing to the
......@@ -38,11 +44,11 @@ class queue_mpsc {
// Therefore we can only replace the head of the pushlist with a CAS,
// atomically checking that the head is still what we set in
// waiter->next, and changing the head.
// Note that while we have a loop here, it is lockfree - if we or
// the competing pusher are paused, the other one can make progress.
// Note that while we have a loop here, it is lockfree - if one
// competing pusher is paused, the other one can make progress.
do {
item->next = pushlist.load();
} while (std::atomic_compare_exchange_strong(pushlist, item->next, item));
item->next = pushlist;
} while (!std::atomic_compare_exchange_weak(&pushlist, &item->next, item));
}
inline T pop(void)
......@@ -67,9 +73,13 @@ class queue_mpsc {
while(r){
if(!r->next)
return r->value;
LT *next = r->next;
r->next = poplist;
poplist = r;
r = next;
}
// if we're still here, the queue is empty. return "nothing"
return nothing;
}
}
......@@ -79,3 +89,4 @@ class queue_mpsc {
}
};
}
......@@ -6,6 +6,7 @@
#include "tst-eventlist.hh"
#include "tst-rwlock.hh"
#include "tst-bsd-synch.hh"
#include "tst-queue-mpsc.hh"
using namespace unit_tests;
......@@ -17,6 +18,7 @@ void tests::execute_tests() {
test_eventlist evlist;
test_rwlock rwlock;
test_synch synch;
test_queue_mpsc q1;
instance().register_test(&threads);
instance().register_test(&malloc);
......@@ -25,6 +27,7 @@ void tests::execute_tests() {
instance().register_test(&evlist);
instance().register_test(&rwlock);
instance().register_test(&synch);
instance().register_test(&q1);
instance().run();
}
#ifndef __TST_QMPSC__
#define __TST_QMPSC__
#include "tst-hub.hh"
#include "sched.hh"
#include "debug.hh"
#include "lockfree/queue-mpsc.hh"
class test_queue_mpsc : public unit_tests::vtest {
public:
struct test_threads_data {
sched::thread* main;
sched::thread* t1;
bool t1ok;
sched::thread* t2;
bool t2ok;
int test_ctr;
};
void test_thread_1(test_threads_data& tt)
{
while (tt.test_ctr < 1000) {
sched::thread::wait_until([&] { return (tt.test_ctr % 2) == 0; });
++tt.test_ctr;
if (tt.t2ok) {
tt.t2->wake();
}
}
tt.t1ok = false;
tt.main->wake();
}
void test_thread_2(test_threads_data& tt)
{
while (tt.test_ctr < 1000) {
sched::thread::wait_until([&] { return (tt.test_ctr % 2) == 1; });
++tt.test_ctr;
if (tt.t1ok) {
tt.t1->wake();
}
}
tt.t2ok = false;
tt.main->wake();
}
void run()
{
debug("Running lockfree multi-producer single-consumer queue tests");
// Test trivial single-thread, queuing.
debug("test1");
lockfree::queue_mpsc<int> q(-1);
assert(q.pop()==-1);
assert(q.isempty());
debug("test2");
lockfree::linked_item<int> item(7);
q.push(&item);
assert(!q.isempty());
assert(q.pop()==7);
assert(q.pop()==-1);
assert(q.isempty());
debug("test3");
int len=10;
auto items = new lockfree::linked_item<int>[len];
for(int i=0; i<len; i++){
items[i].value = i*i;
q.push(&items[i]);
}
for(int i=0; i<len; i++){
assert(!q.isempty());
assert(q.pop()==i*i);
}
assert(q.pop()==-1);
assert(q.isempty());
delete[] items;
test_threads_data tt;
tt.main = sched::thread::current();
tt.t1ok = tt.t2ok = true;
tt.t1 = new sched::thread([&] { test_thread_1(tt); });
tt.t2 = new sched::thread([&] { test_thread_2(tt); });
tt.test_ctr = 0;
tt.t1->start();
tt.t2->start();
sched::thread::wait_until([&] { return tt.test_ctr >= 1000; });
tt.t1->join();
tt.t2->join();
delete tt.t1;
delete tt.t2;
debug("lockfree MPSC queue tests succeeded");
}
};
#endif
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment