diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4a05ed1..f0bb4e5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -38,12 +38,14 @@ jobs: run: | west build -p -b olimex_lora_stm32wl_devkit samples/counter -- -DOVERLAY_CONFIG=lorawan.conf west build -p -b esp32c3_devkitm samples/counter + west build -p -b esp32c3_devkitm samples/counter -- -DOVERLAY_CONFIG=wifi_websocket.conf west build -p -b native_posix samples/counter west build -p -b nucleo_l073rz samples/counter -- -DOVERLAY_CONFIG=serial.conf west build -p -b nucleo_l073rz samples/counter -- -DOVERLAY_CONFIG=storage_eeprom.conf west build -p -b native_posix samples/counter -- -DOVERLAY_CONFIG=auth.conf west build -p -b native_posix samples/counter -- -DOVERLAY_CONFIG=can.conf west build -p -b native_posix samples/counter -- -DOVERLAY_CONFIG=log_backend.conf + west build -p -b native_posix samples/counter -- -DOVERLAY_CONFIG=native_websocket.conf - name: Build documentation working-directory: thingset-zephyr-sdk diff --git a/Kconfig b/Kconfig index 588a81c..f9fda40 100644 --- a/Kconfig +++ b/Kconfig @@ -14,6 +14,7 @@ rsource "src/Kconfig.lorawan" rsource "src/Kconfig.serial" rsource "src/Kconfig.shell" rsource "src/Kconfig.storage" +rsource "src/Kconfig.websocket" rsource "src/Kconfig.wifi" menu "General Publication Settings" diff --git a/README.md b/README.md index 65ad57b..3c07cf2 100644 --- a/README.md +++ b/README.md @@ -20,6 +20,26 @@ west build -b olimex_lora_stm32wl_devkit samples/counter -- -DOVERLAY_CONFIG=lor west build -b native_posix samples/counter -t run -- -DOVERLAY_CONFIG=can.conf ``` +## Testing with WebSocket + +Start net-setup from Zephyr net-tools: + +``` +sudo ../tools/net-tools/net-setup.sh +``` + +Afterwards run the nativ_posix board with websocket support from another shell: + +``` +west build -b native_posix samples/counter -t run -- -DOVERLAY_CONFIG=native_websocket.conf +``` + +Check socket connections + +``` +ss -t -a -n | grep -E 'State|192.0.2.1' +``` + ## License This software is released under the [Apache-2.0 License](LICENSE). diff --git a/include/thingset/sdk.h b/include/thingset/sdk.h index 99de5ca..c6de3a3 100644 --- a/include/thingset/sdk.h +++ b/include/thingset/sdk.h @@ -53,11 +53,13 @@ extern "C" { #define TS_ID_LORAWAN_DEV_NONCE 0x273 /* Networking group items */ -#define TS_ID_NET 0x28 -#define TS_ID_NET_WIFI_SSID 0x280 -#define TS_ID_NET_WIFI_PSK 0x281 -#define TS_ID_NET_IPV4 0x282 -#define TS_ID_NET_IPV6 0x283 +#define TS_ID_NET 0x28 +#define TS_ID_NET_WIFI_SSID 0x280 +#define TS_ID_NET_WIFI_PSK 0x281 +#define TS_ID_NET_IPV4 0x282 +#define TS_ID_NET_IPV6 0x283 +#define TS_ID_NET_WEBSOCKET_IPV4 0x284 +#define TS_ID_NET_WEBSOCKET_PORT 0x285 /* Device Firmware Upgrade group items */ #define TS_ID_DFU 0x2D diff --git a/include/thingset/websocket.h b/include/thingset/websocket.h new file mode 100644 index 0000000..9aa6ebd --- /dev/null +++ b/include/thingset/websocket.h @@ -0,0 +1,20 @@ +/* + * Copyright (c) The ThingSet Project Contributors + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef THINGSET_WEBSOCKET_H_ +#define THINGSET_WEBSOCKET_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +int thingset_websocket_send_report(const char *path); + +#ifdef __cplusplus +} +#endif + +#endif /* THINGSET_WEBSOCKET_H_ */ diff --git a/samples/counter/native_websocket.conf b/samples/counter/native_websocket.conf new file mode 100644 index 0000000..469695c --- /dev/null +++ b/samples/counter/native_websocket.conf @@ -0,0 +1,35 @@ +# Copyright (c) The ThingSet Project Contributors +# SPDX-License-Identifier: Apache-2.0 + +CONFIG_HEAP_MEM_POOL_SIZE=1500 + +# Networking config +CONFIG_NETWORKING=y +CONFIG_NET_IPV4=y +CONFIG_NET_IPV6=y +CONFIG_NET_TCP=y +CONFIG_NET_SHELL=y +CONFIG_NET_STATISTICS=y + +# Sockets +CONFIG_NET_SOCKETS=y +CONFIG_NET_SOCKETS_POLL_MAX=4 + +# Network address config +CONFIG_NET_CONFIG_SETTINGS=y +CONFIG_NET_CONFIG_NEED_IPV4=y +CONFIG_NET_CONFIG_NEED_IPV6=y +CONFIG_NET_CONFIG_MY_IPV4_ADDR="192.0.2.1" +CONFIG_NET_CONFIG_MY_IPV6_ADDR="2001:db8::1" +CONFIG_NET_CONFIG_MY_IPV4_GW="192.0.2.2" +# Address of HTTP server +CONFIG_NET_CONFIG_PEER_IPV4_ADDR="192.0.2.2" +CONFIG_NET_CONFIG_PEER_IPV6_ADDR="2001:db8::2" + +# HTTP & Websocket +CONFIG_HTTP_CLIENT=y +CONFIG_WEBSOCKET_CLIENT=y + +CONFIG_THINGSET_WEBSOCKET=y +CONFIG_THINGSET_WEBSOCKET_SERVER_IPV4="192.0.2.2" +CONFIG_THINGSET_WEBSOCKET_SERVER_PORT=8000 diff --git a/samples/counter/wifi.conf b/samples/counter/wifi_websocket.conf similarity index 100% rename from samples/counter/wifi.conf rename to samples/counter/wifi_websocket.conf diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ea22ba7..8999416 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,4 +12,5 @@ target_sources_ifdef(CONFIG_THINGSET_SERIAL app PRIVATE serial.c) target_sources_ifdef(CONFIG_THINGSET_SHELL app PRIVATE shell.c) target_sources_ifdef(CONFIG_THINGSET_STORAGE_EEPROM app PRIVATE storage_eeprom.c) target_sources_ifdef(CONFIG_THINGSET_STORAGE_FLASH app PRIVATE storage_flash.c) +target_sources_ifdef(CONFIG_THINGSET_WEBSOCKET app PRIVATE websocket.c) target_sources_ifdef(CONFIG_THINGSET_WIFI app PRIVATE wifi.c) diff --git a/src/Kconfig.websocket b/src/Kconfig.websocket new file mode 100644 index 0000000..49b2805 --- /dev/null +++ b/src/Kconfig.websocket @@ -0,0 +1,40 @@ +# Copyright (c) The ThingSet Project Contributors +# SPDX-License-Identifier: Apache-2.0 + +config THINGSET_WEBSOCKET + depends on WEBSOCKET_CLIENT + bool "WebSocket interface" + +if THINGSET_WEBSOCKET + +config THINGSET_WEBSOCKET_SERVER_IPV4 + string "Default WebSocket server IPv4 address" + help + This default IP address can be changed via ThingSet. + +config THINGSET_WEBSOCKET_SERVER_PORT + int "Default WebSocket server port" + default 80 + help + This default port can be changed via ThingSet. + +config THINGSET_WEBSOCKET_RX_BUF_SIZE + int "ThingSet WebSocket RX buffer size" + range 512 4096 + default 1024 + +config THINGSET_WEBSOCKET_THREAD_STACK_SIZE + int "ThingSet WebSocket thread stack size" + default 4096 + help + Stack size of thread for processing ThingSet messages transmitted + via WebSocket. + +config THINGSET_WEBSOCKET_THREAD_PRIORITY + int "ThingSet WebSocket thread priority" + default 2 + help + Priority of thread for processing ThingSet messages transmitted + via WebSocket. + +endif # THINGSET_WEBSOCKET diff --git a/src/websocket.c b/src/websocket.c new file mode 100644 index 0000000..2148914 --- /dev/null +++ b/src/websocket.c @@ -0,0 +1,260 @@ +/* + * Copyright (c) The ThingSet Project Contributors + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +LOG_MODULE_REGISTER(thingset_websocket, CONFIG_THINGSET_SDK_LOG_LEVEL); + +static uint8_t rx_buf[CONFIG_THINGSET_WEBSOCKET_RX_BUF_SIZE]; + +static char server_addr4[16] = CONFIG_THINGSET_WEBSOCKET_SERVER_IPV4; +static uint16_t server_port = CONFIG_THINGSET_WEBSOCKET_SERVER_PORT; +static char server_path[23]; /* "/node/" + pNodeID (16 bytes) + '\0' */ + +static int websock = -1; + +#ifdef CONFIG_THINGSET_SUBSET_LIVE_METRICS +static struct k_work_delayable reporting_work; +#endif + +THINGSET_ADD_ITEM_STRING(TS_ID_NET, TS_ID_NET_WEBSOCKET_IPV4, "sWebsocketIP", server_addr4, + sizeof(server_addr4), THINGSET_ANY_RW, TS_SUBSET_NVM); + +THINGSET_ADD_ITEM_UINT16(TS_ID_NET, TS_ID_NET_WEBSOCKET_PORT, "sWebsocketPort", &server_port, + THINGSET_ANY_RW, TS_SUBSET_NVM); + +static int connect_server(sa_family_t family, int *sock, struct sockaddr *addr, socklen_t addr_len) +{ + const char *family_str = family == AF_INET ? "IPv4" : "IPv6"; + int ret = 0; + + memset(addr, 0, addr_len); + + net_sin(addr)->sin_family = AF_INET; + net_sin(addr)->sin_port = htons(server_port); + inet_pton(family, server_addr4, &net_sin(addr)->sin_addr); + + *sock = socket(family, SOCK_STREAM, IPPROTO_TCP); + + if (*sock < 0) { + LOG_ERR("Failed to create %s HTTP socket (%d)", family_str, -errno); + return -errno; + } + + ret = connect(*sock, addr, addr_len); + if (ret < 0) { + LOG_ERR("Cannot connect to %s remote (%d)", family_str, -errno); + ret = -errno; + goto fail; + } + + return 0; + +fail: + if (*sock >= 0) { + close(*sock); + *sock = -1; + } + + return ret; +} + +static int connect_cb(int sock, struct http_request *req, void *user_data) +{ + LOG_INF("Websocket %d connected.", sock); + + return 0; +} + +static int recv_data(int sock, uint8_t *buf, size_t buf_len) +{ + uint64_t remaining = ULLONG_MAX; + int total_read; + uint32_t message_type; + int ret, read_pos; + + read_pos = 0; + total_read = 0; + + while (remaining > 0) { + ret = websocket_recv_msg(sock, buf + read_pos, buf_len - read_pos, &message_type, + &remaining, 0); + if (ret < 0) { + if (ret == -EAGAIN) { + k_sleep(K_MSEC(50)); + continue; + } + + LOG_DBG("Socket connection closed while waiting (%d/%d)", ret, errno); + return -1; + } + LOG_DBG("Read %d bytes from socket", ret); + + read_pos += ret; + total_read += ret; + } + + if (remaining != 0) { + LOG_ERR("Data recv failure after %d bytes (remaining %" PRId64 ")", total_read, remaining); + LOG_HEXDUMP_DBG(buf, total_read, "received ws buf"); + return -1; + } + else { + LOG_DBG("Received %d bytes in total", total_read); + return total_read; + } +} + +int thingset_websocket_send(const uint8_t *buf, size_t len) +{ + if (websock < 0) { + return -EIO; + } + + int bytes_sent = websocket_send_msg(websock, buf, len, WEBSOCKET_OPCODE_DATA_TEXT, true, true, + SYS_FOREVER_MS); + + if (bytes_sent < 0) { + LOG_ERR("Failed to send data via WebSocket: %d", bytes_sent); + } + + return 0; +} + +int thingset_websocket_send_report(const char *path) +{ + struct shared_buffer *tx_buf = thingset_sdk_shared_buffer(); + k_sem_take(&tx_buf->lock, K_FOREVER); + + int len = + thingset_report_path(&ts, tx_buf->data, tx_buf->size, path, THINGSET_TXT_NAMES_VALUES); + + int ret = thingset_websocket_send(tx_buf->data, len); + + k_sem_give(&tx_buf->lock); + return ret; +} + +#ifdef CONFIG_THINGSET_SUBSET_LIVE_METRICS + +static void websocket_regular_report_handler(struct k_work *work) +{ + struct k_work_delayable *dwork = k_work_delayable_from_work(work); + static int64_t pub_time; + + if (live_reporting_enable && websock >= 0) { + thingset_websocket_send_report(TS_NAME_SUBSET_LIVE); + } + + pub_time += 1000 * live_reporting_period; + thingset_sdk_reschedule_work(dwork, K_TIMEOUT_ABS_MS(pub_time)); +} + +#endif + +#ifdef CONFIG_BOARD_NATIVE_POSIX +static struct sigaction sigact_default; + +static void websocket_shutdown(int sig) +{ + if (websock >= 0) { + LOG_INF("Closing websocket %d", websock); + /* closing the websocket will also close the underlying socket */ + websocket_disconnect(websock); + } + + /* also call default handler */ + (*sigact_default.sa_handler)(sig); +} +#endif + +static void websocket_thread(void) +{ + int32_t timeout = 3 * MSEC_PER_SEC; + struct sockaddr_in addr4; + int sock = -1; + int ret; + +#ifdef CONFIG_BOARD_NATIVE_POSIX + /* Ensure graceful shutdown of the socket for Ctrl+C on the console. */ + struct sigaction sigact = { 0 }; + sigact.sa_handler = websocket_shutdown; + sigemptyset(&sigact.sa_mask); + sigaction(SIGINT, &sigact, &sigact_default); +#endif + +#ifdef CONFIG_THINGSET_SUBSET_LIVE_METRICS + k_work_init_delayable(&reporting_work, websocket_regular_report_handler); + thingset_sdk_reschedule_work(&reporting_work, K_NO_WAIT); +#endif + + snprintf(server_path, sizeof(server_path), "/node/%s", node_id); + + while (true) { + + ret = connect_server(AF_INET, &sock, (struct sockaddr *)&addr4, sizeof(addr4)); + if (ret < 0 || sock < 0) { + k_sleep(K_SECONDS(10)); + continue; + } + + struct websocket_request req = { 0 }; + req.host = server_addr4; + req.url = server_path; + req.cb = connect_cb; + /* tmp_buf only used for connecting, so we can re-use our rx buffer */ + req.tmp_buf = rx_buf; + req.tmp_buf_len = sizeof(rx_buf); + + websock = websocket_connect(sock, &req, timeout, NULL); + if (websock >= 0) { + LOG_INF("Connected to %s:%d", server_addr4, server_port); + } + else { + LOG_ERR("Cannot connect to %s:%d", server_addr4, server_port); + close(sock); + k_sleep(K_SECONDS(10)); + continue; + } + + while (websock >= 0) { + int bytes_received = recv_data(websock, rx_buf, sizeof(rx_buf)); + if (bytes_received < 0) { + websocket_disconnect(websock); + break; + } + + struct shared_buffer *tx_buf = thingset_sdk_shared_buffer(); + k_sem_take(&tx_buf->lock, K_FOREVER); + + int len = thingset_process_message(&ts, (uint8_t *)rx_buf, bytes_received, tx_buf->data, + tx_buf->size); + if (len > 0) { + LOG_DBG("Sending response with %d bytes", len); + thingset_websocket_send(tx_buf->data, len); + } + + k_sem_give(&tx_buf->lock); + } + } +} + +K_THREAD_DEFINE(thingset_websocket, CONFIG_THINGSET_WEBSOCKET_THREAD_STACK_SIZE, websocket_thread, + NULL, NULL, NULL, CONFIG_THINGSET_WEBSOCKET_THREAD_PRIORITY, 0, 0);