From fb8edbb610f986d4a7a5616a8efc06f3e279935a Mon Sep 17 00:00:00 2001
From: Sam Kumar <samkumar99@gmail.com>
Date: Sun, 7 Jan 2018 20:39:52 -0800
Subject: [PATCH] core: condition variable implementation

---
 core/cond.c                      |  87 +++++++++++++
 core/include/cond.h              | 214 +++++++++++++++++++++++++++++++
 core/include/thread.h            |   5 +-
 tests/cond_order/Makefile        |  11 ++
 tests/cond_order/README.md       |  35 +++++
 tests/cond_order/main.c          |  97 ++++++++++++++
 tests/cond_order/tests/01-run.py |  44 +++++++
 7 files changed, 491 insertions(+), 2 deletions(-)
 create mode 100644 core/cond.c
 create mode 100644 core/include/cond.h
 create mode 100644 tests/cond_order/Makefile
 create mode 100644 tests/cond_order/README.md
 create mode 100644 tests/cond_order/main.c
 create mode 100755 tests/cond_order/tests/01-run.py

diff --git a/core/cond.c b/core/cond.c
new file mode 100644
index 0000000000..9a3a41ceeb
--- /dev/null
+++ b/core/cond.c
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2016 Sam Kumar <samkumar@berkeley.edu>
+ *               2016 University of California, Berkeley
+ *
+ * This file is subject to the terms and conditions of the GNU Lesser
+ * General Public License v2.1. See the file LICENSE in the top level
+ * directory for more details.
+ */
+
+/**
+ * @ingroup     core_sync
+ * @{
+ *
+ * @file
+ * @brief       Kernel condition variable implementation
+ *
+ * @author      Sam Kumar <samkumar@berkeley.edu>
+ *
+ * @}
+ */
+
+#include "cond.h"
+#include "irq.h"
+#include "mutex.h"
+#include "thread.h"
+
+#define ENABLE_DEBUG    (0)
+#include "debug.h"
+
+void cond_init(cond_t *cond)
+{
+    cond->queue.next = NULL;
+}
+
+void cond_wait(cond_t *cond, mutex_t *mutex)
+{
+    unsigned irqstate = irq_disable();
+    thread_t *me = (thread_t *)sched_active_thread;
+
+    mutex_unlock(mutex);
+    sched_set_status(me, STATUS_COND_BLOCKED);
+    thread_add_to_list(&cond->queue, me);
+    irq_restore(irqstate);
+    thread_yield_higher();
+
+    /*
+     * Once we reach this point, the condition variable was signalled,
+     * and we are free to continue.
+     */
+    mutex_lock(mutex);
+}
+
+static void _cond_signal(cond_t *cond, bool broadcast)
+{
+    unsigned irqstate = irq_disable();
+    list_node_t *next;
+
+    uint16_t min_prio = THREAD_PRIORITY_MIN + 1;
+
+    while ((next = list_remove_head(&cond->queue)) != NULL) {
+        thread_t *process = container_of((clist_node_t *)next, thread_t, rq_entry);
+        sched_set_status(process, STATUS_PENDING);
+        uint16_t process_priority = process->priority;
+        if (process_priority < min_prio) {
+            min_prio = process_priority;
+        }
+
+        if (!broadcast) {
+            break;
+        }
+    }
+
+    irq_restore(irqstate);
+    if (min_prio <= THREAD_PRIORITY_MIN) {
+        sched_switch(min_prio);
+    }
+}
+
+void cond_signal(cond_t *cond)
+{
+    _cond_signal(cond, false);
+}
+
+void cond_broadcast(cond_t *cond)
+{
+    _cond_signal(cond, true);
+}
diff --git a/core/include/cond.h b/core/include/cond.h
new file mode 100644
index 0000000000..38c14677a2
--- /dev/null
+++ b/core/include/cond.h
@@ -0,0 +1,214 @@
+/*
+ * This file is subject to the terms and conditions of the GNU Lesser
+ * General Public License v2.1. See the file LICENSE in the top level
+ * directory for more details.
+ */
+
+/**
+ * @brief       Condition variable for thread synchronization
+ * @ingroup     core, core_sync
+ * @{
+ *
+ * @file
+ * @brief       RIOT synchronization API
+ *
+ * This file contains a condition variable with Mesa-style semantics.
+ *
+ * Condition variable solve the following problem. Suppose that a thread should
+ * sleep until a certain condition comes true. Condition variables provide a
+ * primitive whereby a thread can go to sleep by calling cond_wait(). Then,
+ * when the condition comes true in a thread or interrupt context, cond_signal()
+ * can be called, to wake up the thread.
+ *
+ * "Mesa-style semantics" means that, when cond_signal() is called, the
+ * sleeping thread becomes runnable, but may not be scheduled immediately. In
+ * contrast, "Hoare-style semantics" means that when cond_signal() is called,
+ * the sleeping thread is awakened and immediately scheduled. The condition
+ * variable in this file implements Mesa-style semantics, as is used by other
+ * standard implementations, such as pthreads.
+ *
+ * To avoid races, condition variables are used with mutexes. When a thread is
+ * put to sleep with cond_wait, it atomically unlocks the provided mutex and
+ * then goes to sleep. When it is awakened with cond_signal, it reacquires the
+ * mutex.
+ *
+ * As a rule of thumb, every condition variable should have a corresponding
+ * mutex, and that mutex should be held whenever performing any operation with
+ * the condition variable. There are exceptions to this rule, where it is
+ * appropriate to call cond_signal or cond_broadcast without the mutex held
+ * (for example, if you know that no thread will call cond_wait concurrently).
+ * It is safe to call cond_signal or cond_broadcast in interrupt context.
+ *
+ * However, the programmer should be aware of the following situation that
+ * could arise with Mesa-style condition variables: the condition may become
+ * true, making the sleeping thread runnable, but the condition may become
+ * false again before the thread is scheduled. To handle this case, the
+ * condition variable should be used in a while loop as follows:
+ *
+ * ```
+ * mutex_lock(&lock);
+ * while (condition_is_not_true) {
+ *     cond_wait(&cond, &lock);
+ * }
+ * // do work while condition is true.
+ * mutex_unlock(&lock);
+ * ```
+ *
+ * When used in this way, the thread checks, once it has has awakened, whether
+ * the condition is actually true, and goes to sleep again if it is not. This
+ * is the standard way to use Mesa-style condition variables.
+ *
+ * Example: Suppose we want to implement a bounded queue, such as a Unix-style
+ * pipe between two threads. When data is written to the pipe, it is appended
+ * to a queue, and the writing thread blocks if the queue is full. When data
+ * is read from the pipe, it is removed from the queue; if the queue is empty,
+ * the reading thread blocks until it is not empty. If the pipe is closed by
+ * the sender, waiting reading threads wake up.
+ *
+ * Here is a sketch of how to implement such a structure with condition
+ * variables. For simplicity, messages are single bytes. We assume a FIFO data
+ * structure queue_t. We assume it is unsafe to add to the queue if it is full,
+ * or remove from the queue if it is empty.
+ *
+ * ```
+ * typedef struct pipe {
+ *     queue_t queue;
+ *     cond_t read_cond;
+ *     cond_t write_cond;
+ *     mutex_t lock;
+ *     bool closed;
+ * } pipe_t;
+ *
+ * void pipe_init(pipe_t* pipe) {
+ *     queue_init(&pipe->queue);
+ *     cond_init(&pipe->read_cond);
+ *     cond_init(&pipe->write_cond);
+ *     mutex_init(&pipe->lock);
+ *     pipe->closed = false;
+ * }
+ *
+ * void pipe_write(pipe_t* pipe, char c) {
+ *     mutex_lock(&pipe->lock);
+ *     while (queue_length(&pipe->queue) == MAX_QUEUE_LENGTH && !pipe->closed) {
+ *         cond_wait(&pipe->write_cond, &pipe->lock);
+ *     }
+ *     if (pipe->closed) {
+ *         mutex_unlock(&pipe->lock);
+ *         return 0;
+ *     }
+ *     add_to_queue(&pipe->queue, c);
+ *     cond_signal(&pipe->read_cond);
+ *     mutex_unlock(&pipe->lock);
+ *     return 1;
+ * }
+ *
+ * void pipe_close(pipe_t* pipe) {
+ *     mutex_lock(&pipe->lock);
+ *     pipe->closed = true;
+ *     cond_broadcast(&pipe->read_cond);
+ *     cond_broadcast(&pipe->write_cond);
+ *     mutex_unlock(&pipe->lock);
+ * }
+ *
+ * int pipe_read(pipe_t* pipe, char* buf) {
+ *     mutex_lock(&pipe->lock);
+ *     while (queue_length(&pipe->queue) == 0 && !pipe->closed) {
+ *         cond_wait(&pipe->read_cond, &pipe->lock);
+ *     }
+ *     if (pipe->closed) {
+ *         mutex_unlock(&pipe->lock);
+ *         return 0;
+ *     }
+ *     *buf = remove_from_queue(&pipe->queue);
+ *     cond_signal(&pipe->write_cond);
+ *     mutex_unlock(&pipe->lock);
+ *     return 1;
+ * }
+ * ```
+ *
+ * Note that this could actually be written with a single condition variable.
+ * However, the example includes two for didactic reasons.
+ *
+ * @author      Sam Kumar <samkumar@berkeley.edu>
+ */
+
+#ifndef COND_H
+#define COND_H
+
+#include <stdbool.h>
+#include <stddef.h>
+
+#include "list.h"
+#include "mutex.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @brief Condition variable structure. Must never be modified by the user.
+ */
+typedef struct {
+    /**
+     * @brief   The process waiting queue of the condition variable.
+     *
+     * @internal
+     */
+    list_node_t queue;
+} cond_t;
+
+/**
+ * @brief   Static initializer for cond_t.
+ *
+ * @note This initializer is preferable to cond_init().
+ */
+#define COND_INIT { { NULL } }
+
+/**
+ * @brief Initializes a condition variable.
+ *
+ * @details For initialization of variables use COND_INIT instead.
+ *          Only use the function call for dynamically allocated condition
+ *          variables.
+ *
+ * @param[in] cond    Pre-allocated condition structure. Must not be NULL.
+ */
+void cond_init(cond_t *cond);
+
+/**
+ * @brief Waits on a condition.
+ *
+ * @param[in] cond          Condition variable to wait on.
+ * @param[in] mutex         Mutex object held by the current thread.
+ */
+void cond_wait(cond_t *cond, mutex_t *mutex);
+
+
+/**
+ * @brief Wakes up one thread waiting on the condition variable.
+ *
+ * @details The thread is marked as runnable and will only be scheduled later
+ * at the scheduler's whim, so the thread should re-check the condition and wait
+ * again if it is not fulfilled.
+ *
+ * @param[in] cond  Condition variable to signal.
+ */
+void cond_signal(cond_t *cond);
+
+/**
+ * @brief Wakes up all threads waiting on the condition variable.
+ *
+ * @details The threads are marked as runnable and will only be scheduled later
+ * at the scheduler's whim, so they should re-check the condition and wait again
+ * if it is not fulfilled.
+ *
+ * @param[in] cond  Condition variable to broadcast.
+ */
+void cond_broadcast(cond_t *cond);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* COND_H */
+/** @} */
diff --git a/core/include/thread.h b/core/include/thread.h
index 7e9294f4dd..dc623e2de5 100644
--- a/core/include/thread.h
+++ b/core/include/thread.h
@@ -154,6 +154,7 @@
 #define STATUS_FLAG_BLOCKED_ANY     6   /**< waiting for any flag from flag_mask*/
 #define STATUS_FLAG_BLOCKED_ALL     7   /**< waiting for all flags in flag_mask */
 #define STATUS_MBOX_BLOCKED         8   /**< waiting for get/put on mbox        */
