From bb71986ecf20bee4a0276e6f0d3047ba24dfa869 Mon Sep 17 00:00:00 2001 From: Hauke Petersen <hauke.petersen@fu-berlin.de> Date: Tue, 21 Feb 2017 00:01:59 +0100 Subject: [PATCH] net: added emCute - introducing MQTT-SN support --- Makefile.dep | 6 + sys/Makefile | 3 + sys/include/net/emcute.h | 381 ++++++++++++ sys/net/application_layer/emcute/Makefile | 1 + sys/net/application_layer/emcute/emcute.c | 568 ++++++++++++++++++ .../emcute/emcute_internal.h | 74 +++ sys/net/application_layer/emcute/emcute_str.c | 56 ++ 7 files changed, 1089 insertions(+) create mode 100644 sys/include/net/emcute.h create mode 100644 sys/net/application_layer/emcute/Makefile create mode 100644 sys/net/application_layer/emcute/emcute.c create mode 100644 sys/net/application_layer/emcute/emcute_internal.h create mode 100644 sys/net/application_layer/emcute/emcute_str.c diff --git a/Makefile.dep b/Makefile.dep index 00cc49314a..cc85b1fcef 100644 --- a/Makefile.dep +++ b/Makefile.dep @@ -591,6 +591,12 @@ ifneq (,$(filter random,$(USEMODULE))) endif endif +ifneq (,$(filter emcute,$(USEMODULE))) + USEMODULE += core_thread_flags + USEMODULE += sock_udp + USEMODULE += xtimer +endif + # include package dependencies -include $(USEPKG:%=$(RIOTPKG)/%/Makefile.dep) diff --git a/sys/Makefile b/sys/Makefile index 5405e7b5f0..80ebe4ee28 100644 --- a/sys/Makefile +++ b/sys/Makefile @@ -108,6 +108,9 @@ endif ifneq (,$(filter gcoap,$(USEMODULE))) DIRS += net/application_layer/coap endif +ifneq (,$(filter emcute,$(USEMODULE))) + DIRS += net/application_layer/emcute +endif DIRS += $(dir $(wildcard $(addsuffix /Makefile, ${USEMODULE}))) diff --git a/sys/include/net/emcute.h b/sys/include/net/emcute.h new file mode 100644 index 0000000000..b9affca636 --- /dev/null +++ b/sys/include/net/emcute.h @@ -0,0 +1,381 @@ +/* + * Copyright (C) 2017 Freie Universität Berlin + * + * This file is subject to the terms and conditions of the GNU Lesser + * General Public License v2.1. See the file LICENSE in the top level + * directory for more details. + */ + +/** + * @defgroup net_emcute MQTT-SN Client (emCute) + * @ingroup net + * @brief emCute, the MQTT-SN implementation for RIOT + * + * # About + * emCute is the implementation of the OASIS MQTT-SN protocol for RIOT. It is + * designed with a focus on small memory footprint and usability. + * + * + * # Design Decisions and Restrictions + * * emCute is designed to run on top of UDP only, making use of + * @ref net_sock_udp. The design is not intended to be used with any other + * transport. + * + * The implementation is based on a 2-thread model: emCute needs one thread of + * its own, in which receiving of packets and sending of ping messages are + * handled. All 'user space functions' have to run from (a) different (i.e. + * user) thread(s). emCute uses thread flags to synchronize between threads. + * + * Further know restrictions are: + * - ASCII topic names only (no support for UTF8 names, yet) + * - topic length is restricted to fit in a single length byte (248 byte max) + * - no support for wildcards in topic names. This feature requires more + * elaborate internal memory management, supposedly at the cost of quite + * increased ROM and RAM usage + * - no retransmit when receiving a REJ_CONG (reject, reason congestion). when + * getting a REJ_CONG (reject, reason congestion), the spec tells us to resend + * the original message after T_WAIT (default: >5min). This is not supported, + * as this would require to block to calling thread (or keep state) for long + * periods of time and is (in Hauke's opinion) not feasible for constrained + * nodes. + * + * + * # Error Handling + * This implementation tries minimize parameter checks to a minimum, checking as + * many parameters as feasible using assertions. For the sake of run-time + * stability and usability, typical overflow checks are always done during run- + * time and explicit error values returned in case of errors. + * + * + * # Implementation state + * In the current state, emCute supports: + * - connecting to a gateway + * - disconnecting from gateway + * - registering a last will topic and message during connection setup + * - registering topic names with the gateway (obtaining topic IDs) + * - subscribing to topics + * - unsubscribing from topics + * - updating will topic + * - updating will message + * - sending out periodic PINGREQ messages + * - handling re-transmits + * + * The following features are however still missing (but planned): + * @todo Gateway discovery (so far there is no support for handling + * ADVERTISE, GWINFO, and SEARCHGW). Open question to answer here: + * how to put / how to encode the IPv(4/6) address AND the port of + * a gateway in the GwAdd field of the GWINFO message + * @todo QOS level 2 + * @todo put the node to sleep (send DISCONNECT with duration field set) + * @todo handle DISCONNECT messages initiated by the broker/gateway + * @todo support for pre-defined and short topic IDs + * @todo handle (previously) active subscriptions on reconnect/disconnect + * @todo handle re-connect/disconnect from unresponsive gateway (in case + * a number of ping requests are unanswered) + * @todo react only to incoming ping requests that are actually send by + * the gateway we are connected to + * + * @{ + * @file + * @brief emCute MQTT-SN interface definition + * + * @author Hauke Petersen <hauke.petersen@fu-berlin.de> + */ + +#ifndef MQTTSN_H +#define MQTTSN_H + +#include <stdint.h> +#include <stddef.h> +#include <stdbool.h> + +#include "net/sock/udp.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef EMCUTE_DEFAULT_PORT +/** + * @brief Default UDP port to listen on (also used as SRC port) + */ +#define EMCUTE_DEFAULT_PORT (1883U) +#endif + +#ifndef EMCUTE_BUFSIZE +/** + * @brief Buffer size used for emCute's transmit and receive buffers + * + * The overall buffer size used by emCute is this value time two (Rx + Tx). + */ +#define EMCUTE_BUFSIZE (512U) +#endif + +#ifndef EMCUTE_ID_MAXLEN +/** + * @brief Maximum client ID length + * + * @note **Must** be less than (256 - 6) AND less than + * (@ref EMCUTE_BUFSIZE - 6). + */ +#define EMCUTE_ID_MAXLEN (196U) +#endif + +#ifndef EMCUTE_TOPIC_MAXLEN +/** + * @brief Maximum topic length + * + * @note **Must** be less than (256 - 6) AND less than + * (@ref EMCUTE_BUFSIZE - 6). + */ +#define EMCUTE_TOPIC_MAXLEN (196U) +#endif + +#ifndef EMCUTE_KEEPALIVE +/** + * @brief Keep-alive interval [in s] + * + * The node will communicate this interval to the gateway send a ping message + * every time when this amount of time has passed. + * + * For the default value, see spec v1.2, section 7.2 -> T_WAIT: > 5 min + */ +#define EMCUTE_KEEPALIVE (360) /* -> 6 min*/ +#endif + +#ifndef EMCUTE_T_RETRY +/** + * @brief Re-send interval [in seconds] + * + * For the default value, see spec v1.2, section 7.2 -> T_RETRY: 10 to 15 sec + */ +#define EMCUTE_T_RETRY (15U) /* -> 15 sec */ +#endif + +#ifndef EMCUTE_N_RETRY +/** + * @brief Number of retries when sending packets + * + * For the default value, see spec v1.2, section 7.2 -> N_RETRY: 3-5 + */ +#define EMCUTE_N_RETRY (3U) +#endif + +/** + * @brief MQTT-SN flags + * + * All MQTT-SN functions only support a sub-set of the available flags. It is up + * to the user to only supply valid/supported flags to a function. emCute will + * trigger assertion fails on the use of unsupported flags (if compiled with + * DEVELHELP). + * + * Refer to the MQTT-SN spec section 5.3.4 for further information. + */ +enum { + EMCUTE_DUP = 0x80, /**< duplicate flag */ + EMCUTE_QOS_MASK = 0x60, /**< QoS level mask */ + EMCUTE_QOS_2 = 0x40, /**< QoS level 2 */ + EMCUTE_QOS_1 = 0x20, /**< QoS level 1 */ + EMCUTE_QOS_0 = 0x00, /**< QoS level 0 */ + EMCUTE_RETAIN = 0x10, /**< retain flag */ + EMCUTE_WILL = 0x08, /**< will flag, used during CONNECT */ + EMCUTE_CS = 0x04, /**< clean session flag */ + EMCUTE_TIT_MASK = 0x03, /**< topic ID type mask */ + EMCUTE_TIT_SHORT = 0x02, /**< topic ID: short */ + EMCUTE_TIT_PREDEF = 0x01, /**< topic ID: pre-defined */ + EMCUTE_TIT_NORMAL = 0x00 /**< topic ID: normal */ +}; + +/** + * @brief Possible emCute return values + */ +enum { + EMCUTE_OK = 0, /**< everything went as expect */ + EMCUTE_NOGW = -1, /**< error: not connected to a gateway */ + EMCUTE_REJECT = -2, /**< error: operation was rejected by broker */ + EMCUTE_OVERFLOW = -3, /**< error: ran out of buffer space */ + EMCUTE_TIMEOUT = -4, /**< error: timeout */ + EMCUTE_NOTSUP = -5 /**< error: feature not supported */ +}; + +/** + * @brief MQTT-SN topic + */ +typedef struct { + const char *name; /**< topic string (currently ACSII only) */ + uint16_t id; /**< topic id, as assigned by the gateway */ +} emcute_topic_t; + +/** + * @brief Signature for callbacks fired when publish messages are received + * + * @param[in] topic topic the received data was published on + * @param[in] data published data + * @param[in] len length of @p data in bytes + */ +typedef void(*emcute_cb_t)(const emcute_topic_t *topic, void *data, size_t len); + +/** + * @brief Data-structure for keeping track of topics we register to + */ +typedef struct emcute_sub { + struct emcute_sub *next; /**< next subscription (saved in a list) */ + emcute_topic_t topic; /**< topic we subscribe to */ + emcute_cb_t cb; /**< function called when receiving messages */ + void *arg; /**< optional custom argument */ +} emcute_sub_t; + +/** + * @brief Connect to a given MQTT-SN gateway (CONNECT) + * + * When called while already connected to a gateway, call emcute_discon() first + * to disconnect from the current gateway. + * + * @param[in] remote address and port of the target MQTT-SN gateway + * @param[in] clean set to true to start a clean session + * @param[in] will_topic last will topic name, no last will will be + * configured if set to NULL + * @param[in] will_msg last will message content, will be ignored if + * @p will_topic is set to NULL + * @param[in] will_msg_len length of @p will_msg in byte + * @param[in] flags flags used for the last will, allowed are retain and + * QoS + * + * @return EMCUTE_OK on success + * @return EMCUTE_NOGW if already connected to a gateway + * @return EMCUTE_REJECT on connection refused by gateway + * @return EMCUTE_TIMEOUT on connection timeout + */ +int emcute_con(sock_udp_ep_t *remote, bool clean, const char *will_topic, + const void *will_msg, size_t will_msg_len, unsigned flags); + +/** + * @brief Disconnect from the gateway we are currently connected to + * + * @return EMCUTE_OK on success + * @return EMCUTE_GW if not connected to a gateway + * @return EMCUTE_TIMEOUT on response timeout + */ +int emcute_discon(void); + +/** + * @brief Get a topic ID for the given topic name from the gateway + * + * @param[in,out] topic topic to register, topic.name **must not** be NULL + * + * @return EMCUTE_OK on success + * @return EMCUTE_NOGW if not connected to a gateway + * @return EMCUTE_OVERFLOW if length of topic name exceeds + * @ref EMCUTE_TOPIC_MAXLEN + * @return EMCUTE_TIMEOUT on connection timeout + */ +int emcute_reg(emcute_topic_t *topic); + +/** + * @brief Publish data on the given topic + * + * @param[in] topic topic to send data to, topic **must** be registered + * (topic.id **must** populated). + * @param[in] buf data to publish + * @param[in] len length of @p data in bytes + * @param[in] flags flags used for publication, allowed are QoS and retain + * + * @return EMCUTE_OK on success + * @return EMCUTE_NOGW if not connected to a gateway + * @return EMCUTE_REJECT if publish message was rejected (QoS > 0 only) + * @return EMCUTE_OVERFLOW if length of data exceeds @ref EMCUTE_BUFSIZE + * @return EMCUTE_TIMEOUT on connection timeout (QoS > 0 only) + * @return EMCUTE_NOTSUP on unsupported flag values + */ +int emcute_pub(emcute_topic_t *topic, const void *buf, size_t len, + unsigned flags); + +/** + * @brief Subscribe to the given topic + * + * When calling this function, @p sub->topic.name and @p sub->cb **must** be + * set. + * + * @param[in,out] sub subscription context, @p sub->topic.name and @p sub->cb + * **must** not be NULL. + * @param[in] flags flags used when subscribing, allowed are QoS, DUP, and + * topic ID type + * + * @return EMCUTE_OK on success + * @return EMCUTE_NOGW if not connected to a gateway + * @return EMCUTE_OVERFLOW if length of topic name exceeds + * @ref EMCUTE_TOPIC_MAXLEN + * @return EMCUTE_TIMEOUT on connection timeout + */ +int emcute_sub(emcute_sub_t *sub, unsigned flags); + +/** + * @brief Unsubscripbe the given topic + * + * @param[in] sub subscription context + * + * @return EMCUTE_OK on success + * @return EMCUTE_NOGW if not connected to a gateway + * @return EMCUTE_TIMEOUT on connection timeout + */ +int emcute_unsub(emcute_sub_t *sub); + +/** + * @brief Update the last will topic + * + * @param[in] topic new last will topic + * @param[in] flags flags used for the topic, allowed are QoS and retain + * + * @return EMCUTE_OK on success + * @return EMCUTE_NOGW if not connected to a gateway + * @return EMCUTE_OVERFLOW if length of topic name exceeds + * @ref EMCUTE_TOPIC_MAXLEN + * @return EMCUTE_REJECT on rejection by the gateway + * @return EMCUTE_TIMEOUT on response timeout + */ +int emcute_willupd_topic(const char *topic, unsigned flags); + +/** + * @brief Update the last will message + * + * @param[in] data new message to send on last will + * @param[in] len length of @p data in bytes + * + * @return EMCUTE_OK on success + * @return EMCUTE_NOGW if not connected to a gateway + * @return EMCUTE_OVERFLOW if length of the given message exceeds + * @ref EMCUTE_BUFSIZE + * @return EMCUTE_REJECT on rejection by the gateway + * @return EMCUTE_TIMEOUT on response timeout + */ +int emcute_willupd_msg(const void *data, size_t len); + +/** + * @brief Run emCute, will 'occupy' the calling thread + * + * This function will run the emCute message receiver. It will block the thread + * it is running in. + * + * @param[in] port UDP port used for listening (default: 1883) + * @param[in] id client ID (should be unique) + */ +void emcute_run(uint16_t port, const char *id); + +/** + * @brief Return the string representation of the given type value + * + * This function is for debugging purposes. + * + * @param[in] type MQTT-SN message type + * + * @return string representation of the given type + * @return 'UNKNOWN' on invalid type value + */ +const char *emcute_type_str(uint8_t type); + +#ifdef __cplusplus +} +#endif + +#endif /* MQTTSN_H */ +/** @} */ diff --git a/sys/net/application_layer/emcute/Makefile b/sys/net/application_layer/emcute/Makefile new file mode 100644 index 0000000000..48422e909a --- /dev/null +++ b/sys/net/application_layer/emcute/Makefile @@ -0,0 +1 @@ +include $(RIOTBASE)/Makefile.base diff --git a/sys/net/application_layer/emcute/emcute.c b/sys/net/application_layer/emcute/emcute.c new file mode 100644 index 0000000000..8279f84cae --- /dev/null +++ b/sys/net/application_layer/emcute/emcute.c @@ -0,0 +1,568 @@ +/* + * Copyright (C) 2017 Freie Universität Berlin + * + * This file is subject to the terms and conditions of the GNU Lesser + * General Public License v2.1. See the file LICENSE in the top level + * directory for more details. + */ + +/** + * @ingroup net_emcute + * @{ + * + * @file + * @brief MQTT-SN implementation + * + * @author Hauke Petersen <hauke.petersen@fu-berlin.de> + * + * @} + */ + +#include <string.h> + +#include "log.h" +#include "mutex.h" +#include "sched.h" +#include "xtimer.h" +#include "thread_flags.h" + +#include "net/emcute.h" +#include "emcute_internal.h" + +#define ENABLE_DEBUG (0) +#include "debug.h" + +#define PROTOCOL_VERSION (0x01) + +#define PUB_FLAGS (EMCUTE_QOS_MASK | EMCUTE_RETAIN) +#define SUB_FLAGS (EMCUTE_DUP | EMCUTE_QOS_MASK | EMCUTE_TIT_MASK) + +#define TFLAGS_RESP (0x0001) +#define TFLAGS_TIMEOUT (0x0002) +#define TFLAGS_ANY (TFLAGS_RESP | TFLAGS_TIMEOUT) + + +static const char *cli_id; +static sock_udp_t sock; +static sock_udp_ep_t gateway; + +static uint8_t rbuf[EMCUTE_BUFSIZE]; +static uint8_t tbuf[EMCUTE_BUFSIZE]; + +static emcute_sub_t *subs = NULL; + +static mutex_t txlock; + +static xtimer_t timer; +static uint16_t id_next = 0x1234; +static volatile uint8_t waiton = 0xff; +static volatile uint16_t waitonid = 0; +static volatile int result; + +static inline uint16_t get_u16(const uint8_t *buf) +{ +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + return (uint16_t)((buf[0] << 8) | buf[1]); +#else + return (uint16_t)((buf[1] << 8) | buf[0]); +#endif +} + +static inline void set_u16(uint8_t *buf, uint16_t val) +{ +#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__ + buf[0] = (uint8_t)(val >> 8); + buf[1] = (uint8_t)(val & 0xff); +#else + buf[0] = (uint8_t)(val & 0xff); + buf[1] = (uint8_t)(val >> 8); +#endif +} + +static int set_len(uint8_t *buf, size_t len) +{ + if (len < (0xff - 7)) { + buf[0] = len + 1; + return 1; + } + else { + buf[0] = 0x01; + set_u16(&tbuf[1], (uint16_t)(len + 3)); + return 3; + } +} + +static int get_len(uint8_t *buf, uint16_t *len) +{ + if (buf[0] != 0x01) { + *len = (uint16_t)buf[0]; + return 1; + } + else { + *len = get_u16(&buf[1]); + return 3; + } +} + +static void time_evt(void *arg) +{ + thread_flags_set((thread_t *)arg, TFLAGS_TIMEOUT); +} + +static int syncsend(uint8_t resp, size_t len, bool unlock) +{ + int res = EMCUTE_TIMEOUT; + waiton = resp; + timer.arg = (void *)sched_active_thread; + /* clear flags, in case the timer was triggered last time right before the + * remove was called */ + thread_flags_clear(TFLAGS_ANY); + + for (unsigned retries = 0; retries < EMCUTE_N_RETRY; retries++) { + DEBUG("[emcute] syncsend: sending round %i\n", retries); + sock_udp_send(&sock, tbuf, len, &gateway); + + xtimer_set(&timer, (EMCUTE_T_RETRY * US_PER_SEC)); + thread_flags_t flags = thread_flags_wait_any(TFLAGS_ANY); + if (flags & TFLAGS_RESP) { + DEBUG("[emcute] syncsend: got response [%i]\n", result); + xtimer_remove(&timer); + res = result; + retries = EMCUTE_N_RETRY; + } + } + + /* cleanup sync state */ + waiton = 0xff; + if (unlock) { + mutex_unlock(&txlock); + } + return res; +} + +static void on_disconnect(void) +{ + if (waiton == DISCONNECT) { + gateway.port = 0; + result = EMCUTE_OK; + thread_flags_set((thread_t *)timer.arg, TFLAGS_RESP); + } +} + +static void on_ack(uint8_t type, int id_pos, int ret_pos, int res_pos) +{ + if ((waiton == type) && (!id_pos || (waitonid == get_u16(&rbuf[id_pos])))) { + if (!ret_pos || (rbuf[ret_pos] == ACCEPT)) { + if (res_pos == 0) { + result = EMCUTE_OK; + } else { + result = (int)get_u16(&rbuf[res_pos]); + } + } else { + result = EMCUTE_REJECT; + } + thread_flags_set((thread_t *)timer.arg, TFLAGS_RESP); + } +} + +static void on_publish(void) +{ + emcute_sub_t *sub; + uint16_t len; + int pos = get_len(rbuf, &len); + uint16_t tid = get_u16(&rbuf[pos + 2]); + + /* allocate a response packet */ + uint8_t buf[7] = { 7, PUBACK, 0, 0, 0, 0, ACCEPT }; + /* and populate message ID and topic ID fields */ + memcpy(&buf[2], &rbuf[3], 4); + + /* return error code in case we don't support/understand active flags. So + * far we only understand QoS 1... */ + if (rbuf[2] & ~(EMCUTE_QOS_1 | EMCUTE_TIT_SHORT)) { + buf[6] = REJ_NOTSUP; + sock_udp_send(&sock, &buf, 7, &gateway); + return; + } + + /* find the registered topic */ + for (sub = subs; sub && (sub->topic.id != tid); sub = sub->next) {} + if (sub == NULL) { + buf[6] = REJ_INVTID; + sock_udp_send(&sock, &buf, 7, &gateway); + DEBUG("[emcute] on pub: no subscription found\n"); + } + else { + if (rbuf[2] & EMCUTE_QOS_1) { + sock_udp_send(&sock, &buf, 7, &gateway); + } + DEBUG("[emcute] on pub: got %i bytes of data\n", (int)(len - pos - 6)); + sub->cb(&sub->topic, &rbuf[pos + 6], (size_t)(len - pos - 6)); + } +} + +static void on_pingreq(sock_udp_ep_t *remote) +{ + /* @todo respond with a PINGRESP only if the PINGREQ came from the + * connected gateway -> see spec v1.2, section 6.11 */ + uint8_t buf[2] = { 2, PINGRESP }; + sock_udp_send(&sock, &buf, 2, remote); +} + +static void on_pingresp(void) +{ + /** @todo: trigger update something like a 'last seen' value */ +} + +static void send_ping(void) +{ + if (gateway.port != 0) { + uint8_t buf[2] = { 2, PINGREQ }; + sock_udp_send(&sock, &buf, 2, &gateway); + } +} + +int emcute_con(sock_udp_ep_t *remote, bool clean, const char *will_topic, + const void *will_msg, size_t will_msg_len, unsigned will_flags) +{ + int res; + size_t len; + + assert(!will_topic || (will_topic && will_msg && !(will_flags & ~PUB_FLAGS))); + + mutex_lock(&txlock); + + /* check for existing connections and copy given UDP endpoint */ + if (gateway.port != 0) { + return EMCUTE_NOGW; + } + memcpy(&gateway, remote, sizeof(sock_udp_ep_t)); + + /* figure out which flags to set */ + uint8_t flags = (clean) ? EMCUTE_CS : 0; + if (will_topic) { + flags |= EMCUTE_WILL; + } + + /* compute packet size */ + len = (strlen(cli_id) + 6); + tbuf[0] = (uint8_t)len; + tbuf[1] = CONNECT; + tbuf[2] = flags; + tbuf[3] = PROTOCOL_VERSION; + set_u16(&tbuf[4], EMCUTE_KEEPALIVE); + memcpy(&tbuf[6], cli_id, strlen(cli_id)); + + /* configure 'state machine' and send the connection request */ + if (will_topic) { + size_t topic_len = strlen(will_topic); + if ((topic_len > EMCUTE_TOPIC_MAXLEN) || + ((will_msg_len + 4) > EMCUTE_BUFSIZE)) { + gateway.port = 0; + return EMCUTE_OVERFLOW; + } + + res = syncsend(WILLTOPICREQ, len, false); + if (res != EMCUTE_OK) { + gateway.port = 0; + return res; + } + + /* now send WILLTOPIC */ + int pos = set_len(tbuf, (topic_len + 2)); + len = (pos + topic_len + 2); + tbuf[pos++] = WILLTOPIC; + tbuf[pos++] = will_flags; + memcpy(&tbuf[pos], will_topic, strlen(will_topic)); + + res = syncsend(WILLMSGREQ, len, false); + if (res != EMCUTE_OK) { + gateway.port = 0; + return res; + } + + /* and WILLMSG afterwards */ + pos = set_len(tbuf, (will_msg_len + 1)); + len = (pos + will_msg_len + 1); + tbuf[pos++] = WILLMSG; + memcpy(&tbuf[pos], will_msg, will_msg_len); + } + + res = syncsend(CONNACK, len, true); + if (res != EMCUTE_OK) { + gateway.port = 0; + } + return res; +} + +int emcute_discon(void) +{ + if (gateway.port == 0) { + return EMCUTE_NOGW; + } + + mutex_lock(&txlock); + + tbuf[0] = 2; + tbuf[1] = DISCONNECT; + + return syncsend(DISCONNECT, 2, true); +} + +int emcute_reg(emcute_topic_t *topic) +{ + assert(topic && topic->name); + + if (gateway.port == 0) { + return EMCUTE_NOGW; + } + if (strlen(topic->name) > EMCUTE_TOPIC_MAXLEN) { + return EMCUTE_OVERFLOW; + } + + mutex_lock(&txlock); + + tbuf[0] = (strlen(topic->name) + 6); + tbuf[1] = REGISTER; + set_u16(&tbuf[2], 0); + set_u16(&tbuf[4], id_next); + waitonid = id_next++; + memcpy(&tbuf[6], topic->name, strlen(topic->name)); + + int res = syncsend(REGACK, (size_t)tbuf[0], true); + if (res > 0) { + topic->id = (uint16_t)res; + res = EMCUTE_OK; + } + return res; +} + +int emcute_pub(emcute_topic_t *topic, const void *data, size_t len, + unsigned flags) +{ + int res = EMCUTE_OK; + + assert((topic->id != 0) && data && (len > 0) && !(flags & ~PUB_FLAGS)); + + if (gateway.port == 0) { + return EMCUTE_NOGW; + } + if (len >= (EMCUTE_BUFSIZE - 9)) { + return EMCUTE_OVERFLOW; + } + if (flags & EMCUTE_QOS_2) { + return EMCUTE_NOTSUP; + } + + mutex_lock(&txlock); + + int pos = set_len(tbuf, (len + 6)); + len += (pos + 6); + tbuf[pos++] = PUBLISH; + tbuf[pos++] = flags; + set_u16(&tbuf[pos], topic->id); + pos += 2; + set_u16(&tbuf[pos], id_next); + waitonid = id_next++; + pos += 2; + memcpy(&tbuf[pos], data, len); + + if (flags & EMCUTE_QOS_1) { + res = syncsend(PUBACK, len, true); + } + else { + sock_udp_send(&sock, tbuf, len, &gateway); + mutex_unlock(&txlock); + } + + return res; +} + +int emcute_sub(emcute_sub_t *sub, unsigned flags) +{ + assert(sub && (sub->cb) && (sub->topic.name) && !(flags & ~SUB_FLAGS)); + + if (gateway.port == 0) { + return EMCUTE_NOGW; + } + if (strlen(sub->topic.name) > EMCUTE_TOPIC_MAXLEN) { + return EMCUTE_OVERFLOW; + } + + mutex_lock(&txlock); + + tbuf[0] = (strlen(sub->topic.name) + 5); + tbuf[1] = SUBSCRIBE; + tbuf[2] = flags; + set_u16(&tbuf[3], id_next); + waitonid = id_next++; + memcpy(&tbuf[5], sub->topic.name, strlen(sub->topic.name)); + + int res = syncsend(SUBACK, (size_t)tbuf[0], false); + if (res > 0) { + DEBUG("[emcute] sub: success, topic id is %i\n", res); + sub->topic.id = res; + + /* check if subscription is already in the list, only insert if not*/ + emcute_sub_t *s; + for (s = subs; s && (s != sub); s = s->next) {} + if (!s) { + sub->next = subs; + subs = sub; + res = EMCUTE_OK; + } + } + + mutex_unlock(&txlock); + return res; +} + +int emcute_unsub(emcute_sub_t *sub) +{ + assert(sub && sub->topic.name); + + if (gateway.port == 0) { + return EMCUTE_NOGW; + } + + mutex_lock(&txlock); + + tbuf[0] = (strlen(sub->topic.name) + 5); + tbuf[1] = UNSUBSCRIBE; + tbuf[2] = 0; + set_u16(&tbuf[3], id_next); + waitonid = id_next++; + memcpy(&tbuf[5], sub->topic.name, strlen(sub->topic.name)); + + int res = syncsend(UNSUBACK, (size_t)tbuf[0], false); + if (res == EMCUTE_OK) { + if (subs == sub) { + subs = sub->next; + } + else { + emcute_sub_t *s; + for (s = subs; s; s = s->next) { + if (s->next == sub) { + s->next = sub->next; + break; + } + } + } + } + + mutex_unlock(&txlock); + return res; +} + +int emcute_willupd_topic(const char *topic, unsigned flags) +{ + assert(!(flags & ~PUB_FLAGS)); + + if (gateway.port == 0) { + return EMCUTE_NOGW; + } + if (topic && (strlen(topic) > EMCUTE_TOPIC_MAXLEN)) { + return EMCUTE_OVERFLOW; + } + + mutex_lock(&txlock); + + tbuf[1] = WILLTOPICUPD; + if (!topic) { + tbuf[0] = 2; + } + else { + tbuf[0] = (strlen(topic) + 3); + tbuf[2] = flags; + memcpy(&tbuf[3], topic, strlen(topic)); + } + + return syncsend(WILLTOPICRESP, (size_t)tbuf[0], true); +} + +int emcute_willupd_msg(const void *data, size_t len) +{ + assert(data && (len > 0)); + + if (gateway.port == 0) { + return EMCUTE_NOGW; + } + if (len > (EMCUTE_BUFSIZE - 4)) { + return EMCUTE_OVERFLOW; + } + + mutex_lock(&txlock); + + int pos = set_len(tbuf, (len + 1)); + len += (pos + 1); + tbuf[pos++] = WILLMSGUPD; + memcpy(&tbuf[pos], data, len); + + return syncsend(WILLMSGRESP, len, true); +} + +void emcute_run(uint16_t port, const char *id) +{ + assert(strlen(id) < EMCUTE_ID_MAXLEN); + + sock_udp_ep_t local = SOCK_IPV6_EP_ANY; + sock_udp_ep_t remote; + local.port = port; + cli_id = id; + timer.callback = time_evt; + timer.arg = NULL; + mutex_init(&txlock); + + if (sock_udp_create(&sock, &local, NULL, 0) < 0) { + LOG_ERROR("[emcute] unable to open UDP socket on port %i\n", (int)port); + return; + } + + uint32_t start = xtimer_now_usec(); + uint32_t t_out = (EMCUTE_KEEPALIVE * US_PER_SEC); + + while (1) { + ssize_t len = sock_udp_recv(&sock, rbuf, sizeof(rbuf), t_out, &remote); + + if ((len < 0) && (len != -ETIMEDOUT)) { + LOG_ERROR("[emcute] error while receiving UDP packet\n"); + return; + } + + if (len >= 2) { + /* handle the packet */ + uint16_t pkt_len; + int pos = get_len(rbuf, &pkt_len); + uint8_t type = rbuf[pos]; + + switch (type) { + case CONNACK: on_ack(type, 0, 2, 0); break; + case WILLTOPICREQ: on_ack(type, 0, 0, 0); break; + case WILLMSGREQ: on_ack(type, 0, 0, 0); break; + case REGACK: on_ack(type, 4, 6, 2); break; + case PUBLISH: on_publish(); break; + case PUBACK: on_ack(type, 4, 6, 0); break; + case SUBACK: on_ack(type, 5, 7, 3); break; + case UNSUBACK: on_ack(type, 2, 0, 0); break; + case PINGREQ: on_pingreq(&remote); break; + case PINGRESP: on_pingresp(); break; + case DISCONNECT: on_disconnect(); break; + case WILLTOPICRESP: on_ack(type, 0, 0, 0); break; + case WILLMSGRESP: on_ack(type, 0, 0, 0); break; + default: + LOG_DEBUG("[emcute] received unexpected type [%s]\n", + emcute_type_str(type)); + } + } + + uint32_t now = xtimer_now_usec(); + if ((now - start) >= (EMCUTE_KEEPALIVE * US_PER_SEC)) { + send_ping(); + start = now; + t_out = (EMCUTE_KEEPALIVE * US_PER_SEC); + } + else { + t_out = (EMCUTE_KEEPALIVE * US_PER_SEC) - (now - start); + } + } +} diff --git a/sys/net/application_layer/emcute/emcute_internal.h b/sys/net/application_layer/emcute/emcute_internal.h new file mode 100644 index 0000000000..47a8aca934 --- /dev/null +++ b/sys/net/application_layer/emcute/emcute_internal.h @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2017 Freie Universität Berlin + * + * This file is subject to the terms and conditions of the GNU Lesser + * General Public License v2.1. See the file LICENSE in the top level + * directory for more details. + */ + +/** + * @ingroup net_emcute + * @{ + * + * @file + * @brief emCute internals + * + * @author Hauke Petersen <hauke.petersen@fu-berlin.de> + */ + +#ifndef EMCUTE_INTERNAL_H +#define EMCUTE_INTERNAL_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @brief MQTT-SN message types + */ +enum { + ADVERTISE = 0x00, /**< advertise message */ + SEARCHGW = 0x01, /**< search gateway message */ + GWINFO = 0x02, /**< gateway info message */ + CONNECT = 0x04, /**< connect message */ + CONNACK = 0x05, /**< connection acknowledgment message */ + WILLTOPICREQ = 0x06, /**< will topic request */ + WILLTOPIC = 0x07, /**< will topic */ + WILLMSGREQ = 0x08, /**< will message request */ + WILLMSG = 0x09, /**< will message */ + REGISTER = 0x0a, /**< topic registration request */ + REGACK = 0x0b, /**< topic registration acknowledgment */ + PUBLISH = 0x0c, /**< publish message */ + PUBACK = 0x0d, /**< publish acknowledgment */ + PUBCOMP = 0x0e, /**< publish received (QoS 2) */ + PUBREC = 0x0f, /**< publish complete (QoS 2) */ + PUBREL = 0x10, /**< publish release (QoS 2) */ + SUBSCRIBE = 0x12, /**< subscribe message */ + SUBACK = 0x13, /**< subscription acknowledgment */ + UNSUBSCRIBE = 0x14, /**< unsubscribe message */ + UNSUBACK = 0x15, /**< unsubscription acknowledgment */ + PINGREQ = 0x16, /**< ping request */ + PINGRESP = 0x17, /**< ping response */ + DISCONNECT = 0x18, /**< disconnect message */ + WILLTOPICUPD = 0x1a, /**< will topic update request */ + WILLTOPICRESP = 0x1b, /**< will topic update response */ + WILLMSGUPD = 0x1c, /**< will message update request */ + WILLMSGRESP = 0x1d /**< will topic update response */ +}; + +/** + * @brief MQTT-SN return codes + */ +enum { + ACCEPT = 0x00, /**< all good */ + REJ_CONG = 0x01, /**< reject, reason: congestions */ + REJ_INVTID = 0x02, /**< reject, reason: invalid topic ID */ + REJ_NOTSUP = 0x03 /**< reject, reason: operation not supported */ +}; + +#ifdef __cplusplus +} +#endif + +#endif /* EMCUTE_INTERNAL_H */ +/** @} */ diff --git a/sys/net/application_layer/emcute/emcute_str.c b/sys/net/application_layer/emcute/emcute_str.c new file mode 100644 index 0000000000..ebaa2aa465 --- /dev/null +++ b/sys/net/application_layer/emcute/emcute_str.c @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2017 Freie Universität Berlin + * + * This file is subject to the terms and conditions of the GNU Lesser + * General Public License v2.1. See the file LICENSE in the top level + * directory for more details. + */ + +/** + * @ingroup net_emcute + * @{ + * + * @file + * @brief emCute string functions (for debugging purposes) + * + * @author Hauke Petersen <hauke.petersen@fu-berlin.de> + * + * @} + */ + +#include "net/emcute.h" +#include "emcute_internal.h" + +const char *emcute_type_str(uint8_t type) +{ + switch (type) { + case ADVERTISE: return "ADVERTISE"; + case SEARCHGW: return "SEARCHGW"; + case GWINFO: return "GWINFO"; + case CONNECT: return "CONNECT"; + case CONNACK: return "CONNACK"; + case WILLTOPICREQ: return "WILLTOPICREQ"; + case WILLTOPIC: return "WILLTOPIC"; + case WILLMSGREQ: return "WILLMSGREQ"; + case WILLMSG: return "WILLMSG"; + case REGISTER: return "REGISTER"; + case REGACK: return "REGACK"; + case PUBLISH: return "PUBLISH"; + case PUBACK: return "PUBACK"; + case PUBCOMP: return "PUBCOMP"; + case PUBREC: return "PUBREC"; + case PUBREL: return "PUBREL"; + case SUBSCRIBE: return "SUBSCRIBE"; + case SUBACK: return "SUBACK"; + case UNSUBSCRIBE: return "UNSUBSCRIBE"; + case UNSUBACK: return "UNSUBACK"; + case PINGREQ: return "PINGREQ"; + case PINGRESP: return "PINGRESP"; + case DISCONNECT: return "DISCONNECT"; + case WILLTOPICUPD: return "WILLTOPICUPD"; + case WILLTOPICRESP: return "WILLTOPICRESP"; + case WILLMSGUPD: return "WILLMSGUPD"; + case WILLMSGRESP: return "WILLMSGRESP"; + default: return "UNKNOWN"; + } +} -- GitLab