diff --git a/Makefile.dep b/Makefile.dep index f7391c92b51bc3b00a5c781c4afc76512e68488c..7455a419b709d98b958d35fcc7dd4fb5704f75e7 100644 --- a/Makefile.dep +++ b/Makefile.dep @@ -81,3 +81,7 @@ endif ifneq (,$(filter rgbled,$(USEMODULE))) USEMODULE += color endif + +ifneq (,$(filter pipe,$(USEMODULE))) + USEMODULE += lib +endif diff --git a/sys/Makefile b/sys/Makefile index 4160eaf57dff1902c75b1233a235940123948720..d7c973ac1b7f8651e401b67b2ebfa5639cf99269 100644 --- a/sys/Makefile +++ b/sys/Makefile @@ -94,5 +94,8 @@ endif ifneq (,$(filter color,$(USEMODULE))) DIRS += color endif +ifneq (,$(filter pipe,$(USEMODULE))) + DIRS += pipe +endif include $(RIOTBASE)/Makefile.base diff --git a/sys/include/pipe.h b/sys/include/pipe.h new file mode 100644 index 0000000000000000000000000000000000000000..8f53e1eb16945107003b59fc9998b9e90288821e --- /dev/null +++ b/sys/include/pipe.h @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2014 RenĂŠ Kijewski <rene.kijewski@fu-berlin.de> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @addtogroup sys + * @{ + * @file + * + * @brief Generic pipe implementation. + * @details This pipe implementation is a tight wrapper around a ringbuffer. + * It sends the calling thread to sleep if the ringbuffer is full + * or empty, respectively. It can be used in ISRs, too. + * + * @author RenĂŠ Kijewski <rene.kijewski@fu-berlin.de> + */ + +#ifndef __PIPE__H +#define __PIPE__H + +#include <sys/types.h> + +#include "mutex.h" +#include "ringbuffer.h" +#include "thread.h" + +#ifndef PIPE_BUF +# define PIPE_BUF (128) /**< Size of a dynamically malloc'd pipe. */ +#endif + +/** + * A generic pipe. + */ +typedef struct riot_pipe +{ + ringbuffer_t *rb; /**< Wrapped ringbuffer. */ + tcb_t *read_blocked; /**< A thread that wants to write to this full pipe. */ + tcb_t *write_blocked; /**< A thread that wants to read from this empty pipe. */ + void (*free)(void *); /**< Function to call by pipe_free(). Used like `pipe->free(pipe)`. */ +} pipe_t; + +/** + * @brief Initialize a pipe. + * @param[out] pipe Datum to initialize. + * @param rb Ringbuffer to use. Needs to be initialized! + * @param free Function to call by pipe_free(). Used like `pipe->free(pipe)`. + * Should be `NULL` for statically allocated pipes. + */ +void pipe_init(pipe_t *pipe, ringbuffer_t *rb, void (*free)(void *)); + +/** + * @brief Read from a pipe. + * @details Only one thread may access the pipe readingly at once. + * If the pipe is empty, then the current thread is send sleeping. + * It gets woken up once there is data ready in the pipe. + * In an ISR (inISR()) 0 will returned if the pipe is empty. + * @param[in] pipe Pipe to read from. + * @param[out] buf Buffer to write into + * @param n Size of buffer. + * @returns `> 0` if data could be read. + * `== 0` if the pipe is empty and isISR(). + */ +ssize_t pipe_read(pipe_t *pipe, void *buf, size_t n); + +/** + * @brief Write to a pipe. + * @details Only one thread may access the pipe writingly at once. + * If the pipe is full, then the current thread is send sleeping. + * It gets woken up once there is room again in the pipe. + * In an ISR (inISR()) 0 will returned if the pipe is full. + * @param[in] pipe Pipe to write to. + * @param[out] buf Buffer to read from. + * @param n Size of buffer. + * @returns `> 0` if data could be written. + * `== 0` if the pipe is full and isISR(). + */ +ssize_t pipe_write(pipe_t *pipe, const void *buf, size_t n); + +/** + * @brief Dynamically allocate a pipe with room for `size` bytes. + * @details This function uses `malloc()` and may break real-time behaviors. + * Try not to use this function. + * @param size Size of the underlying ringbuffer to allocate. + * @returns Newly allocated pipe. NULL if the memory is exhausted. + */ +pipe_t *pipe_malloc(unsigned size); + +/** + * @brief Free a pipe. + * @details On statically allocated pipes you do not have to call this function. + * Most likely you will only need this function in junction with + * pipe_malloc(). + * @param rp Pipe to free. + */ +void pipe_free(pipe_t *rp); + +#endif +/** + * @} + */ diff --git a/sys/pipe/Makefile b/sys/pipe/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..98382ad27feb4688e68edd52cc37307dba090586 --- /dev/null +++ b/sys/pipe/Makefile @@ -0,0 +1,3 @@ +MODULE = pipe + +include $(RIOTBASE)/Makefile.base diff --git a/sys/pipe/pipe.c b/sys/pipe/pipe.c new file mode 100644 index 0000000000000000000000000000000000000000..c1b2bcd97c933d876d8c8a2abbab0ec190812a7a --- /dev/null +++ b/sys/pipe/pipe.c @@ -0,0 +1,101 @@ +/* + * Copyright (C) 2014 RenĂŠ Kijewski <rene.kijewski@fu-berlin.de> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @ingroup sys + * @{ + * @file + * @brief Implementation for statically allocated pipes. + * @author RenĂŠ Kijewski <rene.kijewski@fu-berlin.de> + * @} + */ + +#include "irq.h" +#include "pipe.h" +#include "sched.h" + +typedef unsigned (*ringbuffer_op_t)(ringbuffer_t *restrict rb, char *buf, unsigned n); + +static ssize_t pipe_rw(ringbuffer_t *rb, + void *buf, + size_t n, + tcb_t **other_op_blocked, + tcb_t **this_op_blocked, + ringbuffer_op_t ringbuffer_op) +{ + if (n == 0) { + return 0; + } + + while (1) { + unsigned old_state = disableIRQ(); + + unsigned count = ringbuffer_op(rb, buf, n); + + if (count > 0) { + tcb_t *other_thread = *other_op_blocked; + int other_prio = -1; + if (other_thread) { + *other_op_blocked = NULL; + other_prio = other_thread->priority; + sched_set_status(other_thread, STATUS_PENDING); + } + + restoreIRQ(old_state); + + if (other_prio >= 0) { + sched_switch(other_prio); + } + + return count; + } + else if (*this_op_blocked || inISR()) { + restoreIRQ(old_state); + return 0; + } + else { + *this_op_blocked = (tcb_t *) sched_active_thread; + + sched_set_status((tcb_t *) sched_active_thread, STATUS_SLEEPING); + restoreIRQ(old_state); + thread_yield(); + } + } +} + +ssize_t pipe_read(pipe_t *pipe, void *buf, size_t n) +{ + return pipe_rw(pipe->rb, (char *) buf, n, + &pipe->write_blocked, &pipe->read_blocked, ringbuffer_get); +} + +ssize_t pipe_write(pipe_t *pipe, const void *buf, size_t n) +{ + return pipe_rw(pipe->rb, (char *) buf, n, + &pipe->read_blocked, &pipe->write_blocked, (ringbuffer_op_t) ringbuffer_add); +} + +void pipe_init(pipe_t *pipe, ringbuffer_t *rb, void (*free)(void *)) +{ + *pipe = (pipe_t) { + .rb = rb, + .read_blocked = NULL, + .write_blocked = NULL, + .free = free, + }; +} diff --git a/sys/pipe/pipe_dynamic.c b/sys/pipe/pipe_dynamic.c new file mode 100644 index 0000000000000000000000000000000000000000..1cb098382b24d9b6cc00c533617bebedfb7fd0f4 --- /dev/null +++ b/sys/pipe/pipe_dynamic.c @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2014 RenĂŠ Kijewski <rene.kijewski@fu-berlin.de> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @ingroup sys + * @{ + * @file + * @brief Implementation for dynamically allocated pipes. + * @author RenĂŠ Kijewski <rene.kijewski@fu-berlin.de> + * @} + */ + +#include <malloc.h> + +#include "pipe.h" + +struct mallocd_pipe +{ + pipe_t pipe; + ringbuffer_t rb; + char buffer[1]; +}; + +pipe_t *pipe_malloc(unsigned size) +{ + struct mallocd_pipe *m_pipe = malloc(sizeof (*m_pipe) + size); + if (m_pipe) { + ringbuffer_init(&m_pipe->rb, m_pipe->buffer, size); + pipe_init(&m_pipe->pipe, &m_pipe->rb, free); + } + return &m_pipe->pipe; +} + +void pipe_free(pipe_t *rp) +{ + if (rp && rp->free) { + rp->free(rp); + } +} diff --git a/tests/test_pipe/Makefile b/tests/test_pipe/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..116521e3c00945182eb991f7127ab57ce10ebd1f --- /dev/null +++ b/tests/test_pipe/Makefile @@ -0,0 +1,8 @@ +APPLICATION = test_pipe +include ../Makefile.tests_common + +BOARD_INSUFFICIENT_RAM := stm32f0discovery + +USEMODULE += pipe + +include $(RIOTBASE)/Makefile.include diff --git a/tests/test_pipe/main.c b/tests/test_pipe/main.c new file mode 100644 index 0000000000000000000000000000000000000000..401e7f6ae9b4d369791ac274b0039e93ce158c4f --- /dev/null +++ b/tests/test_pipe/main.c @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2014 RenĂŠ Kijewski <rene.kijewski@fu-berlin.de> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +/** + * @ingroup tests + * @{ + * @file + * @brief Test for module pipe. + * @author RenĂŠ Kijewski <rene.kijewski@fu-berlin.de> + * @} + */ + +#include <stdio.h> +#include <inttypes.h> + +#include "thread.h" +#include "flags.h" +#include "kernel.h" +#include "pipe.h" +#include "pipe.h" + +#define BYTES_TOTAL (26) + +static char stacks[2][KERNEL_CONF_STACKSIZE_MAIN]; + +static char pipe_bufs[2][6]; +static ringbuffer_t rbs[2]; + +static pipe_t pipes[2]; + +static void *run_middle(void *arg) +{ + (void) arg; + + unsigned read_total = 0; + while (read_total < BYTES_TOTAL) { + char buf[4]; + unsigned read = pipe_read(&pipes[0], buf, sizeof (buf)); + unsigned read_start = read_total; + read_total += read; + printf("Middle read: <%.*s> [%u:%u]\n", read, buf, + read_start, read_total); + + unsigned written_total = 0; + while (written_total < read) { + int written = pipe_write(&pipes[1], &buf[written_total], + read - written_total); + written_total += written; + } + } + + puts("Middle done."); + return NULL; +} + +static void *run_end(void *arg) +{ + (void) arg; + + unsigned read_total = 0; + while (read_total < BYTES_TOTAL) { + char buf[3]; + int read = pipe_read(&pipes[1], buf, sizeof (buf)); + unsigned read_start = read_total; + read_total += read; + printf("End read: <%.*s> [%u:%u]\n", read, buf, + read_start, read_total); + } + + puts("End done."); + return NULL; +} + +static unsigned min(unsigned a, unsigned b) +{ + return a < b ? a : b; +} + +int main(void) +{ + puts("Start."); + + for (int i = 0; i < 2; ++i) { + ringbuffer_init(&rbs[i], pipe_bufs[i], sizeof (pipe_bufs[i])); + pipe_init(&pipes[i], &rbs[i], NULL); + } + + thread_create(stacks[0], sizeof (stacks[0]), + PRIORITY_MAIN, CREATE_WOUT_YIELD | CREATE_STACKTEST, + run_middle, NULL, "middle"); + thread_create(stacks[1], sizeof (stacks[1]), + PRIORITY_MAIN, CREATE_WOUT_YIELD | CREATE_STACKTEST, + run_end, NULL, "end"); + + unsigned total = 0; + while (total < BYTES_TOTAL) { + char buf[5]; + unsigned bytes_cur = min(BYTES_TOTAL - total, sizeof (buf)); + for (unsigned i = 0; i < bytes_cur; ++i) { + buf[i] = 'A' + total; + ++total; + } + + unsigned written_total = 0; + while (written_total < bytes_cur) { + int written = pipe_write(&pipes[0], &buf[written_total], + bytes_cur - written_total); + written_total += written; + } + } + + puts("Main done."); + return 0; +}