From 3a6f95008df177f7cc78a32e9fe6e4cc558c1af1 Mon Sep 17 00:00:00 2001
From: Kaspar Schleiser <kaspar@schleiser.de>
Date: Wed, 30 Dec 2015 15:39:43 +0100
Subject: [PATCH] core: mbox: introduce thread decoupled message queues

---
 Makefile.pseudomodules |   1 +
 core/include/mbox.h    | 163 +++++++++++++++++++++++++++++++++++++++++
 core/include/thread.h  |  11 ++-
 core/mbox.c            | 126 +++++++++++++++++++++++++++++++
 4 files changed, 297 insertions(+), 4 deletions(-)
 create mode 100644 core/include/mbox.h
 create mode 100644 core/mbox.c

diff --git a/Makefile.pseudomodules b/Makefile.pseudomodules
index 702e233adc..7e0f5189ef 100644
--- a/Makefile.pseudomodules
+++ b/Makefile.pseudomodules
@@ -4,6 +4,7 @@ PSEUDOMODULES += conn_ip
 PSEUDOMODULES += conn_tcp
 PSEUDOMODULES += conn_udp
 PSEUDOMODULES += core_msg
+PSEUDOMODULES += core_mbox
 PSEUDOMODULES += core_thread_flags
 PSEUDOMODULES += emb6_router
 PSEUDOMODULES += gnrc_ipv6_default