+#define STATUS_COND_BLOCKED         9   /**< waiting for a condition variable   */
 /** @} */
 
 /**
@@ -162,8 +163,8 @@
  */
 #define STATUS_ON_RUNQUEUE      STATUS_RUNNING  /**< to check if on run queue:
                                                  `st >= STATUS_ON_RUNQUEUE`             */
-#define STATUS_RUNNING          9               /**< currently running                  */
-#define STATUS_PENDING         10               /**< waiting to be scheduled to run     */
+#define STATUS_RUNNING         10               /**< currently running                  */
+#define STATUS_PENDING         11               /**< waiting to be scheduled to run     */
 /** @} */
 
 /**
diff --git a/tests/cond_order/Makefile b/tests/cond_order/Makefile
new file mode 100644
index 0000000000..cb1d0679dc
--- /dev/null
+++ b/tests/cond_order/Makefile
@@ -0,0 +1,11 @@
+include ../Makefile.tests_common
+
+BOARD_INSUFFICIENT_MEMORY := nucleo32-f031 nucleo32-f042 nucleo32-l031 nucleo-f030 \
+                             nucleo-l053 stm32f0discovery arduino-duemilanove \
+							 arduino-uno nucleo-f030r8 nucleo-f031k6 nucleo-f042k6 \
+							 nucleo-l031k6 nucleo-l053r8
+
+include $(RIOTBASE)/Makefile.include
+
+test:
+	tests/01-run.py
diff --git a/tests/cond_order/README.md b/tests/cond_order/README.md
new file mode 100644
index 0000000000..227319c9c2
--- /dev/null
+++ b/tests/cond_order/README.md
@@ -0,0 +1,35 @@
+Expected result
+===============
+When successful, you should see 5 different threads printing their PID and
+priority. The thread with the lowest priority should be able to signaled first,
+followed by the other threads in the order of their priority (highest next). If
+the main thread holds the lock, however, none of the other threads should be
+able to make progress. The output should look like the following:
+
+```
+main(): This is RIOT! (Version: 2018.01-devel-1120-g811de-starbeam-feature-condition-variable)
+Condition variable order test
+Please refer to the README.md for more information
+
+T3 (prio 6): waiting on condition variable now
+T4 (prio 4): waiting on condition variable now
+T5 (prio 0): waiting on condition variable now
+T6 (prio 2): waiting on condition variable now
+T7 (prio 1): waiting on condition variable now
+First batch was signaled
+T5 (prio 0): condition variable was signaled now
+T7 (prio 1): condition variable was signaled now
+T6 (prio 2): condition variable was signaled now
+First batch has woken up
+Second batch was signaled
+T4 (prio 4): condition variable was signaled now
+T3 (prio 6): condition variable was signaled now
+Second batch has woken up
+
+Test END, check the order of priorities above.
+```
+
+Background
+==========
+This test application stresses a condition variable with a number of threads
+waiting on it. The threads are signaled (awakened) in two batches.
diff --git a/tests/cond_order/main.c b/tests/cond_order/main.c
new file mode 100644
index 0000000000..249c776af9
--- /dev/null
+++ b/tests/cond_order/main.c
@@ -0,0 +1,97 @@
+/*
+ * Copyright (C) 2018 University of California, Berkeley
+ * Copyright (C) 2016 Freie Universität Berlin
+ *
+ * This file is subject to the terms and conditions of the GNU Lesser
+ * General Public License v2.1. See the file LICENSE in the top level
+ * directory for more details.
+ */
+
+/**
+ * @ingroup     tests
+ * @{
+ *
+ * @file
+ * @brief       Test application for testing mutexes
+ *
+ * @author      Sam Kumar <samkumar@cs.berkeley.edu>,
+ *              Hauke Petersen <hauke.petersen@fu-berlin.de>
+ * @}
+ */
+
+#include <stdio.h>
+
+#include "cond.h"
+#include "mutex.h"
+#include "thread.h"
+
+#define THREAD_NUMOF            (5U)
+#define THREAD_FIRSTGROUP_NUMOF (3U)
+
+extern volatile thread_t *sched_active_thread;
+
+static char stacks[THREAD_NUMOF][THREAD_STACKSIZE_MAIN];
+
+static const char prios[THREAD_NUMOF] = {THREAD_PRIORITY_MAIN - 1, 4, 0, 2, 1};
+
+static mutex_t testlock;
+static cond_t testcond;
+
+static void *lockme(void *arg)
+{
+    (void)arg;
+    volatile thread_t *t = sched_active_thread;
+
+    mutex_lock(&testlock);
+    printf("T%i (prio %i): waiting on condition variable now\n",
+           (int)t->pid, (int)t->priority);
+    cond_wait(&testcond, &testlock);
+    printf("T%i (prio %i): condition variable was signaled now\n",
+           (int)t->pid, (int)t->priority);
+    mutex_unlock(&testlock);
+
+    thread_yield();
+
+    mutex_unlock(&testlock);
+
+    return NULL;
+}
+
+int main(void)
+{
+    puts("Condition variable order test");
+    puts("Please refer to the README.md for more information\n");
+
+    mutex_init(&testlock);
+    cond_init(&testcond);
+
+    /* create threads */
+    for (unsigned i = 0; i < THREAD_NUMOF; i++) {
+        thread_create(stacks[i], sizeof(stacks[i]), prios[i], 0,
+                      lockme, NULL, "t");
+    }
+    /* allow threads to lock the mutex and wait on the condition variable */
+
+    /* signal the first few threads, in a group */
+    mutex_lock(&testlock);
+    for (unsigned i = 0; i < THREAD_FIRSTGROUP_NUMOF; i++) {
+        cond_signal(&testcond);
+    }
+    printf("First batch was signaled\n");
+    mutex_unlock(&testlock);
+    /* allow the first THREAD_FIRSTGROUP_NUMOF threads to wake up */
+
+    printf("First batch has woken up\n");
+
+    mutex_lock(&testlock);
+    cond_broadcast(&testcond);
+    printf("Second batch was signaled\n");
+    mutex_unlock(&testlock);
+    /* allow the remaining threads to wake up */
+    printf("Second batch has woken up\n");
+
+    mutex_lock(&testlock);
+    puts("\nTest END, check the order of priorities above.");
+
+    return 0;
+}
diff --git a/tests/cond_order/tests/01-run.py b/tests/cond_order/tests/01-run.py
new file mode 100755
index 0000000000..106a9cf9f2
--- /dev/null
+++ b/tests/cond_order/tests/01-run.py
@@ -0,0 +1,44 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2016 Kaspar Schleiser <kaspar@schleiser.de>
+# Copyright (C) 2016 Oliver Hahm <oliver.hahm@inria.fr>
+#
+# This file is subject to the terms and conditions of the GNU Lesser
+# General Public License v2.1. See the file LICENSE in the top level
+# directory for more details.
+
+import os
+import sys
+
+thread_prio = {
+        3:  6,
+        4:  4,
+        5:  0,
+        6:  2,
+        7:  1
+        }
+first_group_size = 3
+
+
+def testfunc(child):
+    for k in thread_prio.keys():
+        child.expect(u"T%i \(prio %i\): waiting on condition variable now" % (k, thread_prio[k]))
+
+    count = 0
+    last = -1
+    child.expect(u"First batch was signaled")
+    for _ in range(len(thread_prio)):
+        child.expect(u"T\d+ \(prio (\d+)\): condition variable was signaled now")
+        assert(int(child.match.group(1)) > last)
+        last = int(child.match.group(1))
+        count += 1
+        if count == 3:
+            child.expect(u"First batch has woken up")
+            child.expect(u"Second batch was signaled")
+    child.expect(u"Second batch has woken up")
+
+
+if __name__ == "__main__":
+    sys.path.append(os.path.join(os.environ['RIOTBASE'], 'dist/tools/testrunner'))
+    from testrunner import run
+    sys.exit(run(testfunc))
-- 
GitLab