diff --git a/Makefile.dep b/Makefile.dep index 923756103601cbfc76955bf186be2f50e95980ff..90e437438fd072f4ab1ad840d2191f215ea03d9f 100644 --- a/Makefile.dep +++ b/Makefile.dep @@ -659,6 +659,14 @@ ifneq (,$(filter openthread_contrib,$(USEMODULE))) FEATURES_REQUIRED += cpp endif +ifneq (,$(filter asymcute,$(USEMODULE))) + USEMODULE += sock_udp + USEMODULE += sock_util + USEMODULE += random + USEMODULE += event_timeout + USEMODULE += event_callback +endif + ifneq (,$(filter emcute,$(USEMODULE))) USEMODULE += core_thread_flags USEMODULE += sock_udp diff --git a/sys/Makefile b/sys/Makefile index f08ec774aed1a3be8bf77b526f0325138b7631dc..df851ae9a74fee302092007770f750e6c086a23f 100644 --- a/sys/Makefile +++ b/sys/Makefile @@ -103,6 +103,9 @@ endif ifneq (,$(filter gcoap,$(USEMODULE))) DIRS += net/application_layer/gcoap endif +ifneq (,$(filter asymcute,$(USEMODULE))) + DIRS += net/application_layer/asymcute +endif ifneq (,$(filter emcute,$(USEMODULE))) DIRS += net/application_layer/emcute endif diff --git a/sys/include/net/asymcute.h b/sys/include/net/asymcute.h new file mode 100644 index 0000000000000000000000000000000000000000..d468fb1b9cfb8b60d2b9cab4921f68b81aac5585 --- /dev/null +++ b/sys/include/net/asymcute.h @@ -0,0 +1,561 @@ +/* + * Copyright (C) 2018 Freie Universität Berlin + * + * This file is subject to the terms and conditions of the GNU Lesser + * General Public License v2.1. See the file LICENSE in the top level + * directory for more details. + */ + +/** + * @defgroup net_asymcute MQTT-SN Client (Asymcute) + * @ingroup net + * @brief Asymcute is an asynchronous MQTT-SN implementation + * + * # About + * `Asymcute` is a asynchronous MQTT-SN client implementation, aiming at + * providing the user a high degree of flexibility. It provides a flexible + * interface that allows users to issue any number of concurrent requests to + * one or more different gateways simultaneously. + * + * # Implementation state + * + * Implemented features: + * - Connecting to multiple gateways simultaneously + * - Registration of topic names + * - Publishing of data (QoS 0 and QoS 1) + * - Subscription to topics + * - Pre-defined topic IDs as well as short and normal topic names + * + * Missing features: + * - Gateway discovery process not implemented + * - Last will feature not implemented + * - No support for QoS level 2 + * - No support for wildcard characters in topic names when subscribing + * - Actual granted QoS level on subscription is ignored + * + * @{ + * @file + * @brief Asymcute MQTT-SN interface definition + * + * @author Hauke Petersen <hauke.petersen@fu-berlin.de> + */ + +#ifndef NET_ASYMCUTE_H +#define NET_ASYMCUTE_H + +#include <stdint.h> +#include <stddef.h> +#include <stdbool.h> + +#include "assert.h" +#include "event/timeout.h" +#include "event/callback.h" +#include "net/mqttsn.h" +#include "net/sock/udp.h" +#include "net/sock/util.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef ASYMCUTE_BUFSIZE +/** + * @brief Default buffer size used for receive and request buffers + */ +#define ASYMCUTE_BUFSIZE (128U) +#endif + +#ifndef ASYMCUTE_HANDLER_PRIO +/** + * @brief Default priority for Asymcute's handler thread + */ +#define ASYMCUTE_HANDLER_PRIO (THREAD_PRIORITY_MAIN - 2) +#endif + +#ifndef ASYMCUTE_HANDLER_STACKSIZE +/** + * @brief Default stack size for Asymcute's handler thread + */ +#define ASYMCUTE_HANDLER_STACKSIZE (THREAD_STACKSIZE_DEFAULT) +#endif + +#ifndef ASYMCUTE_LISTENER_PRIO +/** + * @brief Default priority for an Asymcute listener thread + * + * @note Must be of higher priority than @ref ASYMCUTE_HANDLER_PRIO + */ +#define ASYMCUTE_LISTENER_PRIO (THREAD_PRIORITY_MAIN - 3) +#endif + +#ifndef ASYMCUTE_LISTENER_STACKSIZE +/** + * @brief Default stack size for an Asymcute listener thread + */ +#define ASYMCUTE_LISTENER_STACKSIZE (THREAD_STACKSIZE_DEFAULT) +#endif + +#ifndef ASYMCUTE_ID_MAXLEN +/** + * @brief Maximum client ID length + * + * @note Must be less than (256 - 8) and less than (ASYMCUTE_BUFSIZE - 8) + */ +#define ASYMCUTE_ID_MAXLEN (32U) +#endif + +#ifndef ASYMCUTE_TOPIC_MAXLEN +/** + * @brief Maximum topic length + * + * @note Must be less than (256 - 8) AND less than (ASYMCUTE_BUFSIZE - 8). + */ +#define ASYMCUTE_TOPIC_MAXLEN (32U) +#endif + +#ifndef ASYMCUTE_KEEPALIVE +/** + * @brief Keep alive interval [in s] communicated to the gateway + * + * For the default value, see spec v1.2, section 7.2 -> T_WAIT: > 5 min + */ +#define ASYMCUTE_KEEPALIVE (360) /* -> 6 min*/ +#endif + +#ifndef ASYMCUTE_KEEPALIVE_PING +/** + * @brief Interval to use for sending periodic ping messages + * + * The default behavior of this implementation is to send ping messages as soon + * as three quarters of the keep alive interval have passed. + * + * @note Must be less than ASYMCUTE_KEEPALIVE + */ +#define ASYMCUTE_KEEPALIVE_PING ((ASYMCUTE_KEEPALIVE / 4) * 3) +#endif + +#ifndef ASYMCUTE_T_RETRY +/** + * @brief Resend interval [in seconds] + * + * For the default value, see spec v1.2, section 7.2 -> T_RETRY: 10 to 15 sec + */ +#define ASYMCUTE_T_RETRY (10U) /* -> 10 sec */ +#endif + +#ifndef ASYMCUTE_N_RETRY +/** + * @brief Number of retransmissions until requests time out + * + * For the default value, see spec v1.2, section 7.2 -> N_RETRY: 3-5 + */ +#define ASYMCUTE_N_RETRY (3U) +#endif + +/** + * @brief Return values used by public Asymcute functions + */ +enum { + ASYMCUTE_OK = 0, /**< all is good */ + ASYMCUTE_OVERFLOW = -1, /**< error: insufficient buffer space */ + ASYMCUTE_GWERR = -2, /**< error: bad gateway connection state */ + ASYMCUTE_NOTSUP = -3, /**< error: feature not supported */ + ASYMCUTE_BUSY = -4, /**< error: context already in use */ + ASYMCUTE_REGERR = -5, /**< error: registration invalid */ + ASYMCUTE_SUBERR = -6, /**< error: subscription invalid */ +}; + +/** + * @brief Possible event types passed to the event callback + */ +enum { + ASYMCUTE_TIMEOUT, /**< request timed out */ + ASYMCUTE_CANCELED, /**< request was canceled */ + ASYMCUTE_REJECTED, /**< request was rejected */ + ASYMCUTE_CONNECTED, /**< connected to gateway */ + ASYMCUTE_DISCONNECTED, /**< connection got disconnected */ + ASYMCUTE_REGISTERED, /**< topic was registered */ + ASYMCUTE_PUBLISHED, /**< data was published */ + ASYMCUTE_SUBSCRIBED, /**< client was subscribed to topic */ + ASYMCUTE_UNSUBSCRIBED, /**< client was unsubscribed from topic */ +}; + +/** + * @brief Forward type declaration for connections contexts + */ +typedef struct asymcute_con asymcute_con_t; + +/** + * @brief Forward type declaration for request contexts + */ +typedef struct asymcute_req asymcute_req_t; + +/** + * @brief Forward type declaration for subscription contexts + */ +typedef struct asymcute_sub asymcute_sub_t; + +/** + * @brief Forward type declaration for topic definitions + */ +typedef struct asymcute_topic asymcute_topic_t; + +/** + * @brief Forward type declaration for last will definitions + */ +typedef struct asymcute_will asymcute_will_t; + +/** + * @brief Event callback used for communicating connection and request related + * events to the user + * + * @param[in] req pointer to the request context that triggered the event, + * may be NULL of unsolicited events + * @param[in] evt_type type of the event + */ +typedef void(*asymcute_evt_cb_t)(asymcute_req_t *req, unsigned evt_type); + +/** + * @brief Callback triggered on events for active subscriptions + * + * @param[in] sub pointer to subscription context triggering this event + * @param[in] evt_type type of the event + * @param[in] data incoming data for PUBLISHED events, may be NULL + * @param[in] len length of @p data in bytes + * @param[in] arg user supplied argument + */ +typedef void(*asymcute_sub_cb_t)(const asymcute_sub_t *sub, unsigned evt_type, + const void *data, size_t len, void *arg); + +/** + * @brief Context specific timeout callback, only used internally + * + * @internal + * + * @param[in] con connection context for this timeout + * @param[in] req request that timed out + * + * @return Event type to communicate to the user + */ +typedef unsigned(*asymcute_to_cb_t)(asymcute_con_t *con, asymcute_req_t *req); + +/** + * @brief Asymcute request context + */ +struct asymcute_req { + mutex_t lock; /**< synchronization lock */ + struct asymcute_req *next; /**< the requests list entry */ + asymcute_con_t *con; /**< connection the request is using */ + asymcute_to_cb_t cb; /**< internally used callback */ + void *arg; /**< internally used additional state */ + event_callback_t to_evt; /**< timeout event */ + event_timeout_t to_timer; /**< timeout timer */ + uint8_t data[ASYMCUTE_BUFSIZE]; /**< buffer holding the request's data */ + size_t data_len; /**< length of the request packet in byte */ + uint16_t msg_id; /**< used message id for this request */ + uint8_t retry_cnt; /**< retransmission counter */ +}; + +/** + * @brief Asymcute connection context + */ +struct asymcute_con { + mutex_t lock; /**< synchronization lock */ + sock_udp_t sock; /**< socket used by a connections */ + sock_udp_ep_t server_ep; /**< the gateway's UDP endpoint */ + asymcute_req_t *pending; /**< list holding pending requests */ + asymcute_sub_t *subscriptions; /**< list holding active subscriptions */ + asymcute_evt_cb_t user_cb; /**< event callback provided by user */ + event_callback_t keepalive_evt; /**< keep alive event */ + event_timeout_t keepalive_timer; /**< keep alive timer */ + uint16_t last_id; /**< last used message ID for this + * connection */ + uint8_t keepalive_retry_cnt; /**< keep alive transmission counter */ + uint8_t state; /**< connection state */ + uint8_t rxbuf[ASYMCUTE_BUFSIZE]; /**< connection specific receive buf */ + char cli_id[ASYMCUTE_ID_MAXLEN + 1];/**< buffer to store client ID */ +}; + +/** + * @brief Data-structure for holding topics and their registration status + */ +struct asymcute_topic { + asymcute_con_t *con; /**< connection used for registration */ + char name[ASYMCUTE_TOPIC_MAXLEN + 1]; /**< topic string (ACSII only) */ + uint8_t flags; /**< normal, short, or pre-defined */ + uint16_t id; /**< topic id */ +}; + +/** + * @brief Data-structure holding the state of subscriptions + */ +struct asymcute_sub { + asymcute_sub_t *next; /**< the subscriptions list entry */ + asymcute_topic_t *topic; /**< topic we subscribe to */ + asymcute_sub_cb_t cb; /**< called on incoming data */ + void *arg; /**< user supplied callback argument */ +}; + +/** + * @brief Data structure for defining a last will + */ +struct asymcute_will { + const char *topic; /**< last will topic */ + void *msg; /**< last will message content */ + size_t msg_len; /**< length of last will message content */ +}; + +/** + * @brief Check if a given request context is currently used + * + * @param[in] req request context to check + * + * @return true if context is currently used + * @return false if context is not used + */ +static inline bool asymcute_req_in_use(const asymcute_req_t *req) +{ + assert(req); + return (req->con != NULL); +} + +/** + * @brief Check if a given subscription is currently active + * + * @param[in] sub subscription context to check + * + * @return true if subscription is active + * @return false if subscription is not active + */ +static inline bool asymcute_sub_active(const asymcute_sub_t *sub) +{ + assert(sub); + return (sub->topic != NULL); +} + +/** + * @brief Reset the given topic + * + * @warning Make sure that the given topic is not used by any subscription or + * last will when calling this function + * + * @param[out] topic topic to reset + */ +static inline void asymcute_topic_reset(asymcute_topic_t *topic) +{ + assert(topic); + memset(topic, 0, sizeof(asymcute_topic_t)); +} + +/** + * @brief Check if a given topic is currently registered with a gateway + * + * @param[in] topic topic to check + * + * @return true if topic is registered + * @return false if topic is not registered + */ +static inline bool asymcute_topic_is_reg(const asymcute_topic_t *topic) +{ + assert(topic); + return (topic->con != NULL); +} + +/** + * @brief Check if a given topic is initialized + * + * @param[in] topic topic to check + * + * @return true if topic is initialized + * @return false if topic is not initialized + */ +static inline bool asymcute_topic_is_init(const asymcute_topic_t *topic) +{ + assert(topic); + return (topic->name[0] != '\0'); +} + +/** + * @brief Compare two given topics and check if they are equal + * + * @param[in] a topic A + * @param[in] b topic B + * + * @return true if both topics are equal + * @return false if topics differ + */ +static inline bool asymcute_topic_equal(const asymcute_topic_t *a, + const asymcute_topic_t *b) +{ + assert(a); + assert(b); + + return ((a->flags == b->flags) && (a->id == b->id)); +} + +/** + * @brief Initialize the given topic + * + * @param[out] topic topic to initialize + * @param[in] topic_name topic name (ASCII), may be NULL if topic should use + * a pre-defined topic ID + * @param[in] topic_id pre-defined topic ID, or don't care if @p topic_name + * is given + * + * @return ASYMCUTE_OK on success + * @return ASYMCUTE_REGERR if topic is already registered + * @return ASYMCUTE_OVERFLOW if topic name does not fit into buffer or if pre- + * defined topic ID is invalid + */ +int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name, + uint16_t topic_id); + +/** + * @brief Start a listener thread + * + * @note Must have higher priority then the handler thread (defined by + * @ref ASYMCUTE_HANDLER_PRIO) + * + * @param[in] con connection context to use for this connection + * @param[in] stack stack used to run the listener thread + * @param[in] stacksize size of @p stack in bytes + * @param[in] priority priority of the listener thread created by this function + * @param[in] callback user callback for notification about connection related + * events + * + * @return ASYMCUTE_OK on success + * @return ASYMCUTE_BUSY if connection context is already in use + */ +int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize, + char priority, asymcute_evt_cb_t callback); + +/** + * @brief Start the global Asymcute handler thread for processing timeouts and + * keep alive events + * + * This function is typically called during system initialization. + */ +void asymcute_handler_run(void); + +/** + * @brief Check if the given connection context is connected to a gateway + * + * @param[in] con connection to check + * + * @return true if context is connected + * @return false if not connected + */ +bool asymcute_is_connected(const asymcute_con_t *con); + +/** + * @brief Connect to the given MQTT-SN gateway + * + * @param[in,out] con connection to use + * @param[in,out] req request context to use for CONNECT procedure + * @param[in] server UDP endpoint of the target gateway + * @param[in] cli_id client ID to register with the gateway + * @param[in] clean set `true` to start a clean session + * @param[in] will last will (currently not implemented) + * + * @return ASYMCUTE_OK if CONNECT message has been sent + * @return ASYMCUTE_NOTSUP if last will was given (temporary until implemented) + * @return ASYMCUTE_OVERFLOW if @p cli_id is larger than ASYMCUTE_ID_MAXLEN + * @return ASYMCUTE_GWERR if the connection is not in idle state + * @return ASYMCUTE_BUSY if the given request context is already in use + */ +int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req, + sock_udp_ep_t *server, const char *cli_id, bool clean, + asymcute_will_t *will); + +/** + * @brief Close the given connection + * + * @param[in,out] con connection to close + * @param[in,out] req request context to use for DISCONNECT procedure + * + * @return ASYMCUTE_OK if DISCONNECT message has been sent + * @return ASYMCUTE_GWERR if connection context is not connected + * @return ASYMCUTE_BUSY if the given request context is already in use + */ +int asymcute_disconnect(asymcute_con_t *con, asymcute_req_t *req); + +/** + * @brief Register a given topic with the connected gateway + * + * @param[in] con connection to use + * @param[in,out] req request context to use for REGISTER procedure + * @param[in,out] topic topic to register + * + * @return ASYMCUTE_OK if REGISTER message has been sent + * @return ASYMCUTE_REGERR if topic is already registered + * @return ASYMCUTE_GWERR if not connected to a gateway + * @return ASYMCUTE_BUSY if the given request context is already in use + */ +int asymcute_register(asymcute_con_t *con, asymcute_req_t *req, + asymcute_topic_t *topic); + +/** + * @brief Publish the given data to the given topic + * + * @param[in] con connection to use + * @param[in,out] req request context used for PUBLISH procedure + * @param[in] topic publish data to this topic + * @param[in] data actual payload to send + * @param[in] data_len size of @p data in bytes + * @param[in] flags additional flags (QoS level, DUP, and RETAIN) + * + * @return ASYMCUTE_OK if PUBLISH message has been sent + * @return ASYMCUTE_NOTSUP if unsupported flags have been set + * @return ASYMCUTE_OVERFLOW if data does not fit into transmit buffer + * @return ASYMCUTE_REGERR if given topic is not registered + * @return ASYMCUTE_GWERR if not connected to a gateway + * @return ASYMCUTE_BUSY if the given request context is already in use + */ +int asymcute_publish(asymcute_con_t *con, asymcute_req_t *req, + const asymcute_topic_t *topic, + const void *data, size_t data_len, uint8_t flags); + +/** + * @brief Subscribe to a given topic + * + * @param[in] con connection to use + * @param[in,out] req request context used for SUBSCRIBE procedure + * @param[out] sub subscription context to store subscription state + * @param[in,out] topic topic to subscribe to, must be initialized (see + * asymcute_topic_init()) + * @param[in] callback user callback triggered on events for this subscription + * @param[in] arg user supplied argument passed to the event callback + * @param[in] flags additional flags (QoS level and DUP) + * + * @return ASYMCUTE_OK if SUBSCRIBE message has been sent + * @return ASYMCUTE_NOTSUP if invalid or unsupported flags have been set + * @return ASYMCUTE_REGERR if topic is not initialized + * @return ASYMCUTE_GWERR if not connected to a gateway + * @return ASYMCUTE_SUBERR if already subscribed to the given topic + * @return ASYMCUTE_BUSY if the given request context is already in use + */ +int asymcute_subscribe(asymcute_con_t *con, asymcute_req_t *req, + asymcute_sub_t *sub, asymcute_topic_t *topic, + asymcute_sub_cb_t callback, void *arg, uint8_t flags); + +/** + * @brief Cancel an active subscription + * + * @param[in] con connection to use + * @param[in,out] req request context used for UNSUBSCRIBE procedure + * @param[in,out] sub subscription to cancel + * + * @return ASYMCUTE_OK if UNSUBSCRIBE message has been sent + * @return ASYMCUTE_SUBERR if subscription is not currently active + * @return ASYMCUTE_GWERR if not connected to a gateway + * @return ASYMCUTE_BUSY if the given request context is already in use + */ +int asymcute_unsubscribe(asymcute_con_t *con, asymcute_req_t *req, + asymcute_sub_t *sub); + +#ifdef __cplusplus +} +#endif + +#endif /* NET_ASYMCUTE_H */ +/** @} */ diff --git a/sys/net/application_layer/asymcute/Makefile b/sys/net/application_layer/asymcute/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..73ed079ac43ba5b7995d489d5ab59354820b9019 --- /dev/null +++ b/sys/net/application_layer/asymcute/Makefile @@ -0,0 +1,3 @@ +MODULE = asymcute + +include $(RIOTBASE)/Makefile.base diff --git a/sys/net/application_layer/asymcute/asymcute.c b/sys/net/application_layer/asymcute/asymcute.c new file mode 100644 index 0000000000000000000000000000000000000000..983d3ff15c1a33599c978cc1cd1753d6fa732b24 --- /dev/null +++ b/sys/net/application_layer/asymcute/asymcute.c @@ -0,0 +1,973 @@ +/* + * Copyright (C) 2018 Freie Universität Berlin + * + * This file is subject to the terms and conditions of the GNU Lesser + * General Public License v2.1. See the file LICENSE in the top level + * directory for more details. + */ + +/** + * @ingroup net_asymcute + * @{ + * + * @file + * @brief Asynchronous MQTT-SN implementation + * + * @author Hauke Petersen <hauke.petersen@fu-berlin.de> + * + * @} + */ + +#include <limits.h> + +#include "log.h" +#include "random.h" +#include "byteorder.h" + +#include "net/asymcute.h" + +#define ENABLE_DEBUG (0) +#include "debug.h" + +#define PROTOCOL_VERSION (0x01) + +#define RETRY_TO (ASYMCUTE_T_RETRY * US_PER_SEC) +#define KEEPALIVE_TO (ASYMCUTE_KEEPALIVE_PING * US_PER_SEC) + +#define VALID_PUBLISH_FLAGS (MQTTSN_QOS_1 | MQTTSN_DUP | MQTTSN_RETAIN) +#define VALID_SUBSCRIBE_FLAGS (MQTTSN_QOS_1 | MQTTSN_DUP) + +#define MINLEN_CONNACK (3U) +#define MINLEN_DISCONNECT (2U) +#define MINLEN_REGACK (7U) +#define MINLEN_PUBACK (7U) +#define MINLEN_SUBACK (8U) +#define MINLEN_UNSUBACK (4U) + +#define IDPOS_REGACK (4U) +#define IDPOS_PUBACK (4U) +#define IDPOS_SUBACK (5U) +#define IDPOS_UNSUBACK (2U) + +#define LEN_PINGRESP (2U) + +/* Internally used connection states */ +enum { + UNINITIALIZED = 0, /**< connection context is not initialized */ + NOTCON, /**< not connected to any gateway */ + CONNECTING, /**< connection is being setup */ + CONNECTED, /**< connection is established */ + TEARDOWN, /**< connection is being torn down */ +}; + +/* the main handler thread needs a stack and a message queue */ +static event_queue_t _queue; +static char _stack[ASYMCUTE_HANDLER_STACKSIZE]; + +/* necessary forward function declarations */ +static void _on_req_timeout(void *arg); + +static size_t _len_set(uint8_t *buf, size_t len) +{ + if (len < (0xff - 7)) { + buf[0] = len + 1; + return 1; + } + else { + buf[0] = 0x01; + byteorder_htobebufs(&buf[1], (uint16_t)(len + 3)); + return 3; + } +} + +static size_t _len_get(uint8_t *buf, size_t *len) +{ + if (buf[0] != 0x01) { + *len = (uint16_t)buf[0]; + return 1; + } + else { + *len = byteorder_bebuftohs(&buf[1]); + return 3; + } +} + +/* @pre con is locked */ +static uint16_t _msg_id_next(asymcute_con_t *con) +{ + if (++con->last_id == 0) { + return ++con->last_id; + } + return con->last_id; +} + +/* @pre con is locked */ +static asymcute_req_t *_req_preprocess(asymcute_con_t *con, + size_t msg_len, size_t min_len, + const uint8_t *buf, unsigned id_pos) +{ + /* verify message length */ + if (msg_len < min_len) { + return NULL; + } + + uint16_t msg_id = (buf == NULL) ? 0 : byteorder_bebuftohs(&buf[id_pos]); + + asymcute_req_t *res = NULL; + asymcute_req_t *iter = con->pending; + if (iter == NULL) { + return NULL; + } + if (iter->msg_id == msg_id) { + res = iter; + con->pending = iter->next; + } + while (iter && !res) { + if (iter->next && (iter->next->msg_id == msg_id)) { + res = iter->next; + iter->next = iter->next->next; + } + iter = iter->next; + } + + if (res) { + res->con = NULL; + event_timeout_clear(&res->to_timer); + } + return res; +} + +/* @pre con is locked */ +static void _req_remove(asymcute_con_t *con, asymcute_req_t *req) +{ + if (con->pending == req) { + con->pending = con->pending->next; + } + for (asymcute_req_t *cur = con->pending; cur; cur = cur->next) { + if (cur->next == req) { + cur->next = cur->next->next; + } + } + req->con = NULL; +} + +/* @pre con is locked */ +static void _compile_sub_unsub(asymcute_req_t *req, asymcute_con_t *con, + asymcute_sub_t *sub, uint8_t type) +{ + size_t topic_len = strlen(sub->topic->name); + size_t pos = _len_set(req->data, (topic_len + 4)); + + + req->msg_id = _msg_id_next(con); + req->data[pos] = type; + req->data[pos + 1] = sub->topic->flags; + byteorder_htobebufs(&req->data[pos + 2], req->msg_id); + memcpy(&req->data[pos + 4], sub->topic->name, topic_len); + req->data_len = (pos + 4 + topic_len); + req->arg = (void *)sub; +} + +static void _req_resend(asymcute_req_t *req, asymcute_con_t *con) +{ + event_timeout_set(&req->to_timer, RETRY_TO); + sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep); +} + +/* @pre con is locked */ +static void _req_send(asymcute_req_t *req, asymcute_con_t *con, + asymcute_to_cb_t cb) +{ + /* initialize request */ + req->con = con; + req->cb = cb; + req->retry_cnt = ASYMCUTE_N_RETRY; + event_callback_init(&req->to_evt, _on_req_timeout, (void *)req); + event_timeout_init(&req->to_timer, &_queue, &req->to_evt.super); + /* add request to the pending queue (if non-con request) */ + req->next = con->pending; + con->pending = req; + /* send request */ + _req_resend(req, con); +} + +static void _req_send_once(asymcute_req_t *req, asymcute_con_t *con) +{ + sock_udp_send(&con->sock, req->data, req->data_len, &con->server_ep); + mutex_unlock(&req->lock); +} + +static void _req_cancel(asymcute_req_t *req) +{ + asymcute_con_t *con = req->con; + event_timeout_clear(&req->to_timer); + req->con = NULL; + mutex_unlock(&req->lock); + con->user_cb(req, ASYMCUTE_CANCELED); +} + +static void _sub_cancel(asymcute_sub_t *sub) +{ + sub->cb(sub, ASYMCUTE_CANCELED, NULL, 0, sub->arg); + sub->topic = NULL; +} + +/* @pre con is locked */ +static void _disconnect(asymcute_con_t *con, uint8_t state) +{ + if (con->state == CONNECTED) { + /* cancel all pending requests */ + event_timeout_clear(&con->keepalive_timer); + for (asymcute_req_t *req = con->pending; req; req = req->next) { + _req_cancel(req); + } + con->pending = NULL; + for (asymcute_sub_t *sub = con->subscriptions; sub; sub = sub->next) { + _sub_cancel(sub); + } + con->subscriptions = NULL; + } + con->state = state; +} + +static void _on_req_timeout(void *arg) +{ + asymcute_req_t *req = (asymcute_req_t *)arg; + + /* only process the timeout, if the request is still active */ + if (req->con == NULL) { + return; + } + + if (req->retry_cnt--) { + /* resend the packet */ + _req_resend(req, req->con); + return; + } + else { + asymcute_con_t *con = req->con; + mutex_lock(&con->lock); + _req_remove(con, req); + /* communicate timeout to outer world */ + unsigned ret = ASYMCUTE_TIMEOUT; + if (req->cb) { + ret = req->cb(con, req); + } + mutex_unlock(&req->lock); + mutex_unlock(&con->lock); + con->user_cb(req, ret); + } +} + +static unsigned _on_con_timeout(asymcute_con_t *con, asymcute_req_t *req) +{ + (void)req; + + con->state = NOTCON; + return ASYMCUTE_TIMEOUT; +} + +static unsigned _on_discon_timeout(asymcute_con_t *con, asymcute_req_t *req) +{ + (void)req; + + con->state = NOTCON; + return ASYMCUTE_DISCONNECTED; +} + +static unsigned _on_suback_timeout(asymcute_con_t *con, asymcute_req_t *req) +{ + (void)con; + + /* reset the subscription context */ + asymcute_sub_t *sub = (asymcute_sub_t *)req->arg; + sub->topic = NULL; + return ASYMCUTE_TIMEOUT; +} + +static void _on_keepalive_evt(void *arg) +{ + asymcute_con_t *con = (asymcute_con_t *)arg; + + mutex_lock(&con->lock); + + if (con->state != CONNECTED) { + mutex_unlock(&con->lock); + return; + } + + if (con->keepalive_retry_cnt) { + /* (re)send keep alive ping and set dedicated retransmit timer */ + uint8_t ping[2] = { 2, MQTTSN_PINGREQ }; + sock_udp_send(&con->sock, ping, sizeof(ping), &con->server_ep); + con->keepalive_retry_cnt--; + event_timeout_set(&con->keepalive_timer, RETRY_TO); + mutex_unlock(&con->lock); + } + else { + _disconnect(con, NOTCON); + mutex_unlock(&con->lock); + con->user_cb(NULL, ASYMCUTE_DISCONNECTED); + } +} + +static void _on_connack(asymcute_con_t *con, const uint8_t *data, size_t len) +{ + mutex_lock(&con->lock); + asymcute_req_t *req = _req_preprocess(con, len, MINLEN_CONNACK, NULL, 0); + if (req == NULL) { + mutex_unlock(&con->lock); + return; + } + + /* check return code and mark connection as established */ + unsigned ret = ASYMCUTE_REJECTED; + if (data[2] == MQTTSN_ACCEPTED) { + con->state = CONNECTED; + /* start keep alive timer */ + event_timeout_set(&con->keepalive_timer, KEEPALIVE_TO); + ret = ASYMCUTE_CONNECTED; + } + + mutex_unlock(&req->lock); + mutex_unlock(&con->lock); + con->user_cb(req, ret); +} + +static void _on_disconnect(asymcute_con_t *con, size_t len) +{ + mutex_lock(&con->lock); + + /* we might have triggered the DISCONNECT process ourselves, so make sure + * the pending request is being handled */ + asymcute_req_t *req = _req_preprocess(con, len, MINLEN_DISCONNECT, NULL, 0); + + /* put the connection back to NOTCON in any case and let the user know */ + _disconnect(con, NOTCON); + if (req) { + mutex_unlock(&req->lock); + } + mutex_unlock(&con->lock); + con->user_cb(req, ASYMCUTE_DISCONNECTED); + +} + +static void _on_pingreq(asymcute_con_t *con) +{ + /* simply reply with a PINGRESP message */ + mutex_lock(&con->lock); + uint8_t resp[2] = { LEN_PINGRESP, MQTTSN_PINGRESP }; + sock_udp_send(&con->sock, resp, sizeof(resp), &con->server_ep); + mutex_unlock(&con->lock); +} + +static void _on_pingresp(asymcute_con_t *con) +{ + mutex_lock(&con->lock); + /* only handle ping resp message if we are actually waiting for a reply */ + if (con->keepalive_retry_cnt < ASYMCUTE_N_RETRY) { + event_timeout_clear(&con->keepalive_timer); + con->keepalive_retry_cnt = ASYMCUTE_N_RETRY; + event_timeout_set(&con->keepalive_timer, KEEPALIVE_TO); + } + mutex_unlock(&con->lock); +} + +static void _on_regack(asymcute_con_t *con, const uint8_t *data, size_t len) +{ + mutex_lock(&con->lock); + asymcute_req_t *req = _req_preprocess(con, len, MINLEN_REGACK, + data, IDPOS_REGACK); + if (req == NULL) { + mutex_unlock(&con->lock); + return; + } + + /* check return code */ + unsigned ret = ASYMCUTE_REJECTED; + if (data[6] == MQTTSN_ACCEPTED) { + /* finish the registration by applying the topic id */ + asymcute_topic_t *topic = (asymcute_topic_t *)req->arg; + topic->id = byteorder_bebuftohs(&data[2]); + topic->con = con; + ret = ASYMCUTE_REGISTERED; + } + + /* finally notify the user and free the request */ + mutex_unlock(&req->lock); + mutex_unlock(&con->lock); + con->user_cb(req, ret); +} + +static void _on_publish(asymcute_con_t *con, uint8_t *data, + size_t pos, size_t len) +{ + /* verify message length */ + if (len < (pos + 6)) { + return; + } + + uint16_t topic_id = byteorder_bebuftohs(&data[pos + 2]); + + /* find any subscription for that topic */ + mutex_lock(&con->lock); + asymcute_sub_t *sub = NULL; + for (asymcute_sub_t *cur = con->subscriptions; cur; cur = cur->next) { + if (cur->topic->id == topic_id) { + sub = cur; + break; + } + } + + /* send PUBACK if needed (QoS > 0 or on invalid topic ID) */ + if ((sub == NULL) || (data[pos + 1] & MQTTSN_QOS_1)) { + uint8_t ret = (sub) ? MQTTSN_ACCEPTED : MQTTSN_REJ_INV_TOPIC_ID; + uint8_t pkt[7] = { 7, MQTTSN_PUBACK, 0, 0, 0, 0, ret }; + /* copy topic and message id */ + memcpy(&pkt[2], &data[pos + 2], 4); + sock_udp_send(&con->sock, pkt, 7, &con->server_ep); + } + + /* release the context and notify the user (on success) */ + mutex_unlock(&con->lock); + if (sub) { + sub->cb(sub, ASYMCUTE_PUBLISHED, + &data[pos + 6], (len - (pos + 6)), sub->arg); + } +} + +static void _on_puback(asymcute_con_t *con, const uint8_t *data, size_t len) +{ + mutex_lock(&con->lock); + asymcute_req_t *req = _req_preprocess(con, len, MINLEN_PUBACK, + data, IDPOS_PUBACK); + if (req == NULL) { + mutex_unlock(&con->lock); + return; + } + + unsigned ret = (data[6] == MQTTSN_ACCEPTED) ? + ASYMCUTE_PUBLISHED : ASYMCUTE_REJECTED; + mutex_unlock(&req->lock); + mutex_unlock(&con->lock); + con->user_cb(req, ret); +} + +static void _on_suback(asymcute_con_t *con, const uint8_t *data, size_t len) +{ + mutex_lock(&con->lock); + asymcute_req_t *req = _req_preprocess(con, len, MINLEN_SUBACK, + data, IDPOS_SUBACK); + if (req == NULL) { + mutex_unlock(&con->lock); + return; + } + + unsigned ret = ASYMCUTE_REJECTED; + if (data[7] == MQTTSN_ACCEPTED) { + /* parse and apply assigned topic id */ + asymcute_sub_t *sub = (asymcute_sub_t *)req->arg; + sub->topic->id = byteorder_bebuftohs(&data[3]); + sub->topic->con = con; + /* insert subscription to connection context */ + sub->next = con->subscriptions; + con->subscriptions = sub; + ret = ASYMCUTE_SUBSCRIBED; + } + + /* notify the user */ + mutex_unlock(&req->lock); + mutex_unlock(&con->lock); + con->user_cb(req, ret); +} + +static void _on_unsuback(asymcute_con_t *con, const uint8_t *data, size_t len) +{ + mutex_lock(&con->lock); + asymcute_req_t *req = _req_preprocess(con, len, MINLEN_UNSUBACK, + data, IDPOS_UNSUBACK); + if (req == NULL) { + mutex_unlock(&con->lock); + return; + } + + /* remove subscription from list */ + asymcute_sub_t *sub = (asymcute_sub_t *)req->arg; + if (con->subscriptions == sub) { + con->subscriptions = sub->next; + } + else { + for (asymcute_sub_t *e = con->subscriptions; e && e->next; e = e->next) { + if (e->next == sub) { + e->next = e->next->next; + break; + } + } + } + + /* reset subscription context */ + sub->topic = NULL; + + /* notify user */ + mutex_unlock(&req->lock); + mutex_unlock(&con->lock); + con->user_cb(req, ASYMCUTE_UNSUBSCRIBED); +} + +static void _on_data(asymcute_con_t *con, size_t pkt_len, sock_udp_ep_t *remote) +{ + size_t len; + size_t pos = _len_get(con->rxbuf, &len); + + /* make sure the incoming data was send by 'our' gateway */ + if (!sock_udp_ep_equal(&con->server_ep, remote)) { + return; + } + /* validate incoming data: verify message length */ + if ((pkt_len < 2) || + (pkt_len <= pos) || (pkt_len < len)) { + /* length field of MQTT-SN packet seems to be invalid -> drop the pkt */ + return; + } + + /* figure out required action based on message type */ + uint8_t type = con->rxbuf[pos]; + switch (type) { + case MQTTSN_CONNACK: + _on_connack(con, con->rxbuf, len); + break; + case MQTTSN_DISCONNECT: + _on_disconnect(con, len); + break; + case MQTTSN_PINGREQ: + _on_pingreq(con); + break; + case MQTTSN_PINGRESP: + _on_pingresp(con); + break; + case MQTTSN_REGACK: + _on_regack(con, con->rxbuf, len); + break; + case MQTTSN_PUBLISH: + _on_publish(con, con->rxbuf, pos, len); + break; + case MQTTSN_PUBACK: + _on_puback(con, con->rxbuf, len); + break; + case MQTTSN_SUBACK: + _on_suback(con, con->rxbuf, len); + break; + case MQTTSN_UNSUBACK: + _on_unsuback(con, con->rxbuf, len); + break; + default: + break; + } +} + +void *_listener(void *arg) +{ + asymcute_con_t *con = (asymcute_con_t *)arg; + + /* create a socket for this listener, using an ephemeral port */ + sock_udp_ep_t local = SOCK_IPV6_EP_ANY; + if (sock_udp_create(&con->sock, &local, NULL, 0) != 0) { + LOG_ERROR("[asymcute] error creating listener socket\n"); + return NULL; + } + + while (1) { + sock_udp_ep_t remote; + int n = sock_udp_recv(&con->sock, con->rxbuf, ASYMCUTE_BUFSIZE, + SOCK_NO_TIMEOUT, &remote); + if (n > 0) { + _on_data(con, (size_t)n, &remote); + } + } + + /* should never be reached */ + return NULL; +} + +void *_handler(void *arg) +{ + (void)arg; + event_queue_init(&_queue); + event_loop(&_queue); + /* should never be reached */ + return NULL; +} + +int asymcute_listener_run(asymcute_con_t *con, char *stack, size_t stacksize, + char priority, asymcute_evt_cb_t callback) +{ + /* make sure con is not running */ + assert(con); + assert((priority > 0) && (priority < THREAD_PRIORITY_IDLE - 1)); + assert(callback); + + int ret = ASYMCUTE_OK; + + /* make sure the connection context is not already used */ + mutex_lock(&con->lock); + if (con->state != UNINITIALIZED) { + ret = ASYMCUTE_BUSY; + goto end; + } + + /* initialize the connection context */ + memset(con, 0, sizeof(asymcute_con_t)); + random_bytes((uint8_t *)&con->last_id, 2); + event_callback_init(&con->keepalive_evt, _on_keepalive_evt, con); + event_timeout_init(&con->keepalive_timer, &_queue, &con->keepalive_evt.super); + con->keepalive_retry_cnt = ASYMCUTE_N_RETRY; + con->state = NOTCON; + con->user_cb = callback; + + /* start listener thread */ + thread_create(stack, + stacksize, + priority, + THREAD_CREATE_WOUT_YIELD, + _listener, + con, + "asymcute_listener"); + +end: + mutex_unlock(&con->lock); + return ret; +} + +void asymcute_handler_run(void) +{ + thread_create(_stack, sizeof(_stack), ASYMCUTE_HANDLER_PRIO, + 0, _handler, NULL, "asymcute_main"); +} + +int asymcute_topic_init(asymcute_topic_t *topic, const char *topic_name, + uint16_t topic_id) +{ + assert(topic); + + size_t len = 0; + + if (asymcute_topic_is_reg(topic)) { + return ASYMCUTE_REGERR; + } + + if (topic_name == NULL) { + if ((topic_id == 0) || (topic_id == UINT16_MAX)) { + return ASYMCUTE_OVERFLOW; + } + } + else { + len = strlen(topic_name); + if ((len == 0) || (len > ASYMCUTE_TOPIC_MAXLEN)) { + return ASYMCUTE_OVERFLOW; + } + } + + /* reset given topic */ + asymcute_topic_reset(topic); + /* pre-defined topic ID? */ + if (topic_name == NULL) { + topic->id = topic_id; + topic->flags = MQTTSN_TIT_PREDEF; + memcpy(topic->name, &topic_id, 2); + topic->name[2] = '\0'; + } + else { + strncpy(topic->name, topic_name, sizeof(topic->name)); + if (len == 2) { + memcpy(&topic->id, topic_name, 2); + topic->flags = MQTTSN_TIT_SHORT; + } + } + + return ASYMCUTE_OK; +} + +bool asymcute_is_connected(const asymcute_con_t *con) +{ + return (con->state == CONNECTED); +} + +int asymcute_connect(asymcute_con_t *con, asymcute_req_t *req, + sock_udp_ep_t *server, const char *cli_id, bool clean, + asymcute_will_t *will) +{ + assert(con); + assert(req); + assert(server); + assert(cli_id); + + int ret = ASYMCUTE_OK; + size_t id_len = strlen(cli_id); + + /* the will feature is not yet supported */ + if (will) { + return ASYMCUTE_NOTSUP; + } + /* make sure the client ID will fit into the dedicated buffer */ + if (id_len > ASYMCUTE_ID_MAXLEN) { + return ASYMCUTE_OVERFLOW; + } + /* check if the context is not already connected to any gateway */ + mutex_lock(&con->lock); + if (con->state != NOTCON) { + ret = ASYMCUTE_GWERR; + goto end; + } + /* get mutual access to the request context */ + if (mutex_trylock(&req->lock) != 1) { + ret = ASYMCUTE_BUSY; + goto end; + } + + /* prepare the connection context */ + con->state = CONNECTING; + strncpy(con->cli_id, cli_id, sizeof(con->cli_id)); + memcpy(&con->server_ep, server, sizeof(con->server_ep)); + + /* compile and send connect message */ + req->msg_id = 0; + req->data[0] = (uint8_t)(id_len + 6); + req->data[1] = MQTTSN_CONNECT; + req->data[2] = ((clean) ? MQTTSN_CS : 0); + req->data[3] = PROTOCOL_VERSION; + byteorder_htobebufs(&req->data[4], ASYMCUTE_KEEPALIVE); + memcpy(&req->data[6], cli_id, id_len); + req->data_len = (size_t)req->data[0]; + _req_send(req, con, _on_con_timeout); + +end: + mutex_unlock(&con->lock); + return ret; +} + +int asymcute_disconnect(asymcute_con_t *con, asymcute_req_t *req) +{ + assert(con); + assert(req); + + int ret = ASYMCUTE_OK; + + /* check if we are actually connected */ + mutex_lock(&con->lock); + if (!asymcute_is_connected(con)) { + ret = ASYMCUTE_GWERR; + goto end; + } + /* get mutual access to the request context */ + if (mutex_trylock(&req->lock) != 1) { + ret = ASYMCUTE_BUSY; + goto end; + } + + /* put connection into TEARDOWN state */ + _disconnect(con, TEARDOWN); + + /* prepare and send disconnect message */ + req->msg_id = 0; + req->data[0] = 2; + req->data[1] = MQTTSN_DISCONNECT; + req->data_len = 2; + _req_send(req, con, _on_discon_timeout); + +end: + mutex_unlock(&con->lock); + return ret; +} + +int asymcute_register(asymcute_con_t *con, asymcute_req_t *req, + asymcute_topic_t *topic) +{ + assert(con); + assert(req); + assert(topic); + + int ret = ASYMCUTE_OK; + + /* test if topic is already registered */ + if (asymcute_topic_is_reg(topic)) { + return ASYMCUTE_REGERR; + } + /* make sure we are connected */ + mutex_lock(&con->lock); + if (!asymcute_is_connected(con)) { + ret = ASYMCUTE_GWERR; + goto end; + } + /* get mutual access to the request context */ + if (mutex_trylock(&req->lock) != 1) { + ret = ASYMCUTE_BUSY; + goto end; + } + + /* prepare topic */ + req->arg = (void *)topic; + size_t topic_len = strlen(topic->name); + + /* prepare registration request */ + req->msg_id = _msg_id_next(con); + size_t pos = _len_set(req->data, (topic_len + 5)); + req->data[pos] = MQTTSN_REGISTER; + byteorder_htobebufs(&req->data[pos + 1], 0); + byteorder_htobebufs(&req->data[pos + 3], req->msg_id); + memcpy(&req->data[pos + 5], topic->name, topic_len); + req->data_len = (pos + 5 + topic_len); + + /* send the request */ + _req_send(req, con, NULL); + +end: + mutex_unlock(&con->lock); + return ret; +} + +int asymcute_publish(asymcute_con_t *con, asymcute_req_t *req, + const asymcute_topic_t *topic, + const void *data, size_t data_len, uint8_t flags) +{ + assert(con); + assert(req); + assert(topic); + assert((data_len == 0) || data); + + int ret = ASYMCUTE_OK; + + /* check for valid flags */ + if ((flags & VALID_PUBLISH_FLAGS) != flags) { + return ASYMCUTE_NOTSUP; + } + /* check for message size */ + if ((data_len + 9) > ASYMCUTE_BUFSIZE) { + return ASYMCUTE_OVERFLOW; + } + /* make sure topic is registered */ + if (!asymcute_topic_is_reg(topic) || (topic->con != con)) { + return ASYMCUTE_REGERR; + } + /* check if we are connected to a gateway */ + mutex_lock(&con->lock); + if (!asymcute_is_connected(con)) { + ret = ASYMCUTE_GWERR; + goto end; + } + /* make sure request context is clear to be used */ + if (mutex_trylock(&req->lock) != 1) { + ret = ASYMCUTE_BUSY; + goto end; + } + + /* get message id */ + req->msg_id = _msg_id_next(con); + + /* assemble message */ + size_t pos = _len_set(req->data, data_len + 6); + req->data[pos] = MQTTSN_PUBLISH; + req->data[pos + 1] = (flags | topic->flags); + byteorder_htobebufs(&req->data[pos + 2], topic->id); + byteorder_htobebufs(&req->data[pos + 4], req->msg_id); + memcpy(&req->data[pos + 6], data, data_len); + req->data_len = (pos + 6 + data_len); + + /* publish selected data */ + if (flags & MQTTSN_QOS_1) { + _req_send(req, con, NULL); + } + else { + _req_send_once(req, con); + } + +end: + mutex_unlock(&con->lock); + return ret; +} + +int asymcute_subscribe(asymcute_con_t *con, asymcute_req_t *req, + asymcute_sub_t *sub, asymcute_topic_t *topic, + asymcute_sub_cb_t callback, void *arg, uint8_t flags) +{ + assert(con); + assert(req); + assert(sub); + assert(topic); + assert(callback); + + int ret = ASYMCUTE_OK; + + /* check flags for validity */ + if ((flags & VALID_SUBSCRIBE_FLAGS) != flags) { + return ASYMCUTE_NOTSUP; + } + /* is topic initialized? (though it does not need to be registered) */ + if (!asymcute_topic_is_init(topic)) { + return ASYMCUTE_REGERR; + } + /* check if we are connected to a gateway */ + mutex_lock(&con->lock); + if (!asymcute_is_connected(con)) { + ret = ASYMCUTE_GWERR; + goto end; + } + /* check if we are already subscribed to the given topic */ + for (asymcute_sub_t *sub = con->subscriptions; sub; sub = sub->next) { + if (asymcute_topic_equal(topic, sub->topic)) { + ret = ASYMCUTE_SUBERR; + goto end; + } + } + /* make sure request context is clear to be used */ + if (mutex_trylock(&req->lock) != 1) { + ret = ASYMCUTE_BUSY; + goto end; + } + + /* prepare subscription context */ + sub->cb = callback; + sub->arg = arg; + sub->topic = topic; + + /* send SUBSCRIBE message */ + _compile_sub_unsub(req, con, sub, MQTTSN_SUBSCRIBE); + _req_send(req, con, _on_suback_timeout); + +end: + mutex_unlock(&con->lock); + return ret; +} + +int asymcute_unsubscribe(asymcute_con_t *con, asymcute_req_t *req, + asymcute_sub_t *sub) +{ + assert(con); + assert(req); + assert(sub); + + int ret = ASYMCUTE_OK; + + /* make sure the subscription is actually active */ + if (!asymcute_sub_active(sub)) { + return ASYMCUTE_SUBERR; + } + /* check if we are connected to a gateway */ + mutex_lock(&con->lock); + if (!asymcute_is_connected(con)) { + ret = ASYMCUTE_GWERR; + goto end; + } + /* make sure request context is clear to be used */ + if (mutex_trylock(&req->lock) != 1) { + ret = ASYMCUTE_BUSY; + goto end; + } + + /* prepare and send UNSUBSCRIBE message */ + _compile_sub_unsub(req, con, sub, MQTTSN_UNSUBSCRIBE); + _req_send(req, con, NULL); + +end: + mutex_unlock(&con->lock); + return ret; +}