diff --git a/core/include/mbox.h b/core/include/mbox.h
new file mode 100644
index 0000000000..4d5e9c5d0b
--- /dev/null
+++ b/core/include/mbox.h
@@ -0,0 +1,163 @@
+/*
+ * Copyright (C) 2016 Kaspar Schleiser <kaspar@schleiser.de>
+ *
+ * This file is subject to the terms and conditions of the GNU Lesser
+ * General Public License v2.1. See the file LICENSE in the top level
+ * directory for more details.
+ */
+
+/**
+ * @defgroup    core_mbox Mailboxes
+ * @ingroup     core
+ * @brief       Mailbox implementation
+ *
+ * @{
+ *
+ * @file
+ * @brief       Mailbox API
+ *
+ * @author      Kaspar Schleiser <kaspar@schleiser.de>
+ */
+
+#ifndef MBOX_H
+#define MBOX_H
+
+#include "list.h"
+#include "cib.h"
+#include "msg.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/** Static initializer for mbox objects */
+#define MBOX_INIT(queue, queue_size) {{0}, {0}, CIB_INIT(queue_size), queue}
+
+/**
+ * @brief Mailbox struct definition
+ */
+typedef struct {
+    list_node_t readers;    /**< list of threads waiting for message    */
+    list_node_t writers;    /**< list of threads waiting to send        */
+    cib_t cib;              /**< cib for msg array                      */
+    msg_t *msg_array;       /**< ptr to array of msg queue              */
+} mbox_t;
+
+enum {
+    NON_BLOCKING = 0,       /**< non-blocking mode */
+    BLOCKING,               /**< blocking mode */
+};
+
+/**
+ * @brief Initialize mbox object
+ *
+ * @note The message queue size must be a power of two!
+ *
+ * @param[in]   mbox        ptr to mailbox to initialize
+ * @param[in]   queue       array of msg_t used as queue
+ * @param[in]   queue_size  number of msg_t objects in queue
+ */
+static inline void mbox_init(mbox_t *mbox, msg_t *queue, unsigned int queue_size)
+{
+    mbox_t m = MBOX_INIT(queue, queue_size);
+    *mbox = m;
+}
+
+/**
+ * @brief Add message to mailbox
+ *
+ * If the mailbox is full, this fuction will return right away.
+ *
+ * @internal
+ *
+ * @param[in] mbox      ptr to mailbox to operate on
+ * @param[in] msg       ptr to message that will be copied into mailbox
+ * @param[in] blocking  block if 1, don't block if 0
+ *
+ * @return  1   if msg could be delivered
+ * @return  0   otherwise
+ */
+int _mbox_put(mbox_t *mbox, msg_t *msg, int blocking);
+
+/**
+ * @brief Get message from mailbox
+ *
+ * If the mailbox is empty, this fuction will return right away.
+ *
+ * @internal
+ *
+ * @param[in] mbox  ptr to mailbox to operate on
+ * @param[in] msg   ptr to storage for retrieved message
+ * @param[in] blocking  block if 1, don't block if 0
+ *
+ * @return  1   if msg could be retrieved
+ * @return  0   otherwise
+ */
+int _mbox_get(mbox_t *mbox, msg_t *msg, int blocking);
+
+/**
+ * @brief Add message to mailbox
+ *
+ * If the mailbox is full, this fuction will block until space becomes
+ * available.
+ *
+ * @param[in] mbox  ptr to mailbox to operate on
+ * @param[in] msg   ptr to message that will be copied into mailbox
+ */
+static inline void mbox_put(mbox_t *mbox, msg_t *msg)
+{
+    _mbox_put(mbox, msg, BLOCKING);
+}
+
+/**
+ * @brief Add message to mailbox
+ *
+ * If the mailbox is full, this fuction will return right away.
+ *
+ * @param[in] mbox  ptr to mailbox to operate on
+ * @param[in] msg   ptr to message that will be copied into mailbox
+ *
+ * @return  1   if msg could be delivered
+ * @return  0   otherwise
+ */
+static inline int mbox_try_put(mbox_t *mbox, msg_t *msg)
+{
+    return _mbox_put(mbox, msg, NON_BLOCKING);
+}
+
+/**
+ * @brief Get message from mailbox
+ *
+ * If the mailbox is empty, this fuction will block until a message becomes
+ * available.
+ *
+ * @param[in] mbox  ptr to mailbox to operate on
+ * @param[in] msg   ptr to storage for retrieved message
+ */
+static inline void mbox_get(mbox_t *mbox, msg_t *msg)
+{
+    _mbox_get(mbox, msg, BLOCKING);
+}
+
+/**
+ * @brief Get message from mailbox
+ *
+ * If the mailbox is empty, this fuction will return right away.
+ *
+ * @param[in] mbox  ptr to mailbox to operate on
+ * @param[in] msg   ptr to storage for retrieved message
+ *
+ * @return  1   if msg could be retrieved
+ * @return  0   otherwise
+ */
+static inline int mbox_try_get(mbox_t *mbox, msg_t *msg)
+{
+    return _mbox_get(mbox, msg, NON_BLOCKING);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+/** @} */
+#endif /* MBOX_H */
diff --git a/core/include/thread.h b/core/include/thread.h
index 88828188b0..117215360d 100644
--- a/core/include/thread.h
+++ b/core/include/thread.h
@@ -56,6 +56,7 @@
 #define STATUS_REPLY_BLOCKED        5   /**< waiting for a message response     */
 #define STATUS_FLAG_BLOCKED_ANY     6   /**< waiting for any flag from flag_mask*/
 #define STATUS_FLAG_BLOCKED_ALL     7   /**< waiting for all flags in flag_mask */
+#define STATUS_MBOX_BLOCKED         8   /**< waiting for get/put on mbox        */
 /** @} */
 
 /**
@@ -63,8 +64,8 @@
  * @{*/
 #define STATUS_ON_RUNQUEUE      STATUS_RUNNING  /**< to check if on run queue:
                                                  `st >= STATUS_ON_RUNQUEUE`             */
-#define STATUS_RUNNING          8               /**< currently running                  */
-#define STATUS_PENDING          9               /**< waiting to be scheduled to run     */
+#define STATUS_RUNNING          9               /**< currently running                  */
+#define STATUS_PENDING         10               /**< waiting to be scheduled to run     */
 /** @} */
 /** @} */
 
@@ -84,8 +85,10 @@ struct _thread {
 
     clist_node_t rq_entry;          /**< run queue entry                */
 
-#if defined(MODULE_CORE_MSG) || defined(MODULE_CORE_THREAD_FLAGS)
-    void *wait_data;                /**< used by msg and thread flags   */
+#if defined(MODULE_CORE_MSG) || defined(MODULE_CORE_THREAD_FLAGS) \
+    || defined(MODULE_CORE_MBOX)
+    void *wait_data;                /**< used by msg, mbox and thread
+                                         flags                          */
 #endif
 #if defined(MODULE_CORE_MSG)
     list_node_t msg_waiters;        /**< threads waiting on message     */
diff --git a/core/mbox.c b/core/mbox.c
new file mode 100644
index 0000000000..7c14f35147
--- /dev/null
+++ b/core/mbox.c
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2016 Kaspar Schleiser <kaspar@schleiser.de>
+ *
+ * This file is subject to the terms and conditions of the GNU Lesser
+ * General Public License v2.1. See the file LICENSE in the top level
+ * directory for more details.
+ */
+
+/**
+ * @ingroup     core_mbox
+ * @{
+ *
+ * @file
+ * @brief       mailbox implementation
+ *
+ * @author      Kaspar Schleiser <kaspar@schleiser.de>
+ *
+ * @}
+ */
+
+#include <string.h>
+
+#include "mbox.h"
+#include "irq.h"
+#include "sched.h"
+#include "thread.h"
+
+#define ENABLE_DEBUG (0)
+#include "debug.h"
+
+#ifdef MODULE_CORE_MBOX
+
+static void _wake_waiter(thread_t *thread, unsigned irqstate)
+{
+    sched_set_status(thread, STATUS_PENDING);
+
+    DEBUG("mbox: Thread %"PRIkernel_pid": _wake_waiter(): waking up "
+            "%"PRIkernel_pid".\n", sched_active_pid, thread->pid);
+
+    uint16_t process_priority = thread->priority;
+    irq_restore(irqstate);
+    sched_switch(process_priority);
+}
+
+static void _wait(list_node_t *wait_list, unsigned irqstate)
+{
+    DEBUG("mbox: Thread %"PRIkernel_pid" _wait(): going blocked.\n",
+            sched_active_pid);
+
+    thread_t *me = (thread_t*) sched_active_thread;
+    sched_set_status(me, STATUS_MBOX_BLOCKED);
+    thread_add_to_list(wait_list, me);
+    irq_restore(irqstate);
+    thread_yield();
+
+    DEBUG("mbox: Thread %"PRIkernel_pid" _wait(): woke up.\n",
+            sched_active_pid);
+}
+
+int _mbox_put(mbox_t *mbox, msg_t *msg, int blocking)
+{
+    unsigned irqstate = irq_disable();
+
+    list_node_t *next = (list_node_t*) list_remove_head(&mbox->readers);
+    if (next) {
+        DEBUG("mbox: Thread %"PRIkernel_pid" mbox 0x%08x: _tryput(): "
+                "there's a waiter.\n", sched_active_pid, (unsigned)mbox);
+        thread_t *thread = container_of((clist_node_t*)next, thread_t, rq_entry);
+        *(msg_t *)thread->wait_data = *msg;
+        _wake_waiter(thread, irqstate);
+        return 1;
+    }
+    else {
+        if (cib_full(&mbox->cib)) {
+            if (blocking) {
+                _wait(&mbox->writers, irqstate);
+                irqstate = irq_disable();
+            }
+            else {
+                irq_restore(irqstate);
+                return 0;
+            }
+        }
+
+        DEBUG("mbox: Thread %"PRIkernel_pid" mbox 0x%08x: _tryput(): "
+                "queued message.\n", sched_active_pid, (unsigned)mbox);
+        msg->sender_pid = sched_active_pid;
+        /* copy msg into queue */
+        mbox->msg_array[cib_put_unsafe(&mbox->cib)] = *msg;
+        irq_restore(irqstate);
+        return 1;
+    }
+}
+
+int _mbox_get(mbox_t *mbox, msg_t *msg, int blocking)
+{
+    unsigned irqstate = irq_disable();
+
+    if (cib_avail(&mbox->cib)) {
+        DEBUG("mbox: Thread %"PRIkernel_pid" mbox 0x%08x: _tryget(): "
+                "got queued message.\n", sched_active_pid, (unsigned)mbox);
+        /* copy msg from queue */
+        *msg = mbox->msg_array[cib_get_unsafe(&mbox->cib)];
+        list_node_t *next = (list_node_t*) list_remove_head(&mbox->writers);
+        if (next) {
+            thread_t *thread = container_of((clist_node_t*)next, thread_t, rq_entry);
+            _wake_waiter(thread, irqstate);
+        }
+        else {
+            irq_restore(irqstate);
+        }
+        return 1;
+    }
+    else if (blocking) {
+        sched_active_thread->wait_data = (void*)msg;
+        _wait(&mbox->readers, irqstate);
+        /* sender has copied message */
+        return 1;
+    }
+    else {
+        irq_restore(irqstate);
+        return 0;
+    }
+}
+
+#endif /* MODULE_CORE_MBOX */
-- 
GitLab