From 6c5a0a25bcb823de1bb804c8d066720cb8517213 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Mon, 4 Nov 2024 18:08:49 +0100 Subject: [PATCH] Implement liveliness token support --- CMakeLists.txt | 5 + examples/CMakeLists.txt | 3 + examples/unix/c11/z_get_liveliness.c | 117 ++++++++++ examples/unix/c11/z_liveliness.c | 112 +++++++++ examples/unix/c11/z_sub_liveliness.c | 139 +++++++++++ include/zenoh-pico.h | 1 + include/zenoh-pico.h.in | 1 + include/zenoh-pico/api/handlers.h | 4 +- include/zenoh-pico/api/liveliness.h | 119 ++++++++++ include/zenoh-pico/api/macros.h | 115 +++++---- include/zenoh-pico/collections/intmap.h | 29 +++ include/zenoh-pico/config.h | 1 + include/zenoh-pico/config.h.in | 1 + include/zenoh-pico/net/primitives.h | 54 +++++ include/zenoh-pico/net/session.h | 11 +- include/zenoh-pico/protocol/keyexpr.h | 1 + include/zenoh-pico/session/liveliness.h | 35 +++ include/zenoh-pico/session/session.h | 24 +- include/zenoh-pico/session/subscription.h | 34 ++- src/api/api.c | 18 +- src/api/liveliness.c | 146 ++++++++++++ src/collections/fifo_mt.c | 1 + src/collections/intmap.c | 31 +++ src/net/primitives.c | 148 +++++++++++- src/protocol/keyexpr.c | 8 + src/session/interest.c | 49 +++- src/session/liveliness.c | 270 ++++++++++++++++++++++ src/session/push.c | 11 +- src/session/rx.c | 33 +-- src/session/subscription.c | 96 +++++--- src/session/utils.c | 13 +- tests/z_api_liveliness_test.c | 203 ++++++++++++++++ tests/z_collections_test.c | 33 +++ tests/z_msgcodec_test.c | 103 ++++++++- 34 files changed, 1831 insertions(+), 138 deletions(-) create mode 100644 examples/unix/c11/z_get_liveliness.c create mode 100644 examples/unix/c11/z_liveliness.c create mode 100644 examples/unix/c11/z_sub_liveliness.c create mode 100644 include/zenoh-pico/api/liveliness.h create mode 100644 include/zenoh-pico/session/liveliness.h create mode 100644 src/api/liveliness.c create mode 100644 src/session/liveliness.c create mode 100644 tests/z_api_liveliness_test.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a0e87e16..aef976ff2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -216,6 +216,7 @@ set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature") set(Z_FEATURE_SUBSCRIPTION 1 CACHE STRING "Toggle subscription feature") set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature") set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature") +set(Z_FEATURE_LIVELINESS 1 CACHE STRING "Toggle liveliness feature") set(Z_FEATURE_INTEREST 1 CACHE STRING "Toggle interests") set(Z_FEATURE_FRAGMENTATION 1 CACHE STRING "Toggle fragmentation") set(Z_FEATURE_ENCODING_VALUES 1 CACHE STRING "Toggle encoding values") @@ -242,6 +243,7 @@ message(STATUS "Building with feature confing:\n\ * SUBSCRIPTION: ${Z_FEATURE_SUBSCRIPTION}\n\ * QUERY: ${Z_FEATURE_QUERY}\n\ * QUERYABLE: ${Z_FEATURE_QUERYABLE}\n\ +* LIVELINESS: ${Z_FEATURE_LIVELINESS}\n\ * INTEREST: ${Z_FEATURE_INTEREST}\n\ * RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}") @@ -471,6 +473,7 @@ if(UNIX OR MSVC) add_executable(z_bytes_test ${PROJECT_SOURCE_DIR}/tests/z_bytes_test.c) add_executable(z_api_bytes_test ${PROJECT_SOURCE_DIR}/tests/z_api_bytes_test.c) add_executable(z_api_encoding_test ${PROJECT_SOURCE_DIR}/tests/z_api_encoding_test.c) + add_executable(z_api_liveliness_test ${PROJECT_SOURCE_DIR}/tests/z_api_liveliness_test.c) add_executable(z_refcount_test ${PROJECT_SOURCE_DIR}/tests/z_refcount_test.c) target_link_libraries(z_data_struct_test zenohpico::lib) @@ -489,6 +492,7 @@ if(UNIX OR MSVC) target_link_libraries(z_bytes_test zenohpico::lib) target_link_libraries(z_api_bytes_test zenohpico::lib) target_link_libraries(z_api_encoding_test zenohpico::lib) + target_link_libraries(z_api_liveliness_test zenohpico::lib) target_link_libraries(z_refcount_test zenohpico::lib) configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY) @@ -512,6 +516,7 @@ if(UNIX OR MSVC) add_test(z_bytes_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_bytes_test) add_test(z_api_bytes_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_bytes_test) add_test(z_api_encoding_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_encoding_test) + add_test(z_api_liveliness_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_liveliness_test) add_test(z_refcount_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_refcount_test) endif() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index a001356e8..96309c42c 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -41,10 +41,12 @@ if(UNIX) add_example(z_sub_channel unix/c11/z_sub_channel.c) add_example(z_sub_st unix/c11/z_sub_st.c) add_example(z_sub_attachment unix/c11/z_sub_attachment.c) + add_example(z_sub_liveliness unix/c11/z_sub_liveliness.c) add_example(z_pull unix/c11/z_pull.c) add_example(z_get unix/c11/z_get.c) add_example(z_get_channel unix/c11/z_get_channel.c) add_example(z_get_attachment unix/c11/z_get_attachment.c) + add_example(z_get_liveliness unix/c11/z_get_liveliness.c) add_example(z_queryable unix/c11/z_queryable.c) add_example(z_queryable_channel unix/c11/z_queryable_channel.c) add_example(z_queryable_attachment unix/c11/z_queryable_attachment.c) @@ -55,6 +57,7 @@ if(UNIX) add_example(z_pub_thr unix/c11/z_pub_thr.c) add_example(z_sub_thr unix/c11/z_sub_thr.c) add_example(z_bytes unix/c11/z_bytes.c) + add_example(z_liveliness unix/c11/z_liveliness.c) endif() elseif(MSVC) add_example(z_put windows/z_put.c) diff --git a/examples/unix/c11/z_get_liveliness.c b/examples/unix/c11/z_get_liveliness.c new file mode 100644 index 000000000..32fea41a7 --- /dev/null +++ b/examples/unix/c11/z_get_liveliness.c @@ -0,0 +1,117 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include + +#if Z_FEATURE_LIVELINESS == 1 + +int main(int argc, char **argv) { + const char *keyexpr = "group1/**"; + const char *mode = "client"; + const char *clocator = NULL; + const char *llocator = NULL; + + int opt; + while ((opt = getopt(argc, argv, "k:e:m:l:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config; + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); + if (clocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator); + } + if (llocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator); + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL) < 0) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_session_drop(z_session_move(&s)); + return -1; + } + + z_view_keyexpr_t ke; + if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) { + printf("%s is not a valid key expression", keyexpr); + return -1; + } + + printf("Sending liveliness query '%s'...\n", keyexpr); + z_owned_fifo_handler_reply_t handler; + z_owned_closure_reply_t closure; + z_fifo_channel_reply_new(&closure, &handler, 16); + if (z_liveliness_get(z_loan(s), z_loan(ke), z_move(closure), NULL) < 0) { + printf("Liveliness query failed"); + return -1; + } + z_owned_reply_t reply; + for (z_result_t res = z_recv(z_loan(handler), &reply); res == Z_OK; res = z_recv(z_loan(handler), &reply)) { + if (z_reply_is_ok(z_loan(reply))) { + const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply)); + z_view_string_t key_str; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str); + printf(">> Alive token ('%.*s')\n", (int)z_string_len(z_loan(key_str)), z_string_data(z_loan(key_str))); + } else { + printf("Received an error\n"); + } + } + + z_drop(z_move(reply)); + z_drop(z_move(handler)); + z_drop(z_move(s)); + return 0; +} +#else +int main(void) { + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires " + "them.\n"); + return -2; +} +#endif diff --git a/examples/unix/c11/z_liveliness.c b/examples/unix/c11/z_liveliness.c new file mode 100644 index 000000000..07ef5aeb8 --- /dev/null +++ b/examples/unix/c11/z_liveliness.c @@ -0,0 +1,112 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include + +#if Z_FEATURE_LIVELINESS == 1 + +int main(int argc, char **argv) { + const char *keyexpr = "group1/zenoh-pico"; + const char *mode = "client"; + const char *clocator = NULL; + const char *llocator = NULL; + + int opt; + while ((opt = getopt(argc, argv, "k:e:m:l:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config; + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); + if (clocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator); + } + if (llocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator); + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL) < 0) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_session_drop(z_session_move(&s)); + return -1; + } + + z_view_keyexpr_t ke; + if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) { + printf("%s is not a valid key expression", keyexpr); + return -1; + } + + printf("Declaring liveliness token '%s'...\n", keyexpr); + z_owned_liveliness_token_t token; + if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) { + printf("Unable to create liveliness token!\n"); + exit(-1); + } + + printf("Press CTRL-C to undeclare liveliness token and quit...\n"); + while (1) { + z_sleep_s(1); + } + + // LivelinessTokens are automatically closed when dropped + // Use the code below to manually undeclare it if needed + printf("Undeclaring liveliness token...\n"); + z_drop(z_move(token)); + + z_drop(z_move(s)); + return 0; +} +#else +int main(void) { + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires " + "them.\n"); + return -2; +} +#endif diff --git a/examples/unix/c11/z_sub_liveliness.c b/examples/unix/c11/z_sub_liveliness.c new file mode 100644 index 000000000..befcdce0d --- /dev/null +++ b/examples/unix/c11/z_sub_liveliness.c @@ -0,0 +1,139 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include +#include +#include +#include + +#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_LIVELINESS == 1 + +static int msg_nb = 0; + +void data_handler(z_loaned_sample_t *sample, void *ctx) { + (void)(ctx); + z_view_string_t key_string; + z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_string); + switch (z_sample_kind(sample)) { + case Z_SAMPLE_KIND_PUT: + printf(">> [LivelinessSubscriber] New alive token ('%.*s')\n", (int)z_string_len(z_loan(key_string)), + z_string_data(z_loan(key_string))); + break; + case Z_SAMPLE_KIND_DELETE: + printf(">> [LivelinessSubscriber] Dropped token ('%.*s')\n", (int)z_string_len(z_loan(key_string)), + z_string_data(z_loan(key_string))); + break; + } +} + +int main(int argc, char **argv) { + const char *keyexpr = "group1/**"; + const char *mode = "client"; + char *clocator = NULL; + char *llocator = NULL; + int n = 0; + + int opt; + while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + clocator = optarg; + break; + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'n': + n = atoi(optarg); + break; + case '?': + if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config; + z_config_default(&config); + zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode); + if (clocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator); + } + if (llocator != NULL) { + zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator); + } + + printf("Opening session...\n"); + z_owned_session_t s; + if (z_open(&s, z_move(config), NULL) < 0) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_session_drop(z_session_move(&s)); + return -1; + } + + printf("Declaring liveliness subscriber on '%s'...\n", keyexpr); + z_owned_closure_sample_t callback; + z_closure(&callback, data_handler, NULL, NULL); + z_owned_subscriber_t sub; + + z_view_keyexpr_t ke; + z_view_keyexpr_from_str(&ke, keyexpr); + if (z_liveliness_declare_subscriber(z_loan(s), &sub, z_loan(ke), z_move(callback), NULL) < 0) { + printf("Unable to declare liveliness subscriber.\n"); + exit(-1); + } + + printf("Press CTRL-C to quit...\n"); + while (1) { + z_sleep_s(1); + } + printf("Press CTRL-C to quit...\n"); + while (1) { + if ((n != 0) && (msg_nb >= n)) { + break; + } + sleep(1); + } + // Clean up + z_drop(z_move(sub)); + z_drop(z_move(s)); + return 0; +} +#else +int main(void) { + printf( + "ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION and Z_FEATURE_LIVELINESS but this example " + "requires it.\n"); + return -2; +} +#endif diff --git a/include/zenoh-pico.h b/include/zenoh-pico.h index b34291ca4..75d392ee9 100644 --- a/include/zenoh-pico.h +++ b/include/zenoh-pico.h @@ -26,6 +26,7 @@ #include "zenoh-pico/api/constants.h" #include "zenoh-pico/api/encoding.h" #include "zenoh-pico/api/handlers.h" +#include "zenoh-pico/api/liveliness.h" #include "zenoh-pico/api/macros.h" #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/api/types.h" diff --git a/include/zenoh-pico.h.in b/include/zenoh-pico.h.in index bd4790b1e..b19efad0e 100644 --- a/include/zenoh-pico.h.in +++ b/include/zenoh-pico.h.in @@ -26,6 +26,7 @@ #include "zenoh-pico/api/constants.h" #include "zenoh-pico/api/encoding.h" #include "zenoh-pico/api/handlers.h" +#include "zenoh-pico/api/liveliness.h" #include "zenoh-pico/api/macros.h" #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/api/types.h" diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h index c3342302a..e2d939162 100644 --- a/include/zenoh-pico/api/handlers.h +++ b/include/zenoh-pico/api/handlers.h @@ -140,10 +140,10 @@ extern "C" { /* collection_close_f */ _z_##kind_name##_mt_close, \ /* elem_owned_type */ z_owned_##item_name##_t, \ /* elem_loaned_type */ z_loaned_##item_name##_t, \ - /* elem_clone_f */ z_##item_name##_clone, \ + /* elem_clone_f */ z_##item_name##_clone, \ /* elem_move_f */ z_##item_name##_move, \ /* elem_drop_f */ z_##item_name##_drop, \ - /* elem_null */ z_internal_##item_name##_null) + /* elem_null_f */ z_internal_##item_name##_null) #define _Z_CHANNEL_DUMMY_IMPL(handler_type, handler_name, item_name) \ _Z_OWNED_TYPE_VALUE(handler_type, handler_name) \ diff --git a/include/zenoh-pico/api/liveliness.h b/include/zenoh-pico/api/liveliness.h new file mode 100644 index 000000000..46e701e96 --- /dev/null +++ b/include/zenoh-pico/api/liveliness.h @@ -0,0 +1,119 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#ifndef INCLUDE_ZENOH_PICO_API_LIVELINESS_H +#define INCLUDE_ZENOH_PICO_API_LIVELINESS_H + +#include +#include + +#include "olv_macros.h" +#include "zenoh-pico/api/types.h" +#include "zenoh-pico/protocol/core.h" + +typedef struct { + _z_zint_t _id; + _z_keyexpr_t _key; + _z_session_weak_t _zn; +} _z_liveliness_token_t; + +_Z_OWNED_TYPE_VALUE(_z_liveliness_token_t, liveliness_token) +_Z_OWNED_FUNCTIONS_DEF(liveliness_token) + +/** + * The options for `z_liveliness_declare_token()`. + */ +typedef struct z_liveliness_declaration_options_t { + uint8_t __dummy; +} z_liveliness_declaration_options_t; + +/** + * The options for `z_liveliness_declare_subscriber()` + */ +typedef struct z_liveliness_subscriber_options_t { + uint8_t __dummy; +} z_liveliness_subscriber_options_t; + +/** + * The options for `z_liveliness_get()` + */ +typedef struct z_liveliness_get_options_t { + uint32_t timeout_ms; +} z_liveliness_get_options_t; + +/** + * Constucts default value for `z_liveliness_declare_subscriber_options_t`. + */ +z_result_t z_liveliness_subscriber_options_default(z_liveliness_subscriber_options_t *options); + +/** + * Declares a subscriber on liveliness tokens that intersect `keyexpr`. + * + * @param token: An uninitialized memory location where subscriber will be constructed. + * @param zs: The Zenoh session. + * @param keyexpr: The key expression to subscribe to. + * @param callback: The callback function that will be called each time a liveliness token status is changed. + * @param _options: The options to be passed to the liveliness subscriber declaration. + * + * @return 0 in case of success, negative error values otherwise. + */ +z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber_t *sub, + const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback, + z_liveliness_subscriber_options_t *options); +/** + * Constructs default value for `z_liveliness_declaration_options_t`. + */ +z_result_t z_liveliness_declaration_options_default(z_liveliness_declaration_options_t *options); + +/** + * Constructs and declares a liveliness token on the network. + * + * Liveliness token subscribers on an intersecting key expression will receive a PUT sample when connectivity + * is achieved, and a DELETE sample if it's lost. + * + * @param token: An uninitialized memory location where liveliness token will be constructed. + * @param zs: A Zenos session to declare the liveliness token. + * @param keyexpr: A keyexpr to declare a liveliess token for. + * @param _options: Liveliness token declaration properties. + */ +z_result_t z_liveliness_declare_token(const z_loaned_session_t *zs, z_owned_liveliness_token_t *token, + const z_loaned_keyexpr_t *keyexpr, + const z_liveliness_declaration_options_t *options); + +/** + * Constructs default value `z_liveliness_get_options_t`. + */ +z_result_t z_liveliness_get_options_default(z_liveliness_get_options_t *options); + +/** + * Queries liveliness tokens currently on the network with a key expression intersecting with `keyexpr`. + * + * @param zs: The Zenoh session. + * @param keyexpr: The key expression to query liveliness tokens for. + * @param callback: The callback function that will be called for each received reply. + * @param options: Additional options for the liveliness get operation. + */ +z_result_t z_liveliness_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, + z_moved_closure_reply_t *callback, z_liveliness_get_options_t *options); + +/** + * Destroys a liveliness token, notifying subscribers of its destruction. + */ + +z_result_t z_liveliness_undeclare_token(z_moved_liveliness_token_t *token); +/** + * Borrows token. + */ +const z_loaned_liveliness_token_t *z_liveliness_token_loan(const z_owned_liveliness_token_t *token); + +#endif // INCLUDE_ZENOH_PICO_API_LIVELINESS_H diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index ca17781d0..f60ba22bd 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -15,6 +15,7 @@ #define ZENOH_PICO_API_MACROS_H #include "zenoh-pico/api/handlers.h" +#include "zenoh-pico/api/liveliness.h" #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/api/serialization.h" #include "zenoh-pico/api/types.h" @@ -43,6 +44,7 @@ z_owned_subscriber_t : z_subscriber_loan, \ z_owned_publisher_t : z_publisher_loan, \ z_owned_queryable_t : z_queryable_loan, \ + z_owned_liveliness_token_t : z_liveliness_token_loan, \ z_owned_reply_t : z_reply_loan, \ z_owned_hello_t : z_hello_loan, \ z_owned_string_t : z_string_loan, \ @@ -74,28 +76,29 @@ )(&x) #define z_loan_mut(x) _Generic((x), \ - z_owned_keyexpr_t : z_keyexpr_loan_mut, \ - z_owned_config_t : z_config_loan_mut, \ - z_owned_session_t : z_session_loan_mut, \ - z_owned_publisher_t : z_publisher_loan_mut, \ - z_owned_queryable_t : z_queryable_loan_mut, \ - z_owned_subscriber_t : z_subscriber_loan_mut, \ - z_owned_reply_t : z_reply_loan_mut, \ - z_owned_hello_t : z_hello_loan_mut, \ - z_owned_string_t : z_string_loan_mut, \ - z_view_string_t : z_view_string_loan_mut, \ - z_owned_string_array_t : z_string_array_loan_mut, \ - z_owned_sample_t : z_sample_loan_mut, \ - z_owned_query_t : z_query_loan_mut, \ - z_owned_slice_t : z_slice_loan_mut, \ - z_view_slice_t : z_view_slice_loan_mut, \ - z_owned_bytes_t : z_bytes_loan_mut, \ - z_owned_task_t : z_task_loan_mut, \ - z_owned_mutex_t : z_mutex_loan_mut, \ - z_owned_condvar_t : z_condvar_loan_mut, \ - z_owned_reply_err_t : z_reply_err_loan_mut, \ - ze_owned_serializer_t : ze_serializer_loan_mut, \ - z_owned_bytes_writer_t : z_bytes_writer_loan_mut \ + z_owned_keyexpr_t : z_keyexpr_loan_mut, \ + z_owned_config_t : z_config_loan_mut, \ + z_owned_session_t : z_session_loan_mut, \ + z_owned_publisher_t : z_publisher_loan_mut, \ + z_owned_queryable_t : z_queryable_loan_mut, \ + z_owned_liveliness_token_t : z_liveliness_token_loan_mut, \ + z_owned_subscriber_t : z_subscriber_loan_mut, \ + z_owned_reply_t : z_reply_loan_mut, \ + z_owned_hello_t : z_hello_loan_mut, \ + z_owned_string_t : z_string_loan_mut, \ + z_view_string_t : z_view_string_loan_mut, \ + z_owned_string_array_t : z_string_array_loan_mut, \ + z_owned_sample_t : z_sample_loan_mut, \ + z_owned_query_t : z_query_loan_mut, \ + z_owned_slice_t : z_slice_loan_mut, \ + z_view_slice_t : z_view_slice_loan_mut, \ + z_owned_bytes_t : z_bytes_loan_mut, \ + z_owned_task_t : z_task_loan_mut, \ + z_owned_mutex_t : z_mutex_loan_mut, \ + z_owned_condvar_t : z_condvar_loan_mut, \ + z_owned_reply_err_t : z_reply_err_loan_mut, \ + ze_owned_serializer_t : ze_serializer_loan_mut, \ + z_owned_bytes_writer_t : z_bytes_writer_loan_mut \ )(&x) /** @@ -111,6 +114,7 @@ z_moved_subscriber_t* : z_subscriber_drop, \ z_moved_publisher_t* : z_publisher_drop, \ z_moved_queryable_t* : z_queryable_drop, \ + z_moved_liveliness_token_t* : z_liveliness_token_drop, \ z_moved_reply_t* : z_reply_drop, \ z_moved_hello_t* : z_hello_drop, \ z_moved_string_t* : z_string_drop, \ @@ -150,29 +154,30 @@ */ #define z_internal_check(x) _Generic((x), \ - z_owned_keyexpr_t : z_internal_keyexpr_check, \ - z_owned_reply_err_t : z_internal_reply_err_check, \ - z_owned_config_t : z_internal_config_check, \ - z_owned_session_t : z_internal_session_check, \ - z_owned_subscriber_t : z_internal_subscriber_check, \ - z_owned_publisher_t : z_internal_publisher_check, \ - z_owned_queryable_t : z_internal_queryable_check, \ - z_owned_reply_t : z_internal_reply_check, \ - z_owned_hello_t : z_internal_hello_check, \ - z_owned_string_t : z_internal_string_check, \ - z_owned_string_array_t : z_internal_string_array_check, \ - z_owned_closure_sample_t : z_internal_closure_sample_check, \ - z_owned_closure_query_t : z_internal_closure_query_check, \ - z_owned_closure_reply_t : z_internal_closure_reply_check, \ - z_owned_closure_hello_t : z_internal_closure_hello_check, \ - z_owned_closure_zid_t : z_internal_closure_zid_check, \ - z_owned_slice_t : z_internal_slice_check, \ - z_owned_bytes_t : z_internal_bytes_check, \ - z_owned_sample_t : z_internal_sample_check, \ - z_owned_query_t : z_internal_query_check, \ - z_owned_encoding_t : z_internal_encoding_check, \ - ze_owned_serializer_t : ze_internal_serializer_check, \ - z_owned_bytes_writer_t : z_internal_bytes_writer_check \ + z_owned_keyexpr_t : z_internal_keyexpr_check, \ + z_owned_reply_err_t : z_internal_reply_err_check, \ + z_owned_config_t : z_internal_config_check, \ + z_owned_session_t : z_internal_session_check, \ + z_owned_subscriber_t : z_internal_subscriber_check, \ + z_owned_publisher_t : z_internal_publisher_check, \ + z_owned_queryable_t : z_internal_queryable_check, \ + z_owned_liveliness_token_t : z_internal_liveliness_token_check, \ + z_owned_reply_t : z_internal_reply_check, \ + z_owned_hello_t : z_internal_hello_check, \ + z_owned_string_t : z_internal_string_check, \ + z_owned_string_array_t : z_internal_string_array_check, \ + z_owned_closure_sample_t : z_internal_closure_sample_check, \ + z_owned_closure_query_t : z_internal_closure_query_check, \ + z_owned_closure_reply_t : z_internal_closure_reply_check, \ + z_owned_closure_hello_t : z_internal_closure_hello_check, \ + z_owned_closure_zid_t : z_internal_closure_zid_check, \ + z_owned_slice_t : z_internal_slice_check, \ + z_owned_bytes_t : z_internal_bytes_check, \ + z_owned_sample_t : z_internal_sample_check, \ + z_owned_query_t : z_internal_query_check, \ + z_owned_encoding_t : z_internal_encoding_check, \ + ze_owned_serializer_t : ze_internal_serializer_check, \ + z_owned_bytes_writer_t : z_internal_bytes_writer_check \ )(&x) /** @@ -225,6 +230,7 @@ z_owned_subscriber_t : z_subscriber_move, \ z_owned_publisher_t : z_publisher_move, \ z_owned_queryable_t : z_queryable_move, \ + z_owned_liveliness_token_t : z_liveliness_token_move, \ z_owned_reply_t : z_reply_move, \ z_owned_hello_t : z_hello_move, \ z_owned_string_t : z_string_move, \ @@ -283,6 +289,7 @@ z_owned_publisher_t *: z_publisher_take, \ z_owned_query_t *: z_query_take, \ z_owned_queryable_t *: z_queryable_take, \ + z_owned_liveliness_token_t *: z_liveliness_token_take, \ z_owned_reply_t *: z_reply_take, \ z_owned_reply_err_t *: z_reply_err_take, \ z_owned_ring_handler_query_t *: z_ring_handler_query_take, \ @@ -336,6 +343,7 @@ z_owned_config_t * : z_internal_config_null, \ z_owned_subscriber_t * : z_internal_subscriber_null, \ z_owned_queryable_t * : z_internal_queryable_null, \ + z_owned_liveliness_token_t * : z_internal_liveliness_token_null, \ z_owned_query_t * : z_internal_query_null, \ z_owned_reply_t * : z_internal_reply_null, \ z_owned_hello_t * : z_internal_hello_null, \ @@ -389,6 +397,7 @@ inline const z_loaned_session_t* z_loan(const z_owned_session_t& x) { return z_s inline const z_loaned_subscriber_t* z_loan(const z_owned_subscriber_t& x) { return z_subscriber_loan(&x); } inline const z_loaned_publisher_t* z_loan(const z_owned_publisher_t& x) { return z_publisher_loan(&x); } inline const z_loaned_queryable_t* z_loan(const z_owned_queryable_t& x) { return z_queryable_loan(&x); } +inline const z_loaned_liveliness_token_t* z_loan(const z_owned_liveliness_token_t& x) { return z_liveliness_token_loan(&x); } inline const z_loaned_reply_t* z_loan(const z_owned_reply_t& x) { return z_reply_loan(&x); } inline const z_loaned_hello_t* z_loan(const z_owned_hello_t& x) { return z_hello_loan(&x); } inline const z_loaned_string_t* z_loan(const z_owned_string_t& x) { return z_string_loan(&x); } @@ -425,6 +434,7 @@ inline z_loaned_config_t* z_loan_mut(z_owned_config_t& x) { return z_config_loan inline z_loaned_session_t* z_loan_mut(z_owned_session_t& x) { return z_session_loan_mut(&x); } inline z_loaned_publisher_t* z_loan_mut(z_owned_publisher_t& x) { return z_publisher_loan_mut(&x); } inline z_loaned_queryable_t* z_loan_mut(z_owned_queryable_t& x) { return z_queryable_loan_mut(&x); } +inline z_loaned_liveliness_token_t* z_loan_mut(z_owned_liveliness_token_t& x) { return z_liveliness_token_loan_mut(&x); } inline z_loaned_subscriber_t* z_loan_mut(z_owned_subscriber_t& x) { return z_subscriber_loan_mut(&x); } inline z_loaned_reply_t* z_loan_mut(z_owned_reply_t& x) { return z_reply_loan_mut(&x); } inline z_loaned_hello_t* z_loan_mut(z_owned_hello_t& x) { return z_hello_loan_mut(&x); } @@ -451,6 +461,7 @@ inline void z_drop(z_moved_keyexpr_t* v) { z_keyexpr_drop(v); } inline void z_drop(z_moved_config_t* v) { z_config_drop(v); } inline void z_drop(z_moved_subscriber_t* v) { z_subscriber_drop(v); } inline void z_drop(z_moved_queryable_t* v) { z_queryable_drop(v); } +inline void z_drop(z_moved_liveliness_token_t* v) { z_liveliness_token_drop(v); } inline void z_drop(z_moved_reply_t* v) { z_reply_drop(v); } inline void z_drop(z_moved_hello_t* v) { z_hello_drop(v); } inline void z_drop(z_moved_string_t* v) { z_string_drop(v); } @@ -484,6 +495,7 @@ inline void z_internal_null(z_owned_keyexpr_t* v) { z_internal_keyexpr_null(v); inline void z_internal_null(z_owned_config_t* v) { z_internal_config_null(v); } inline void z_internal_null(z_owned_subscriber_t* v) { z_internal_subscriber_null(v); } inline void z_internal_null(z_owned_queryable_t* v) { z_internal_queryable_null(v); } +inline void z_internal_null(z_owned_liveliness_token_t* v) { z_internal_liveliness_token_null(v); } inline void z_internal_null(z_owned_query_t* v) { z_internal_query_null(v); } inline void z_internal_null(z_owned_sample_t* v) { z_internal_sample_null(v); } inline void z_internal_null(z_owned_reply_t* v) { z_internal_reply_null(v); } @@ -513,6 +525,7 @@ inline bool z_internal_check(const z_owned_keyexpr_t& v) { return z_internal_key inline bool z_internal_check(const z_owned_config_t& v) { return z_internal_config_check(&v); } inline bool z_internal_check(const z_owned_subscriber_t& v) { return z_internal_subscriber_check(&v); } inline bool z_internal_check(const z_owned_queryable_t& v) { return z_internal_queryable_check(&v); } +inline bool z_internal_check(const z_owned_liveliness_token_t& v) { return z_internal_liveliness_token_check(&v); } inline bool z_internal_check(const z_owned_reply_t& v) { return z_internal_reply_check(&v); } inline bool z_internal_check(const z_owned_query_t& v) { return z_internal_query_check(&v); } inline bool z_internal_check(const z_owned_hello_t& v) { return z_internal_hello_check(&v); } @@ -643,6 +656,7 @@ inline z_moved_keyexpr_t* z_move(z_owned_keyexpr_t& x) { return z_keyexpr_move(& inline z_moved_publisher_t* z_move(z_owned_publisher_t& x) { return z_publisher_move(&x); } inline z_moved_query_t* z_move(z_owned_query_t& x) { return z_query_move(&x); } inline z_moved_queryable_t* z_move(z_owned_queryable_t& x) { return z_queryable_move(&x); } +inline z_moved_liveliness_token_t* z_move(z_owned_liveliness_token_t& x) { return z_liveliness_token_move(&x); } inline z_moved_reply_t* z_move(z_owned_reply_t& x) { return z_reply_move(&x); } inline z_moved_sample_t* z_move(z_owned_sample_t& x) { return z_sample_move(&x); } inline z_moved_session_t* z_move(z_owned_session_t& x) { return z_session_move(&x); } @@ -670,6 +684,9 @@ inline void z_take(z_owned_keyexpr_t* this_, z_moved_keyexpr_t* v) { z_keyexpr_t inline void z_take(z_owned_config_t* this_, z_moved_config_t* v) { z_config_take(this_, v); } inline void z_take(z_owned_subscriber_t* this_, z_moved_subscriber_t* v) { return z_subscriber_take(this_, v); } inline void z_take(z_owned_queryable_t* this_, z_moved_queryable_t* v) { return z_queryable_take(this_, v); } +inline void z_take(z_owned_liveliness_token_t* this_, z_moved_liveliness_token_t* v) { + return z_liveliness_token_take(this_, v); +} inline void z_take(z_owned_reply_t* this_, z_moved_reply_t* v) { z_reply_take(this_, v); } inline void z_take(z_owned_hello_t* this_, z_moved_hello_t* v) { z_hello_take(this_, v); } inline void z_take(z_owned_string_t* this_, z_moved_string_t* v) { z_string_take(this_, v); } @@ -803,6 +820,14 @@ struct z_owned_to_loaned_type_t { typedef z_loaned_queryable_t type; }; template <> +struct z_loaned_to_owned_type_t { + typedef z_owned_liveliness_token_t type; +}; +template <> +struct z_owned_to_loaned_type_t { + typedef z_loaned_liveliness_token_t type; +}; +template <> struct z_loaned_to_owned_type_t { typedef z_owned_reply_t type; }; diff --git a/include/zenoh-pico/collections/intmap.h b/include/zenoh-pico/collections/intmap.h index 136cf9772..8e414e548 100644 --- a/include/zenoh-pico/collections/intmap.h +++ b/include/zenoh-pico/collections/intmap.h @@ -54,6 +54,17 @@ typedef struct { _z_list_t **_vals; } _z_int_void_map_t; +/** + * An iterator of an hashmap with integer keys. + */ +typedef struct { + _z_int_void_map_entry_t *_entry; + + const _z_int_void_map_t *_map; + size_t _idx; + _z_list_t *_list_ptr; +} _z_int_void_map_iterator_t; + void _z_int_void_map_init(_z_int_void_map_t *map, size_t capacity); _z_int_void_map_t _z_int_void_map_make(size_t capacity); @@ -71,6 +82,11 @@ _z_int_void_map_t _z_int_void_map_clone(const _z_int_void_map_t *src, z_element_ void _z_int_void_map_clear(_z_int_void_map_t *map, z_element_free_f f); void _z_int_void_map_free(_z_int_void_map_t **map, z_element_free_f f); +_z_int_void_map_iterator_t _z_int_void_map_iterator_make(const _z_int_void_map_t *map); +bool _z_int_void_map_iterator_next(_z_int_void_map_iterator_t *iter); +size_t _z_int_void_map_iterator_key(const _z_int_void_map_iterator_t *iter); +void *_z_int_void_map_iterator_value(const _z_int_void_map_iterator_t *iter); + #define _Z_INT_MAP_DEFINE(name, type) \ typedef _z_int_void_map_entry_t name##_intmap_entry_t; \ static inline void name##_intmap_entry_elem_free(void **e) { \ @@ -89,6 +105,7 @@ void _z_int_void_map_free(_z_int_void_map_t **map, z_element_free_f f); return dst; \ } \ typedef _z_int_void_map_t name##_intmap_t; \ + typedef _z_int_void_map_iterator_t name##_intmap_iterator_t; \ static inline void name##_intmap_init(name##_intmap_t *m) { \ _z_int_void_map_init(m, _Z_DEFAULT_INT_MAP_CAPACITY); \ } \ @@ -115,6 +132,18 @@ void _z_int_void_map_free(_z_int_void_map_t **map, z_element_free_f f); } \ static inline void name##_intmap_free(name##_intmap_t **m) { \ _z_int_void_map_free(m, name##_intmap_entry_elem_free); \ + } \ + static inline name##_intmap_iterator_t name##_intmap_iterator_make(const name##_intmap_t *m) { \ + return _z_int_void_map_iterator_make(m); \ + } \ + static inline bool name##_intmap_iterator_next(name##_intmap_iterator_t *iter) { \ + return _z_int_void_map_iterator_next(iter); \ + } \ + static inline size_t name##_intmap_iterator_key(const name##_intmap_iterator_t *iter) { \ + return _z_int_void_map_iterator_key(iter); \ + } \ + static inline type *name##_intmap_iterator_value(const name##_intmap_iterator_t *iter) { \ + return (type *)_z_int_void_map_iterator_value(iter); \ } #ifdef __cplusplus diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index b3c5d3ec1..14c2d50cc 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -27,6 +27,7 @@ #define Z_FEATURE_SUBSCRIPTION 1 #define Z_FEATURE_QUERY 1 #define Z_FEATURE_QUERYABLE 1 +#define Z_FEATURE_LIVELINESS 1 #define Z_FEATURE_RAWETH_TRANSPORT 0 #define Z_FEATURE_INTEREST 1 #define Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION 0 diff --git a/include/zenoh-pico/config.h.in b/include/zenoh-pico/config.h.in index 40941d9fa..175fc8fdf 100644 --- a/include/zenoh-pico/config.h.in +++ b/include/zenoh-pico/config.h.in @@ -27,6 +27,7 @@ #define Z_FEATURE_SUBSCRIPTION @Z_FEATURE_SUBSCRIPTION@ #define Z_FEATURE_QUERY @Z_FEATURE_QUERY@ #define Z_FEATURE_QUERYABLE @Z_FEATURE_QUERYABLE@ +#define Z_FEATURE_LIVELINESS @Z_FEATURE_LIVELINESS@ #define Z_FEATURE_RAWETH_TRANSPORT @Z_FEATURE_RAWETH_TRANSPORT@ #define Z_FEATURE_INTEREST @Z_FEATURE_INTEREST@ #define Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION @Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION@ diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 95b47078b..462653f10 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -17,6 +17,7 @@ #include #include "zenoh-pico/api/constants.h" +#include "zenoh-pico/api/liveliness.h" #include "zenoh-pico/collections/string.h" #include "zenoh-pico/net/encoding.h" #include "zenoh-pico/net/publish.h" @@ -255,6 +256,59 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete bool is_express); #endif +#if Z_FEATURE_LIVELINESS == 1 + +#if Z_FEATURE_SUBSCRIPTION == 1 +/** + * Declare a :c:type:`_z_subscriber_t` for the given liveliness key. + * + * Parameters: + * zn: The zenoh-net session. The caller keeps its ownership. + * keyexpr: The resource key to subscribe. The callee gets the ownership of any allocated value. + * callback: The callback function that will be called each time a matching liveliness token changed. + * arg: A pointer that will be passed to the **callback** on each call. + * + * Returns: + * The created :c:type:`_z_subscriber_t` (in null state if the declaration failed). + */ +_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, + _z_closure_sample_callback_t callback, _z_drop_handler_t dropper, + void *arg); + +/** + * Undeclare a liveliness :c:type:`_z_subscriber_t`. + * + * Parameters: + * sub: The :c:type:`_z_subscriber_t` to undeclare. The callee releases the + * subscriber upon successful return. + * Returns: + * 0 if success, or a negative value identifying the error. + */ +z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub); +#endif // Z_FEATURE_SUBSCRIPTION == 1 + +#if Z_FEATURE_QUERY == 1 +/** + * Query liveliness token state. + * + * Parameters: + * zn: The zenoh-net session. The caller keeps its ownership. + * keyexpr: The resource key to liveliness token. + * callback: The callback function that will be called on reception of replies for this query. + * dropper: The callback function that will be called on upon completion of the callback. + * arg: A pointer that will be passed to the **callback** on each call. + * timeout_ms: The timeout value of this query. + */ +z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closure_reply_callback_t callback, + _z_drop_handler_t dropper, void *arg, uint64_t timeout_ms); +#endif // Z_FEATURE_QUERY == 1 + +z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_token_t *ret_token, + _z_keyexpr_t keyexpr); +z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token); + +#endif // Z_FEATURE_LIVELINESS == 1 + #if Z_FEATURE_INTEREST == 1 uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, void *arg); diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index d7ab4495a..96430ba19 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -54,8 +54,15 @@ typedef struct _z_session_t { // Session subscriptions #if Z_FEATURE_SUBSCRIPTION == 1 - _z_subscription_rc_list_t *_local_subscriptions; - _z_subscription_rc_list_t *_remote_subscriptions; + _z_subscription_rc_list_t *_subscriptions; + _z_subscription_rc_list_t *_liveliness_subscriptions; +#endif + +#if Z_FEATURE_LIVELINESS == 1 + _z_zint_t _liveliness_query_id; + _z_keyexpr_intmap_t _local_tokens; + _z_keyexpr_intmap_t _remote_tokens; + _z_liveliness_pending_query_intmap_t _liveliness_pending_queries; #endif // Session queryables diff --git a/include/zenoh-pico/protocol/keyexpr.h b/include/zenoh-pico/protocol/keyexpr.h index 27cdb8853..580c9e63e 100644 --- a/include/zenoh-pico/protocol/keyexpr.h +++ b/include/zenoh-pico/protocol/keyexpr.h @@ -34,6 +34,7 @@ _z_keyexpr_t _z_keyexpr_from_string(uint16_t rid, _z_string_t *str); _z_keyexpr_t _z_keyexpr_from_substr(uint16_t rid, const char *str, size_t len); z_result_t _z_keyexpr_copy(_z_keyexpr_t *dst, const _z_keyexpr_t *src); _z_keyexpr_t _z_keyexpr_duplicate(_z_keyexpr_t src); +_z_keyexpr_t *_z_keyexpr_clone(const _z_keyexpr_t *src); _z_keyexpr_t _z_keyexpr_alias(_z_keyexpr_t src); /// Returns either keyexpr defined by id + mapping with null suffix if try_declared is true and id is non-zero, /// or keyexpr defined by its suffix only, with 0 id and no mapping. This is to be used only when forwarding diff --git a/include/zenoh-pico/session/liveliness.h b/include/zenoh-pico/session/liveliness.h new file mode 100644 index 000000000..2a5c3296c --- /dev/null +++ b/include/zenoh-pico/session/liveliness.h @@ -0,0 +1,35 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_SESSION_LIVELINESS_H +#define ZENOH_PICO_SESSION_LIVELINESS_H + +#if Z_FEATURE_QUERYABLE == 1 +_z_zint_t _z_liveliness_get_query_id(_z_session_t *zn); + +z_result_t _z_liveliness_register_pending_query(_z_session_t *zn, _z_zint_t id, _z_liveliness_pending_query_t *pen_qry); +void _z_liveliness_unregister_pending_query(_z_session_t *zn, _z_zint_t id); + +z_result_t _z_liveliness_process_token_declare(_z_session_t *zn, const _z_n_msg_declare_t *decl); +z_result_t _z_liveliness_process_token_undeclare(_z_session_t *zn, const _z_n_msg_declare_t *decl); +z_result_t _z_liveliness_process_declare_final(_z_session_t *zn, const _z_n_msg_declare_t *decl); + +z_result_t _z_liveliness_register_token(_z_session_t *zn, _z_zint_t id, const _z_keyexpr_t keyexpr); +void _z_liveliness_unregister_token(_z_session_t *zn, _z_zint_t id); + +void _z_liveliness_init(_z_session_t *zn); +void _z_liveliness_clear(_z_session_t *zn); +#endif + +#endif /* ZENOH_PICO_SESSION_LIVELINESS_H */ diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index 1b7ea5dd9..18b2c4508 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -35,8 +35,10 @@ extern "C" { */ typedef void (*_z_drop_handler_t)(void *arg); -#define _Z_RESOURCE_IS_REMOTE 0 -#define _Z_RESOURCE_IS_LOCAL 1 +typedef enum { + _Z_SUBSCRIBER_KIND_SUBSCRIBER = 0, + _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER = 1, +} _z_subscriber_kind_t; typedef struct { _z_keyexpr_t _key; @@ -52,6 +54,9 @@ void _z_resource_free(_z_resource_t **res); _Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_resource_copy) _Z_LIST_DEFINE(_z_resource, _z_resource_t) +_Z_ELEM_DEFINE(_z_keyexpr, _z_keyexpr_t, _z_noop_size, _z_keyexpr_clear, _z_keyexpr_copy) +_Z_INT_MAP_DEFINE(_z_keyexpr, _z_keyexpr_t) + // Forward declaration to avoid cyclical include typedef struct _z_sample_t _z_sample_t; @@ -138,6 +143,21 @@ void _z_pending_query_clear(_z_pending_query_t *res); _Z_ELEM_DEFINE(_z_pending_query, _z_pending_query_t, _z_noop_size, _z_pending_query_clear, _z_noop_copy) _Z_LIST_DEFINE(_z_pending_query, _z_pending_query_t) +typedef struct { + _z_keyexpr_t _key; + _z_closure_reply_callback_t _callback; + _z_drop_handler_t _dropper; + void *_arg; +} _z_liveliness_pending_query_t; + +void _z_liveliness_pending_query_clear(_z_liveliness_pending_query_t *res); +void _z_liveliness_pending_query_copy(_z_liveliness_pending_query_t *dst, const _z_liveliness_pending_query_t *src); +_z_liveliness_pending_query_t *_z_liveliness_pending_query_clone(const _z_liveliness_pending_query_t *src); + +_Z_ELEM_DEFINE(_z_liveliness_pending_query, _z_liveliness_pending_query_t, _z_noop_size, + _z_liveliness_pending_query_clear, _z_liveliness_pending_query_copy) +_Z_INT_MAP_DEFINE(_z_liveliness_pending_query, _z_liveliness_pending_query_t) + typedef struct { #if Z_FEATURE_MULTI_THREAD == 1 _z_mutex_t _mutex; diff --git a/include/zenoh-pico/session/subscription.h b/include/zenoh-pico/session/subscription.h index 236789f53..59c942554 100644 --- a/include/zenoh-pico/session/subscription.h +++ b/include/zenoh-pico/session/subscription.h @@ -23,19 +23,31 @@ extern "C" { #endif /*------------------ Subscription ------------------*/ -void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t attachment, z_reliability_t reliability); +z_result_t _z_trigger_subscriptions_put(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, + _z_encoding_t *encoding, const _z_timestamp_t *timestamp, const _z_n_qos_t qos, + const _z_bytes_t attachment, z_reliability_t reliability); + +z_result_t _z_trigger_subscriptions_del(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_timestamp_t *timestamp, + const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability); + +z_result_t _z_trigger_liveliness_subscriptions_declare(_z_session_t *zn, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp); + +z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp); #if Z_FEATURE_SUBSCRIPTION == 1 -_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id); -_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *keyexpr); - -_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *sub); -z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - _z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp, - const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability); -void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub); +_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_zint_t id); +_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, _z_subscriber_kind_t kind, + const _z_keyexpr_t *keyexpr); + +_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, _z_subscriber_kind_t kind, _z_subscription_t *sub); +z_result_t _z_trigger_subscriptions_impl(_z_session_t *zn, _z_subscriber_kind_t subscriber_kind, + const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding, + const _z_zint_t sample_kind, const _z_timestamp_t *timestamp, + const _z_n_qos_t qos, const _z_bytes_t attachment, + z_reliability_t reliability); +void _z_unregister_subscription(_z_session_t *zn, _z_subscriber_kind_t kind, _z_subscription_rc_t *sub); void _z_flush_subscriptions(_z_session_t *zn); #endif diff --git a/src/api/api.c b/src/api/api.c index c76a9b32e..4d20a71df 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -828,12 +828,12 @@ z_result_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr opt.priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); - // Trigger local subscriptions - _z_trigger_local_subscriptions( + // Trigger subscriptions + _z_trigger_subscriptions_put( _Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(&payload->_this), - opt.encoding == NULL ? NULL : &opt.encoding->_this._val, + opt.encoding == NULL ? NULL : &opt.encoding->_this._val, opt.timestamp, _z_n_qos_make(opt.is_express, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority), - opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); + _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); // Clean-up z_encoding_drop(opt.encoding); z_bytes_drop(opt.attachment); @@ -961,11 +961,11 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); } - // Trigger local subscriptions - _z_trigger_local_subscriptions( - _Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, + // Trigger subscriptions + _z_trigger_subscriptions_put( + _Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, opt.timestamp, _z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority), - opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); + _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability); _z_session_rc_drop(&sess_rc); } else { @@ -1399,7 +1399,7 @@ z_result_t z_undeclare_subscriber(z_moved_subscriber_t *sub) { const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *sub) { // Retrieve keyexpr from session uint32_t lookup = sub->_entity_id; - _z_subscription_rc_list_t *tail = _Z_RC_IN_VAL(&sub->_zn)->_local_subscriptions; + _z_subscription_rc_list_t *tail = _Z_RC_IN_VAL(&sub->_zn)->_subscriptions; while (tail != NULL) { _z_subscription_rc_t *head = _z_subscription_rc_list_head(tail); if (_Z_RC_IN_VAL(head)->_id == lookup) { diff --git a/src/api/liveliness.c b/src/api/liveliness.c new file mode 100644 index 000000000..4cd4ec4db --- /dev/null +++ b/src/api/liveliness.c @@ -0,0 +1,146 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/api/liveliness.h" + +#include "zenoh-pico/api/primitives.h" +#include "zenoh-pico/net/primitives.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/keyexpr.h" +#include "zenoh-pico/session/resource.h" +#include "zenoh-pico/utils/result.h" + +_Bool _z_liveliness_token_check(const _z_liveliness_token_t *token) { + _z_keyexpr_check(&token->_key); + return true; +} + +_z_liveliness_token_t _z_liveliness_token_null(void) { + _z_liveliness_token_t s = {0}; + s._key = _z_keyexpr_null(); + return s; +} + +void _z_liveliness_token_clear(_z_liveliness_token_t *token) { + _z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&token->_zn); + if (!_Z_RC_IS_NULL(&sess_rc)) { + _z_undeclare_liveliness_token(token); + _z_session_rc_drop(&sess_rc); + } + _z_keyexpr_clear(&token->_key); +} + +_Z_OWNED_FUNCTIONS_VALUE_NO_COPY_IMPL(_z_liveliness_token_t, liveliness_token, _z_liveliness_token_check, + _z_liveliness_token_null, _z_liveliness_token_clear) + +z_result_t z_liveliness_subscriber_options_default(z_liveliness_subscriber_options_t *options) { + options->__dummy = 0; + return _Z_RES_OK; +} + +z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber_t *sub, + const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback, + z_liveliness_subscriber_options_t *options) { + _ZP_UNUSED(options); + void *ctx = callback->_this._val.context; + callback->_this._val.context = NULL; + + _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); + _z_keyexpr_t key = _z_keyexpr_alias(keyexpr_aliased); + + _z_subscriber_t int_sub = + _z_declare_liveliness_subscriber(zs, key, callback->_this._val.call, callback->_this._val.drop, ctx); + + z_internal_closure_sample_null(&callback->_this); + sub->_val = int_sub; + + if (!_z_subscriber_check(&sub->_val)) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } else { + return _Z_RES_OK; + } +} + +z_result_t z_liveliness_get_options_default(z_liveliness_get_options_t *options) { + options->timeout_ms = Z_GET_TIMEOUT_DEFAULT; + return _Z_RES_OK; +} + +z_result_t z_liveliness_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr, + z_moved_closure_reply_t *callback, z_liveliness_get_options_t *options) { + z_result_t ret = _Z_RES_OK; + + void *ctx = callback->_this._val.context; + callback->_this._val.context = NULL; + + _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); + _z_keyexpr_t key = keyexpr_aliased; + // TODO(sashacmc): Unicast optimization + /* + // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition + // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic + // resource declarations are only performed on unicast transports. + if (_Z_RC_IN_VAL(zs)->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + _z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased); + if (r == NULL) { + uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), keyexpr_aliased); + key = _z_rid_with_suffix(id, NULL); + } + } + */ + z_liveliness_get_options_t opt; + if (options == NULL) { + z_liveliness_get_options_default(&opt); + } else { + opt = *options; + } + + ret = _z_liveliness_query(_Z_RC_IN_VAL(zs), key, callback->_this._val.call, callback->_this._val.drop, ctx, + opt.timeout_ms); + + z_internal_closure_reply_null( + &callback->_this); // call and drop passed to _z_liveliness_query, so we nullify the closure here + return ret; +} + +z_result_t z_liveliness_declaration_options_default(z_liveliness_declaration_options_t *options) { + options->__dummy = 0; + return _Z_RES_OK; +} + +z_result_t z_liveliness_declare_token(const z_loaned_session_t *zs, z_owned_liveliness_token_t *token, + const z_loaned_keyexpr_t *keyexpr, + const z_liveliness_declaration_options_t *options) { + (void)options; + + _z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true); + _z_keyexpr_t key = keyexpr_aliased; + + // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition + // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic + // resource declarations are only performed on unicast transports. + if (_Z_RC_IN_VAL(zs)->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + _z_resource_t *r = _z_get_resource_by_key(_Z_RC_IN_VAL(zs), &keyexpr_aliased); + if (r == NULL) { + uint16_t id = _z_declare_resource(_Z_RC_IN_VAL(zs), keyexpr_aliased); + key = _z_rid_with_suffix(id, NULL); + } + } + + return _z_declare_liveliness_token(zs, &token->_val, key); +} + +z_result_t z_liveliness_undeclare_token(z_moved_liveliness_token_t *token) { + return _z_undeclare_liveliness_token(&token->_this._val); +} diff --git a/src/collections/fifo_mt.c b/src/collections/fifo_mt.c index e8d735f94..91c918673 100644 --- a/src/collections/fifo_mt.c +++ b/src/collections/fifo_mt.c @@ -17,6 +17,7 @@ #include "zenoh-pico/protocol/codec/core.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/utils/result.h" /*-------- Fifo Buffer Multithreaded --------*/ z_result_t _z_fifo_mt_init(_z_fifo_mt_t *fifo, size_t capacity) { diff --git a/src/collections/intmap.c b/src/collections/intmap.c index 004e61bdd..e7aee7993 100644 --- a/src/collections/intmap.c +++ b/src/collections/intmap.c @@ -144,6 +144,37 @@ void *_z_int_void_map_get(const _z_int_void_map_t *map, size_t k) { return ret; } +_z_int_void_map_iterator_t _z_int_void_map_iterator_make(const _z_int_void_map_t *map) { + _z_int_void_map_iterator_t iter = {0}; + + iter._map = map; + + return iter; +} + +bool _z_int_void_map_iterator_next(_z_int_void_map_iterator_t *iter) { + while (iter->_idx < iter->_map->_capacity) { + if (iter->_list_ptr == NULL) { + iter->_list_ptr = iter->_map->_vals[iter->_idx]; + } else { + iter->_list_ptr = _z_list_tail(iter->_list_ptr); + } + if (iter->_list_ptr == NULL) { + iter->_idx++; + continue; + } + + iter->_entry = iter->_list_ptr->_val; + + return true; + } + return false; +} + +size_t _z_int_void_map_iterator_key(const _z_int_void_map_iterator_t *iter) { return iter->_entry->_key; } + +void *_z_int_void_map_iterator_value(const _z_int_void_map_iterator_t *iter) { return iter->_entry->_val; } + void _z_int_void_map_clear(_z_int_void_map_t *map, z_element_free_f f_f) { if (map->_vals != NULL) { for (size_t idx = 0; idx < map->_capacity; idx++) { diff --git a/src/net/primitives.c b/src/net/primitives.c index 1a67dba9a..f021681b5 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -18,6 +18,7 @@ #include #include "zenoh-pico/api/constants.h" +#include "zenoh-pico/api/liveliness.h" #include "zenoh-pico/collections/slice.h" #include "zenoh-pico/config.h" #include "zenoh-pico/net/filtering.h" @@ -25,12 +26,15 @@ #include "zenoh-pico/net/sample.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/declarations.h" +#include "zenoh-pico/protocol/definitions/interest.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/interest.h" +#include "zenoh-pico/session/liveliness.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" +#include "zenoh-pico/session/session.h" #include "zenoh-pico/session/subscription.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/utils/logging.h" @@ -204,7 +208,7 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke _z_subscriber_t ret = _z_subscriber_null(); // Register subscription, stored at session-level, do not drop it by the end of this function. - _z_subscription_rc_t *sp_s = _z_register_subscription(_Z_RC_IN_VAL(zn), _Z_RESOURCE_IS_LOCAL, &s); + _z_subscription_rc_t *sp_s = _z_register_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, &s); if (sp_s == NULL) { _z_subscriber_clear(&ret); return ret; @@ -213,7 +217,7 @@ _z_subscriber_t _z_declare_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t ke _z_declaration_t declaration = _z_make_decl_subscriber(&keyexpr, s._id); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - _z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_RESOURCE_IS_LOCAL, sp_s); + _z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, sp_s); _z_subscriber_clear(&ret); return ret; } @@ -229,7 +233,8 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub) { return _Z_ERR_ENTITY_UNKNOWN; } // Find subscription entry - _z_subscription_rc_t *s = _z_get_subscription_by_id(_Z_RC_IN_VAL(&sub->_zn), _Z_RESOURCE_IS_LOCAL, sub->_entity_id); + _z_subscription_rc_t *s = + _z_get_subscription_by_id(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, sub->_entity_id); if (s == NULL) { return _Z_ERR_ENTITY_UNKNOWN; } @@ -248,7 +253,7 @@ z_result_t _z_undeclare_subscriber(_z_subscriber_t *sub) { _z_n_msg_clear(&n_msg); // Only if message is successfully send, local subscription state can be removed _z_undeclare_resource(_Z_RC_IN_VAL(&sub->_zn), _Z_RC_IN_VAL(s)->_key_id); - _z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_RESOURCE_IS_LOCAL, s); + _z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_SUBSCRIBER, s); return _Z_RES_OK; } #endif @@ -489,6 +494,141 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete } #endif +#if Z_FEATURE_LIVELINESS == 1 +#if Z_FEATURE_SUBSCRIPTION == 1 +/*------------------ Liveliness Subscriber Declaration ------------------*/ +_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr, + _z_closure_sample_callback_t callback, _z_drop_handler_t dropper, + void *arg) { + _z_subscription_t s; + s._id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); + s._key_id = keyexpr._id; + s._key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr); + s._callback = callback; + s._dropper = dropper; + s._arg = arg; + + _z_subscriber_t ret = _z_subscriber_null(); + // Register subscription, stored at session-level, do not drop it by the end of this function. + _z_subscription_rc_t *sp_s = + _z_register_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, &s); + if (sp_s == NULL) { + _z_subscriber_clear(&ret); + return ret; + } + // Build the declare message to send on the wire + _z_interest_t interest = + _z_make_interest(&keyexpr, s._id, + _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS | _Z_INTEREST_FLAG_RESTRICTED | + _Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE); + + _z_network_message_t n_msg = _z_n_msg_make_interest(interest); + if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sp_s); + _z_subscriber_clear(&ret); + return ret; + } + _z_n_msg_clear(&n_msg); + // Fill subscriber + ret._entity_id = s._id; + ret._zn = _z_session_rc_clone_as_weak(zn); + return ret; +} + +z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub) { + if (sub == NULL || _Z_RC_IS_NULL(&sub->_zn)) { + return _Z_ERR_ENTITY_UNKNOWN; + } + + _z_subscription_rc_t *s = + _z_get_subscription_by_id(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sub->_entity_id); + if (s == NULL) { + return _Z_ERR_ENTITY_UNKNOWN; + } + + _z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, s); + return _Z_RES_OK; +} +#endif // Z_FEATURE_SUBSCRIPTION == 1 +#if Z_FEATURE_QUERY == 1 +/*------------------ Query ------------------*/ +z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closure_reply_callback_t callback, + _z_drop_handler_t dropper, void *arg, uint64_t timeout_ms) { + z_result_t ret = _Z_RES_OK; + + // Create the pending liveliness query object + _z_liveliness_pending_query_t *pq = + (_z_liveliness_pending_query_t *)z_malloc(sizeof(_z_liveliness_pending_query_t)); + if (pq != NULL) { + _z_zint_t id = _z_liveliness_get_query_id(zn); + pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr); + pq->_callback = callback; + pq->_dropper = dropper; + pq->_arg = arg; + + ret = _z_liveliness_register_pending_query(zn, id, pq); + if (ret == _Z_RES_OK) { + _ZP_UNUSED(timeout_ms); // Current interest in pico don't support timeout + + _z_interest_t interest = _z_make_interest(&keyexpr, id, + _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS | + _Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT); + + _z_network_message_t n_msg = _z_n_msg_make_interest(interest); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _z_liveliness_unregister_pending_query(zn, id); + ret = _Z_ERR_TRANSPORT_TX_FAILED; + } + + _z_n_msg_clear(&n_msg); + + } else { + _z_liveliness_pending_query_clear(pq); + } + } + + return ret; +} +#endif // Z_FEATURE_QUERY == 1 +z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_token_t *ret_token, + _z_keyexpr_t keyexpr) { + z_result_t ret; + + uint32_t id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); + _z_keyexpr_t key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr); + + _z_declaration_t declaration = _z_make_decl_token(&key, id); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); + ret = _z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + _z_n_msg_clear(&n_msg); + + _z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, key); + + ret_token->_id = id; + // TODO(sashacmc): clenaup? Maybe store in to list only? + _z_keyexpr_move(&ret_token->_key, &key); + ret_token->_zn = _z_session_rc_clone_as_weak(zn); + return ret; +} + +z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token) { + if (token == NULL || _Z_RC_IS_NULL(&token->_zn)) { + return _Z_ERR_ENTITY_UNKNOWN; + } + + z_result_t ret; + + _z_liveliness_unregister_token(_Z_RC_IN_VAL(&token->_zn), token->_id); + + _z_declaration_t declaration = _z_make_undecl_token(token->_id, &token->_key); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); + ret = _z_send_n_msg(_Z_RC_IN_VAL(&token->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); + _z_n_msg_clear(&n_msg); + + return ret; +} +#endif // Z_FEATURE_LIVELINESS == 1 + #if Z_FEATURE_INTEREST == 1 /*------------------ Interest Declaration ------------------*/ uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, diff --git a/src/protocol/keyexpr.c b/src/protocol/keyexpr.c index a097b2c40..720ad0539 100644 --- a/src/protocol/keyexpr.c +++ b/src/protocol/keyexpr.c @@ -65,6 +65,14 @@ _z_keyexpr_t _z_keyexpr_duplicate(_z_keyexpr_t src) { return dst; } +_z_keyexpr_t *_z_keyexpr_clone(const _z_keyexpr_t *src) { + _z_keyexpr_t *dst = z_malloc(sizeof(_z_keyexpr_t)); + if (dst != NULL) { + _z_keyexpr_copy(dst, src); + } + return dst; +} + _z_keyexpr_t _z_keyexpr_steal(_Z_MOVE(_z_keyexpr_t) src) { _z_keyexpr_t stolen = *src; *src = _z_keyexpr_null(); diff --git a/src/session/interest.c b/src/session/interest.c index a16884ee7..7c3687acb 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -21,10 +21,12 @@ #include "zenoh-pico/net/query.h" #include "zenoh-pico/protocol/codec/core.h" #include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/declarations.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" +#include "zenoh-pico/session/session.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/utils/logging.h" @@ -117,7 +119,7 @@ static z_result_t _z_interest_send_decl_resource(_z_session_t *zn, uint32_t inte #if Z_FEATURE_SUBSCRIPTION == 1 static z_result_t _z_interest_send_decl_subscriber(_z_session_t *zn, uint32_t interest_id) { _zp_session_lock_mutex(zn); - _z_subscription_rc_list_t *sub_list = _z_subscription_rc_list_clone(zn->_local_subscriptions); + _z_subscription_rc_list_t *sub_list = _z_subscription_rc_list_clone(zn->_subscriptions); _zp_session_unlock_mutex(zn); _z_subscription_rc_list_t *xs = sub_list; while (xs != NULL) { @@ -173,6 +175,35 @@ static z_result_t _z_interest_send_decl_queryable(_z_session_t *zn, uint32_t int } #endif +#if Z_FEATURE_LIVELINESS == 1 +static z_result_t _z_interest_send_decl_token(_z_session_t *zn, uint32_t interest_id) { + _zp_session_lock_mutex(zn); + _z_keyexpr_intmap_t token_list = _z_keyexpr_intmap_clone(&zn->_local_tokens); + _zp_session_unlock_mutex(zn); + _z_keyexpr_intmap_iterator_t iter = _z_keyexpr_intmap_iterator_make(&token_list); + while (_z_keyexpr_intmap_iterator_next(&iter)) { + // Build the declare message to send on the wire + size_t id = _z_keyexpr_intmap_iterator_key(&iter); + _z_keyexpr_t key = *_z_keyexpr_intmap_iterator_value(&iter); + _z_declaration_t declaration = _z_make_decl_token(&key, id); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, true, interest_id); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + } + _z_keyexpr_intmap_clear(&token_list); + + return _Z_RES_OK; +} +#else +static z_result_t _z_interest_send_decl_token(_z_session_t *zn, uint32_t interest_id) { + _ZP_UNUSED(zn); + _ZP_UNUSED(interest_id); + return _Z_RES_OK; +} +#endif + static z_result_t _z_interest_send_declare_final(_z_session_t *zn, uint32_t interest_id) { _z_declaration_t decl = _z_make_decl_final(); _z_network_message_t n_msg = _z_n_msg_make_declare(decl, true, interest_id); @@ -265,6 +296,13 @@ z_result_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t decl_type = _Z_DECLARE_TYPE_QUERYABLE; flags = _Z_INTEREST_FLAG_QUERYABLES; break; + case _Z_DECL_TOKEN: + msg.type = _Z_INTEREST_MSG_TYPE_DECL_TOKEN; + msg.id = decl->_body._decl_token._id; + decl_key = &decl->_body._decl_token._keyexpr; + decl_type = _Z_DECLARE_TYPE_TOKEN; + flags = _Z_INTEREST_FLAG_TOKENS; + break; default: return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN; } @@ -312,6 +350,12 @@ z_result_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration decl_type = _Z_DECLARE_TYPE_QUERYABLE; flags = _Z_INTEREST_FLAG_QUERYABLES; break; + case _Z_UNDECL_TOKEN: + msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_TOKEN; + msg.id = decl->_body._undecl_token._id; + decl_type = _Z_DECLARE_TYPE_TOKEN; + flags = _Z_INTEREST_FLAG_TOKENS; + break; default: return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN; } @@ -402,7 +446,8 @@ z_result_t _z_interest_process_interest(_z_session_t *zn, _z_keyexpr_t key, uint _Z_RETURN_IF_ERR(_z_interest_send_decl_queryable(zn, id)); } if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_TOKENS)) { - // Zenoh pico doesn't support liveliness token for now + _Z_DEBUG("Sending declare tokens"); + _Z_RETURN_IF_ERR(_z_interest_send_decl_token(zn, id)); } // Send final declare _Z_RETURN_IF_ERR(_z_interest_send_declare_final(zn, id)); diff --git a/src/session/liveliness.c b/src/session/liveliness.c new file mode 100644 index 000000000..87aecdde6 --- /dev/null +++ b/src/session/liveliness.c @@ -0,0 +1,270 @@ +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/api/liveliness.h" + +#include +#include +#include + +#include "zenoh-pico/collections/bytes.h" +#include "zenoh-pico/config.h" +#include "zenoh-pico/net/reply.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/keyexpr.h" +#include "zenoh-pico/session/resource.h" +#include "zenoh-pico/session/session.h" +#include "zenoh-pico/session/subscription.h" +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/utils/logging.h" +#include "zenoh-pico/utils/result.h" + +#if Z_FEATURE_LIVELINESS == 1 + +void _z_liveliness_pending_query_clear(_z_liveliness_pending_query_t *pen_qry) { + if (pen_qry->_dropper != NULL) { + pen_qry->_dropper(pen_qry->_arg); + } + _z_keyexpr_clear(&pen_qry->_key); +} + +void _z_liveliness_pending_query_copy(_z_liveliness_pending_query_t *dst, const _z_liveliness_pending_query_t *src) { + dst->_arg = src->_arg; + dst->_callback = src->_callback; + dst->_dropper = src->_dropper; + _z_keyexpr_copy(&dst->_key, &src->_key); +} + +_z_liveliness_pending_query_t *_z_liveliness_pending_query_clone(const _z_liveliness_pending_query_t *src) { + _z_liveliness_pending_query_t *dst = z_malloc(sizeof(_z_liveliness_pending_query_t)); + if (dst != NULL) { + _z_liveliness_pending_query_copy(dst, src); + } + return dst; +} + +_z_zint_t _z_liveliness_get_query_id(_z_session_t *zn) { return zn->_liveliness_query_id++; } + +z_result_t _z_liveliness_register_pending_query(_z_session_t *zn, _z_zint_t id, + _z_liveliness_pending_query_t *pen_qry) { + z_result_t ret = _Z_RES_OK; + + _Z_DEBUG(">>> Allocating liveliness query for (%ju:%.*s)", (uintmax_t)pen_qry->_key._id, + (int)_z_string_len(&pen_qry->_key._suffix), _z_string_data(&pen_qry->_key._suffix)); + + _zp_session_lock_mutex(zn); + + const _z_liveliness_pending_query_t *pq = + _z_liveliness_pending_query_intmap_get(&zn->_liveliness_pending_queries, id); + if (pq != NULL) { + _Z_ERROR("Duplicate liveliness query id %lu", id); + ret = _Z_ERR_ENTITY_DECLARATION_FAILED; + } else { + _z_liveliness_pending_query_intmap_insert(&zn->_liveliness_pending_queries, id, + _z_liveliness_pending_query_clone(pen_qry)); + } + + _zp_session_unlock_mutex(zn); + + return ret; +} + +z_result_t _z_liveliness_pending_query_reply(_z_session_t *zn, uint32_t interest_id, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp) { + z_result_t ret = _Z_RES_OK; + + _zp_session_lock_mutex(zn); + + const _z_liveliness_pending_query_t *pq = + _z_liveliness_pending_query_intmap_get(&zn->_liveliness_pending_queries, interest_id); + if (pq == NULL) { + ret = _Z_ERR_ENTITY_UNKNOWN; + } + + _Z_DEBUG("Liveliness pending query reply %i resolve result %i", interest_id, ret); + + if (ret == _Z_RES_OK) { + _Z_DEBUG("Resolving %d - %.*s on mapping 0x%x", keyexpr._id, (int)_z_string_len(&keyexpr._suffix), + _z_string_data(&keyexpr._suffix), _z_keyexpr_mapping_id(&keyexpr)); + _z_keyexpr_t expanded_ke = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr); + _Z_DEBUG("Reply liveliness query for %d - %.*s", expanded_ke._id, (int)_z_string_len(&expanded_ke._suffix), + _z_string_data(&expanded_ke._suffix)); + + if (_z_keyexpr_suffix_intersects(&pq->_key, &expanded_ke) == false) { + ret = _Z_ERR_QUERY_NOT_MATCH; + } + + if (ret == _Z_RES_OK) { + _z_encoding_t encoding = _z_encoding_null(); + _z_reply_t reply = _z_reply_create(expanded_ke, zn->_local_zid, _z_bytes_null(), timestamp, &encoding, + Z_SAMPLE_KIND_PUT, _z_bytes_null()); + + pq->_callback(&reply, pq->_arg); + _z_reply_clear(&reply); + } + } + + if (ret == _Z_RES_OK) { + _z_liveliness_pending_query_intmap_remove(&zn->_liveliness_pending_queries, interest_id); + } + + _zp_session_unlock_mutex(zn); + + return ret; +} + +z_result_t _z_liveliness_pending_query_drop(_z_session_t *zn, uint32_t interest_id) { + z_result_t ret = _Z_RES_OK; + + _zp_session_lock_mutex(zn); + + const _z_liveliness_pending_query_t *pq = + _z_liveliness_pending_query_intmap_get(&zn->_liveliness_pending_queries, interest_id); + if (pq == NULL) { + ret = _Z_ERR_ENTITY_UNKNOWN; + } + + _Z_DEBUG("Liveliness pending query drop %i resolve result %i", interest_id, ret); + + if (ret == _Z_RES_OK) { + _z_liveliness_pending_query_intmap_remove(&zn->_liveliness_pending_queries, interest_id); + } + + _zp_session_unlock_mutex(zn); + + return ret; +} + +void _z_liveliness_unregister_pending_query(_z_session_t *zn, _z_zint_t id) { + _zp_session_lock_mutex(zn); + + _z_liveliness_pending_query_intmap_remove(&zn->_liveliness_pending_queries, id); + + _zp_session_unlock_mutex(zn); +} + +z_result_t _z_liveliness_subscription_declare(_z_session_t *zn, _z_zint_t id, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp) { + z_result_t ret = _Z_RES_OK; + // TODO(sashacmc): What about history? Currently tokens decalred before subscription started, not processed + ret = _z_trigger_liveliness_subscriptions_declare(zn, keyexpr, timestamp); + if (ret == _Z_RES_OK) { + _zp_session_lock_mutex(zn); + + const _z_keyexpr_t *pkeyexpr = _z_keyexpr_intmap_get(&zn->_remote_tokens, id); + if (pkeyexpr != NULL) { + _Z_ERROR("Duplicate token id %lu", id); + ret = _Z_ERR_ENTITY_DECLARATION_FAILED; + } else { + _z_keyexpr_intmap_insert(&zn->_remote_tokens, id, _z_keyexpr_clone(&keyexpr)); + } + + _zp_session_unlock_mutex(zn); + } + + return ret; +} + +z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, _z_zint_t id, const _z_timestamp_t *timestamp) { + z_result_t ret = _Z_RES_OK; + + _zp_session_lock_mutex(zn); + + const _z_keyexpr_t *pkeyexpr = _z_keyexpr_intmap_get(&zn->_remote_tokens, id); + if (pkeyexpr == NULL) { + ret = _Z_ERR_ENTITY_UNKNOWN; + } + // TODO(sashacmc): review order to avoid keyexpt copy + _z_keyexpr_t keyexpr; + if (ret == _Z_RES_OK) { + _z_keyexpr_copy(&keyexpr, pkeyexpr); + _z_keyexpr_intmap_remove(&zn->_remote_tokens, id); + } + + _zp_session_unlock_mutex(zn); + + if (ret == _Z_RES_OK) { + ret = _z_trigger_liveliness_subscriptions_undeclare(zn, keyexpr, timestamp); + } + return ret; +} + +z_result_t _z_liveliness_register_token(_z_session_t *zn, _z_zint_t id, const _z_keyexpr_t keyexpr) { + z_result_t ret = _Z_RES_OK; + + _zp_session_lock_mutex(zn); + + const _z_keyexpr_t *pkeyexpr = _z_keyexpr_intmap_get(&zn->_local_tokens, id); + if (pkeyexpr != NULL) { + _Z_ERROR("Duplicate token id %lu", id); + ret = _Z_ERR_ENTITY_DECLARATION_FAILED; + } else { + _z_keyexpr_intmap_insert(&zn->_local_tokens, id, _z_keyexpr_clone(&keyexpr)); + } + + _zp_session_unlock_mutex(zn); + + return ret; +} + +void _z_liveliness_unregister_token(_z_session_t *zn, _z_zint_t id) { + _zp_session_lock_mutex(zn); + + _z_keyexpr_intmap_remove(&zn->_local_tokens, id); + + _zp_session_unlock_mutex(zn); +} + +z_result_t _z_liveliness_process_token_declare(_z_session_t *zn, const _z_n_msg_declare_t *decl) { + if (decl->has_interest_id) { + _z_liveliness_pending_query_reply(zn, decl->_interest_id, decl->_decl._body._decl_token._keyexpr, + &decl->_ext_timestamp); + } + + return _z_liveliness_subscription_declare(zn, decl->_decl._body._decl_token._id, + decl->_decl._body._decl_token._keyexpr, &decl->_ext_timestamp); +} + +z_result_t _z_liveliness_process_token_undeclare(_z_session_t *zn, const _z_n_msg_declare_t *decl) { + return _z_liveliness_subscription_undeclare(zn, decl->_decl._body._undecl_token._id, &decl->_ext_timestamp); +} + +z_result_t _z_liveliness_process_declare_final(_z_session_t *zn, const _z_n_msg_declare_t *decl) { + if (decl->has_interest_id) { + _z_liveliness_pending_query_drop(zn, decl->_interest_id); + } + + return _Z_RES_OK; +} + +void _z_liveliness_init(_z_session_t *zn) { + _zp_session_lock_mutex(zn); + + zn->_liveliness_query_id = 1; + zn->_remote_tokens = _z_keyexpr_intmap_make(); + zn->_local_tokens = _z_keyexpr_intmap_make(); + zn->_liveliness_pending_queries = _z_liveliness_pending_query_intmap_make(); + + _zp_session_unlock_mutex(zn); +} + +void _z_liveliness_clear(_z_session_t *zn) { + _zp_session_lock_mutex(zn); + + _z_liveliness_pending_query_intmap_clear(&zn->_liveliness_pending_queries); + _z_keyexpr_intmap_clear(&zn->_local_tokens); + _z_keyexpr_intmap_clear(&zn->_remote_tokens); + + _zp_session_unlock_mutex(zn); +} + +#endif diff --git a/src/session/push.c b/src/session/push.c index 6b45de25b..6f0a6c371 100644 --- a/src/session/push.c +++ b/src/session/push.c @@ -27,17 +27,14 @@ z_result_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push, z_reliabilit // TODO check body to know where to dispatch - size_t kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE; if (push->_body._is_put) { _z_msg_put_t *put = &push->_body._body._put; - ret = _z_trigger_subscriptions(zn, push->_key, put->_payload, &put->_encoding, kind, &put->_commons._timestamp, - push->_qos, put->_attachment, reliability); + ret = _z_trigger_subscriptions_put(zn, push->_key, put->_payload, &put->_encoding, &put->_commons._timestamp, + push->_qos, put->_attachment, reliability); } else { - _z_encoding_t encoding = _z_encoding_null(); - _z_bytes_t payload = _z_bytes_null(); _z_msg_del_t *del = &push->_body._body._del; - ret = _z_trigger_subscriptions(zn, push->_key, payload, &encoding, kind, &del->_commons._timestamp, push->_qos, - del->_attachment, reliability); + ret = _z_trigger_subscriptions_del(zn, push->_key, &del->_commons._timestamp, push->_qos, del->_attachment, + reliability); } return ret; } diff --git a/src/session/rx.c b/src/session/rx.c index afacc75d7..df53fe63e 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -25,11 +25,11 @@ #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/keyexpr.h" #include "zenoh-pico/session/interest.h" +#include "zenoh-pico/session/liveliness.h" #include "zenoh-pico/session/push.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/reply.h" #include "zenoh-pico/session/resource.h" -#include "zenoh-pico/session/session.h" #include "zenoh-pico/session/subscription.h" #include "zenoh-pico/session/utils.h" #include "zenoh-pico/utils/logging.h" @@ -41,8 +41,8 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * switch (msg->_tag) { case _Z_N_DECLARE: { - _Z_DEBUG("Handling _Z_N_DECLARE"); _z_n_msg_declare_t *decl = &msg->_body._declare; + _Z_DEBUG("Handling _Z_N_DECLARE: %i", decl->_decl._tag); switch (decl->_decl._tag) { case _Z_DECL_KEXPR: { if (_z_register_resource(zn, decl->_decl._body._decl_kexpr._keyexpr, @@ -59,19 +59,28 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * case _Z_DECL_QUERYABLE: { _z_interest_process_declares(zn, &decl->_decl); } break; + case _Z_DECL_TOKEN: { +#if Z_FEATURE_LIVELINESS == 1 + _z_liveliness_process_token_declare(zn, decl); +#endif + _z_interest_process_declares(zn, &decl->_decl); + } break; case _Z_UNDECL_SUBSCRIBER: { _z_interest_process_undeclares(zn, &decl->_decl); } break; case _Z_UNDECL_QUERYABLE: { _z_interest_process_undeclares(zn, &decl->_decl); } break; - case _Z_DECL_TOKEN: { - // TODO: add support or explicitly discard - } break; case _Z_UNDECL_TOKEN: { - // TODO: add support or explicitly discard +#if Z_FEATURE_LIVELINESS == 1 + _z_liveliness_process_token_undeclare(zn, decl); +#endif + _z_interest_process_undeclares(zn, &decl->_decl); } break; case _Z_DECL_FINAL: { +#if Z_FEATURE_LIVELINESS == 1 + _z_liveliness_process_declare_final(zn, decl); +#endif // Check that interest id is valid if (!decl->has_interest_id) { return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN; @@ -101,9 +110,9 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * case _Z_REQUEST_PUT: { #if Z_FEATURE_SUBSCRIPTION == 1 _z_msg_put_t put = req->_body._put; - ret = _z_trigger_subscriptions(zn, req->_key, put._payload, &put._encoding, Z_SAMPLE_KIND_PUT, - &put._commons._timestamp, req->_ext_qos, put._attachment, - msg->_reliability); + ret = _z_trigger_subscriptions_put(zn, req->_key, put._payload, &put._encoding, + &put._commons._timestamp, req->_ext_qos, put._attachment, + msg->_reliability); #endif if (ret == _Z_RES_OK) { _z_network_message_t final = _z_n_msg_make_response_final(req->_rid); @@ -113,10 +122,8 @@ z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t * case _Z_REQUEST_DEL: { #if Z_FEATURE_SUBSCRIPTION == 1 _z_msg_del_t del = req->_body._del; - _z_encoding_t encoding = _z_encoding_null(); - ret = _z_trigger_subscriptions(zn, req->_key, _z_bytes_null(), &encoding, Z_SAMPLE_KIND_DELETE, - &del._commons._timestamp, req->_ext_qos, del._attachment, - msg->_reliability); + ret = _z_trigger_subscriptions_del(zn, req->_key, &del._commons._timestamp, req->_ext_qos, + del._attachment, msg->_reliability); #endif if (ret == _Z_RES_OK) { _z_network_message_t final = _z_n_msg_make_response_final(req->_rid); diff --git a/src/session/subscription.c b/src/session/subscription.c index b7bf19f26..07b97d88c 100644 --- a/src/session/subscription.c +++ b/src/session/subscription.c @@ -17,6 +17,7 @@ #include #include +#include "zenoh-pico/api/constants.h" #include "zenoh-pico/api/types.h" #include "zenoh-pico/config.h" #include "zenoh-pico/net/sample.h" @@ -78,9 +79,10 @@ _z_subscription_rc_list_t *__z_get_subscriptions_by_key(_z_subscription_rc_list_ * Make sure that the following mutexes are locked before calling this function: * - zn->_mutex_inner */ -_z_subscription_rc_t *__unsafe_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id) { +_z_subscription_rc_t *__unsafe_z_get_subscription_by_id(_z_session_t *zn, _z_subscriber_kind_t kind, + const _z_zint_t id) { _z_subscription_rc_list_t *subs = - (is_local == _Z_RESOURCE_IS_LOCAL) ? zn->_local_subscriptions : zn->_remote_subscriptions; + (kind == _Z_SUBSCRIBER_KIND_SUBSCRIBER) ? zn->_subscriptions : zn->_liveliness_subscriptions; return __z_get_subscription_by_id(subs, id); } @@ -89,34 +91,35 @@ _z_subscription_rc_t *__unsafe_z_get_subscription_by_id(_z_session_t *zn, uint8_ * Make sure that the following mutexes are locked before calling this function: * - zn->_mutex_inner */ -_z_subscription_rc_list_t *__unsafe_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, +_z_subscription_rc_list_t *__unsafe_z_get_subscriptions_by_key(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_keyexpr_t *key) { _z_subscription_rc_list_t *subs = - (is_local == _Z_RESOURCE_IS_LOCAL) ? zn->_local_subscriptions : zn->_remote_subscriptions; + (kind == _Z_SUBSCRIBER_KIND_SUBSCRIBER) ? zn->_subscriptions : zn->_liveliness_subscriptions; return __z_get_subscriptions_by_key(subs, key); } -_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, uint8_t is_local, const _z_zint_t id) { +_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_zint_t id) { _zp_session_lock_mutex(zn); - _z_subscription_rc_t *sub = __unsafe_z_get_subscription_by_id(zn, is_local, id); + _z_subscription_rc_t *sub = __unsafe_z_get_subscription_by_id(zn, kind, id); _zp_session_unlock_mutex(zn); return sub; } -_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, uint8_t is_local, const _z_keyexpr_t *key) { +_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, _z_subscriber_kind_t kind, + const _z_keyexpr_t *key) { _zp_session_lock_mutex(zn); - _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, is_local, key); + _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, kind, key); _zp_session_unlock_mutex(zn); return subs; } -_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_t *s) { +_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, _z_subscriber_kind_t kind, _z_subscription_t *s) { _Z_DEBUG(">>> Allocating sub decl for (%ju:%.*s)", (uintmax_t)s->_key._id, (int)_z_string_len(&s->_key._suffix), _z_string_data(&s->_key._suffix)); _z_subscription_rc_t *ret = NULL; @@ -126,10 +129,10 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca ret = (_z_subscription_rc_t *)z_malloc(sizeof(_z_subscription_rc_t)); if (ret != NULL) { *ret = _z_subscription_rc_new_from_val(s); - if (is_local == _Z_RESOURCE_IS_LOCAL) { - zn->_local_subscriptions = _z_subscription_rc_list_push(zn->_local_subscriptions, ret); + if (kind == _Z_SUBSCRIBER_KIND_SUBSCRIBER) { + zn->_subscriptions = _z_subscription_rc_list_push(zn->_subscriptions, ret); } else { - zn->_remote_subscriptions = _z_subscription_rc_list_push(zn->_remote_subscriptions, ret); + zn->_liveliness_subscriptions = _z_subscription_rc_list_push(zn->_liveliness_subscriptions, ret); } } @@ -138,17 +141,42 @@ _z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, uint8_t is_loca return ret; } -void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t attachment, z_reliability_t reliability) { - z_result_t ret = _z_trigger_subscriptions(zn, keyexpr, payload, encoding, Z_SAMPLE_KIND_PUT, timestamp, qos, - attachment, reliability); - (void)ret; +z_result_t _z_trigger_subscriptions_put(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, + _z_encoding_t *encoding, const _z_timestamp_t *timestamp, const _z_n_qos_t qos, + const _z_bytes_t attachment, z_reliability_t reliability) { + return _z_trigger_subscriptions_impl(zn, _Z_SUBSCRIBER_KIND_SUBSCRIBER, keyexpr, payload, encoding, + Z_SAMPLE_KIND_PUT, timestamp, qos, attachment, reliability); } -z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - _z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp, - const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability) { +z_result_t _z_trigger_subscriptions_del(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_timestamp_t *timestamp, + const _z_n_qos_t qos, const _z_bytes_t attachment, + z_reliability_t reliability) { + _z_encoding_t encoding = _z_encoding_null(); + return _z_trigger_subscriptions_impl(zn, _Z_SUBSCRIBER_KIND_SUBSCRIBER, keyexpr, _z_bytes_null(), &encoding, + Z_SAMPLE_KIND_DELETE, timestamp, qos, attachment, reliability); +} + +z_result_t _z_trigger_liveliness_subscriptions_declare(_z_session_t *zn, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp) { + _z_encoding_t encoding = _z_encoding_null(); + return _z_trigger_subscriptions_impl(zn, _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, keyexpr, _z_bytes_null(), + &encoding, Z_SAMPLE_KIND_PUT, timestamp, _Z_N_QOS_DEFAULT, _z_bytes_null(), + Z_RELIABILITY_RELIABLE); +} + +z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, const _z_keyexpr_t keyexpr, + const _z_timestamp_t *timestamp) { + _z_encoding_t encoding = _z_encoding_null(); + return _z_trigger_subscriptions_impl(zn, _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, keyexpr, _z_bytes_null(), + &encoding, Z_SAMPLE_KIND_DELETE, timestamp, _Z_N_QOS_DEFAULT, _z_bytes_null(), + Z_RELIABILITY_RELIABLE); +} + +z_result_t _z_trigger_subscriptions_impl(_z_session_t *zn, _z_subscriber_kind_t subscriber_kind, + const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding, + const _z_zint_t sample_kind, const _z_timestamp_t *timestamp, + const _z_n_qos_t qos, const _z_bytes_t attachment, + z_reliability_t reliability) { z_result_t ret = _Z_RES_OK; _zp_session_lock_mutex(zn); @@ -158,12 +186,13 @@ z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, &keyexpr); _Z_DEBUG("Triggering subs for %d - %.*s", key._id, (int)_z_string_len(&key._suffix), _z_string_data(&key._suffix)); if (_z_keyexpr_has_suffix(&key)) { - _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, _Z_RESOURCE_IS_LOCAL, &key); + _z_subscription_rc_list_t *subs = __unsafe_z_get_subscriptions_by_key(zn, subscriber_kind, &key); _zp_session_unlock_mutex(zn); // Build the sample - _z_sample_t sample = _z_sample_create(&key, payload, timestamp, encoding, kind, qos, attachment, reliability); + _z_sample_t sample = + _z_sample_create(&key, payload, timestamp, encoding, sample_kind, qos, attachment, reliability); // Parse subscription list _z_subscription_rc_list_t *xs = subs; _Z_DEBUG("Triggering %ju subs", (uintmax_t)_z_subscription_rc_list_len(xs)); @@ -183,15 +212,14 @@ z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr return ret; } -void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscription_rc_t *sub) { +void _z_unregister_subscription(_z_session_t *zn, _z_subscriber_kind_t kind, _z_subscription_rc_t *sub) { _zp_session_lock_mutex(zn); - if (is_local == _Z_RESOURCE_IS_LOCAL) { - zn->_local_subscriptions = - _z_subscription_rc_list_drop_filter(zn->_local_subscriptions, _z_subscription_rc_eq, sub); + if (kind == _Z_SUBSCRIBER_KIND_SUBSCRIBER) { + zn->_subscriptions = _z_subscription_rc_list_drop_filter(zn->_subscriptions, _z_subscription_rc_eq, sub); } else { - zn->_remote_subscriptions = - _z_subscription_rc_list_drop_filter(zn->_remote_subscriptions, _z_subscription_rc_eq, sub); + zn->_liveliness_subscriptions = + _z_subscription_rc_list_drop_filter(zn->_liveliness_subscriptions, _z_subscription_rc_eq, sub); } _zp_session_unlock_mutex(zn); @@ -200,16 +228,16 @@ void _z_unregister_subscription(_z_session_t *zn, uint8_t is_local, _z_subscript void _z_flush_subscriptions(_z_session_t *zn) { _zp_session_lock_mutex(zn); - _z_subscription_rc_list_free(&zn->_local_subscriptions); - _z_subscription_rc_list_free(&zn->_remote_subscriptions); + _z_subscription_rc_list_free(&zn->_subscriptions); + _z_subscription_rc_list_free(&zn->_liveliness_subscriptions); _zp_session_unlock_mutex(zn); } #else // Z_FEATURE_SUBSCRIPTION == 0 -void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, - _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, - const _z_bytes_t attachment, z_reliability_t reliability) { +void _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload, + _z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp, + const _z_bytes_t attachment, z_reliability_t reliability) { _ZP_UNUSED(zn); _ZP_UNUSED(keyexpr); _ZP_UNUSED(payload); diff --git a/src/session/utils.c b/src/session/utils.c index d82f9d84c..44874a115 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -19,11 +19,11 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/session/interest.h" +#include "zenoh-pico/session/liveliness.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/subscription.h" -#include "zenoh-pico/utils/logging.h" /*------------------ clone helpers ------------------*/ _z_timestamp_t _z_timestamp_duplicate(const _z_timestamp_t *tstamp) { @@ -60,8 +60,8 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { zn->_local_resources = NULL; zn->_remote_resources = NULL; #if Z_FEATURE_SUBSCRIPTION == 1 - zn->_local_subscriptions = NULL; - zn->_remote_subscriptions = NULL; + zn->_subscriptions = NULL; + zn->_liveliness_subscriptions = NULL; #endif #if Z_FEATURE_QUERYABLE == 1 zn->_local_queryable = NULL; @@ -78,6 +78,10 @@ z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid) { } #endif // Z_FEATURE_MULTI_THREAD == 1 +#if Z_FEATURE_LIVELINESS == 1 + _z_liveliness_init(zn); +#endif + zn->_local_zid = *zid; // Note session in transport switch (zn->_tp._type) { @@ -119,6 +123,9 @@ void _z_session_clear(_z_session_t *zn) { #endif #if Z_FEATURE_QUERY == 1 _z_flush_pending_queries(zn); +#endif +#if Z_FEATURE_LIVELINESS == 1 + _z_liveliness_clear(zn); #endif _z_flush_interest(zn); diff --git a/tests/z_api_liveliness_test.c b/tests/z_api_liveliness_test.c new file mode 100644 index 000000000..623494cc5 --- /dev/null +++ b/tests/z_api_liveliness_test.c @@ -0,0 +1,203 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include +#include + +#include "zenoh-pico.h" +#include "zenoh-pico/api/handlers.h" +#include "zenoh-pico/api/liveliness.h" +#include "zenoh-pico/api/primitives.h" +#include "zenoh-pico/api/types.h" + +#if Z_FEATURE_QUERY == 1 + +#undef NDEBUG +#include +typedef struct context_t { + bool token1_put; + bool token2_put; + bool token1_drop; + bool token2_drop; +} context_t; + +#define assert_ok(x) \ + { \ + int ret = (int)x; \ + if (ret != Z_OK) { \ + printf("%s failed: %d\n", #x, ret); \ + assert(false); \ + } \ + } + +const char* token1_expr = "zenoh-pico/liveliness/test/1"; +const char* token2_expr = "zenoh-pico/liveliness/test/2"; + +void on_receive(z_loaned_sample_t* s, void* context) { + context_t* c = (context_t*)context; + const z_loaned_keyexpr_t* k = z_sample_keyexpr(s); + z_view_string_t ks; + z_keyexpr_as_view_string(k, &ks); + + if (z_sample_kind(s) == Z_SAMPLE_KIND_PUT) { + if (strncmp(token1_expr, z_string_data(z_view_string_loan(&ks)), z_string_len(z_view_string_loan(&ks))) == 0) { + c->token1_put = true; + } else if (strncmp(token2_expr, z_string_data(z_view_string_loan(&ks)), + z_string_len(z_view_string_loan(&ks))) == 0) { + c->token2_put = true; + } + } else if (z_sample_kind(s) == Z_SAMPLE_KIND_DELETE) { + if (strncmp(token1_expr, z_string_data(z_view_string_loan(&ks)), z_string_len(z_view_string_loan(&ks))) == 0) { + c->token1_drop = true; + } else if (strncmp(token2_expr, z_string_data(z_view_string_loan(&ks)), + z_string_len(z_view_string_loan(&ks))) == 0) { + c->token2_drop = true; + } + } +} + +void test_liveliness_sub(void) { + const char* expr = "zenoh-pico/liveliness/test/*"; + + z_owned_session_t s1, s2; + z_owned_config_t c1, c2; + z_config_default(&c1); + z_config_default(&c2); + z_view_keyexpr_t k, k1, k2; + z_view_keyexpr_from_str(&k, expr); + z_view_keyexpr_from_str(&k1, token1_expr); + z_view_keyexpr_from_str(&k2, token2_expr); + + assert_ok(z_open(&s1, z_config_move(&c1), NULL)); + assert_ok(z_open(&s2, z_config_move(&c2), NULL)); + + assert_ok(zp_start_read_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_read_task(z_loan_mut(s2), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL)); + + z_owned_closure_sample_t closure; + context_t context = {false, false, false, false}; + z_closure_sample(&closure, on_receive, NULL, (void*)(&context)); + + z_owned_subscriber_t sub; + assert_ok(z_liveliness_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k), + z_closure_sample_move(&closure), NULL)); + + z_sleep_s(1); + z_owned_liveliness_token_t t1, t2; + assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t1, z_view_keyexpr_loan(&k1), NULL)); + assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t2, z_view_keyexpr_loan(&k2), NULL)); + + z_sleep_s(1); + + assert(context.token1_put); + assert(context.token2_put); + + assert_ok(z_liveliness_undeclare_token(z_liveliness_token_move(&t1))); + z_sleep_s(1); + + assert(context.token1_drop); + assert(!context.token2_drop); + + assert_ok(z_liveliness_undeclare_token(z_liveliness_token_move(&t2))); + z_sleep_s(1); + assert(context.token2_drop); + + assert_ok(zp_stop_read_task(z_loan_mut(s1))); + assert_ok(zp_stop_read_task(z_loan_mut(s2))); + assert_ok(zp_stop_lease_task(z_loan_mut(s1))); + assert_ok(zp_stop_lease_task(z_loan_mut(s2))); + + z_session_drop(z_session_move(&s1)); + z_session_drop(z_session_move(&s2)); +} + +void test_liveliness_get(void) { + const char* expr = "zenoh-pico/liveliness/test/*"; + + z_owned_session_t s1, s2; + z_owned_config_t c1, c2; + z_config_default(&c1); + z_config_default(&c2); + z_view_keyexpr_t k, k1; + z_view_keyexpr_from_str(&k, expr); + z_view_keyexpr_from_str(&k1, token1_expr); + + assert_ok(z_open(&s1, z_config_move(&c1), NULL)); + assert_ok(z_open(&s2, z_config_move(&c2), NULL)); + + assert_ok(zp_start_read_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_read_task(z_loan_mut(s2), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL)); + assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL)); + + z_sleep_s(1); + z_owned_liveliness_token_t t1; + assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t1, z_view_keyexpr_loan(&k1), NULL)); + z_sleep_s(1); + + z_owned_fifo_handler_reply_t handler; + z_owned_closure_reply_t cb; + assert_ok(z_fifo_channel_reply_new(&cb, &handler, 3)); + + assert_ok(z_liveliness_get(z_session_loan(&s2), z_view_keyexpr_loan(&k), z_closure_reply_move(&cb), NULL)); + z_owned_reply_t reply; + assert_ok(z_fifo_handler_reply_recv(z_fifo_handler_reply_loan(&handler), &reply)); + assert(z_reply_is_ok(z_reply_loan(&reply))); + const z_loaned_keyexpr_t* reply_keyexpr = z_sample_keyexpr(z_reply_ok(z_reply_loan(&reply))); + z_view_string_t reply_keyexpr_s; + z_keyexpr_as_view_string(reply_keyexpr, &reply_keyexpr_s); + assert(strlen(token1_expr) == z_string_len(z_view_string_loan(&reply_keyexpr_s))); + assert(strncmp(token1_expr, z_string_data(z_view_string_loan(&reply_keyexpr_s)), + z_string_len(z_view_string_loan(&reply_keyexpr_s))) == 0); + + z_reply_drop(z_reply_move(&reply)); + assert(z_fifo_handler_reply_recv(z_fifo_handler_reply_loan(&handler), &reply) == Z_CHANNEL_DISCONNECTED); + + z_liveliness_token_drop(z_liveliness_token_move(&t1)); + z_fifo_handler_reply_drop(z_fifo_handler_reply_move(&handler)); + z_sleep_s(1); + assert_ok(z_fifo_channel_reply_new(&cb, &handler, 3)); + + z_liveliness_get(z_session_loan(&s2), z_view_keyexpr_loan(&k), z_closure_reply_move(&cb), NULL); + assert(z_fifo_handler_reply_recv(z_fifo_handler_reply_loan(&handler), &reply) == Z_CHANNEL_DISCONNECTED); + + z_fifo_handler_reply_drop(z_fifo_handler_reply_move(&handler)); + + assert_ok(zp_stop_read_task(z_loan_mut(s1))); + assert_ok(zp_stop_read_task(z_loan_mut(s2))); + assert_ok(zp_stop_lease_task(z_loan_mut(s1))); + assert_ok(zp_stop_lease_task(z_loan_mut(s2))); + + z_session_drop(z_session_move(&s1)); + z_session_drop(z_session_move(&s2)); +} + +int main(int argc, char** argv) { + (void)argc; + (void)argv; + test_liveliness_sub(); + test_liveliness_get(); +} + +#else // Z_FEATURE_QUERY == 1 +int main(int argc, char** argv) { + (void)argc; + (void)argv; +} +#endif // Z_FEATURE_QUERY == 1 diff --git a/tests/z_collections_test.c b/tests/z_collections_test.c index ec103b444..f7118aaa1 100644 --- a/tests/z_collections_test.c +++ b/tests/z_collections_test.c @@ -18,6 +18,7 @@ #include "zenoh-pico/collections/fifo.h" #include "zenoh-pico/collections/lifo.h" +#include "zenoh-pico/collections/list.h" #include "zenoh-pico/collections/ring.h" #include "zenoh-pico/collections/string.h" @@ -308,6 +309,36 @@ void fifo_test_init_free(void) { assert(r == NULL); } +void int_map_iterator_test(void) { + _z_str_intmap_t map; + + map = _z_str_intmap_make(); + _z_str_intmap_insert(&map, 10, "A"); + _z_str_intmap_insert(&map, 20, "B"); + _z_str_intmap_insert(&map, 30, "C"); + _z_str_intmap_insert(&map, 40, "D"); + + _z_str_intmap_iterator_t iter = _z_str_intmap_iterator_make(&map); + + assert(_z_str_intmap_iterator_next(&iter)); + assert(_z_str_intmap_iterator_key(&iter) == 20); + assert(strcmp(_z_str_intmap_iterator_value(&iter), "B") == 0); + + assert(_z_str_intmap_iterator_next(&iter)); + assert(_z_str_intmap_iterator_key(&iter) == 40); + assert(strcmp(_z_str_intmap_iterator_value(&iter), "D") == 0); + + assert(_z_str_intmap_iterator_next(&iter)); + assert(_z_str_intmap_iterator_key(&iter) == 10); + assert(strcmp(_z_str_intmap_iterator_value(&iter), "A") == 0); + + assert(_z_str_intmap_iterator_next(&iter)); + assert(_z_str_intmap_iterator_key(&iter) == 30); + assert(strcmp(_z_str_intmap_iterator_value(&iter), "C") == 0); + + assert(!_z_str_intmap_iterator_next(&iter)); +} + int main(void) { ring_test(); ring_test_init_free(); @@ -315,4 +346,6 @@ int main(void) { lifo_test_init_free(); fifo_test(); fifo_test_init_free(); + + int_map_iterator_test(); } diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index 81f16f50c..59237b3b2 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -842,6 +842,49 @@ void queryable_declaration(void) { _z_wbuf_clear(&wbf); } +/*------------------ Token declaration ------------------*/ +_z_decl_token_t gen_token_declaration(void) { + _z_decl_token_t e_qd = {._keyexpr = gen_keyexpr(), ._id = (uint32_t)gen_uint64()}; + + return e_qd; +} + +void assert_eq_token_declaration(const _z_decl_token_t *left, const _z_decl_token_t *right) { + assert_eq_keyexpr(&left->_keyexpr, &right->_keyexpr); + assert(left->_id == right->_id); +} + +void token_declaration(void) { + printf("\n>> Queryable declaration\n"); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); + + // Initialize + _z_decl_token_t e_qd = gen_token_declaration(); + + // Encode + int8_t res = _z_decl_token_encode(&wbf, &e_qd); + assert(res == _Z_RES_OK); + (void)(res); + + // Decode + _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); + _z_decl_token_t d_qd; + uint8_t e_hdr = 0; + _z_uint8_decode(&e_hdr, &zbf); + res = _z_decl_token_decode(&d_qd, &zbf, e_hdr); + assert(res == _Z_RES_OK); + + printf(" "); + assert_eq_token_declaration(&e_qd, &d_qd); + printf("\n"); + + // Free + _z_keyexpr_clear(&e_qd._keyexpr); + _z_keyexpr_clear(&d_qd._keyexpr); + _z_zbuf_clear(&zbf); + _z_wbuf_clear(&wbf); +} + /*------------------ Forget Resource declaration ------------------*/ _z_undecl_kexpr_t gen_forget_resource_declaration(void) { _z_undecl_kexpr_t e_frd; @@ -969,12 +1012,52 @@ void forget_queryable_declaration(void) { _z_wbuf_clear(&wbf); } +/*------------------ Forget Token declaration ------------------*/ +_z_undecl_token_t gen_forget_token_declaration(void) { + _z_undecl_token_t e_fqd = {._ext_keyexpr = gen_keyexpr(), ._id = (uint32_t)gen_zint()}; + return e_fqd; +} + +void assert_eq_forget_token_declaration(const _z_undecl_token_t *left, const _z_undecl_token_t *right) { + assert_eq_keyexpr(&left->_ext_keyexpr, &right->_ext_keyexpr); + assert(left->_id == right->_id); +} + +void forget_token_declaration(void) { + printf("\n>> Forget token declaration\n"); + _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); + + // Initialize + _z_undecl_token_t e_fqd = gen_forget_token_declaration(); + + // Encode + int8_t res = _z_undecl_token_encode(&wbf, &e_fqd); + assert(res == _Z_RES_OK); + (void)(res); + + // Decode + _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); + uint8_t e_hdr = 0; + _z_uint8_decode(&e_hdr, &zbf); + _z_undecl_token_t d_fqd = {._ext_keyexpr = _z_keyexpr_null()}; + res = _z_undecl_token_decode(&d_fqd, &zbf, e_hdr); + assert(res == _Z_RES_OK); + + printf(" "); + assert_eq_forget_token_declaration(&e_fqd, &d_fqd); + printf("\n"); + + // Free + _z_keyexpr_clear(&e_fqd._ext_keyexpr); + _z_keyexpr_clear(&d_fqd._ext_keyexpr); + _z_zbuf_clear(&zbf); + _z_wbuf_clear(&wbf); +} + /*------------------ Declaration ------------------*/ _z_declaration_t gen_declaration(void) { - uint8_t decl[] = { - _Z_DECL_KEXPR, _Z_UNDECL_KEXPR, _Z_DECL_SUBSCRIBER, - _Z_UNDECL_SUBSCRIBER, _Z_DECL_QUERYABLE, _Z_UNDECL_QUERYABLE, - }; + uint8_t decl[] = {_Z_DECL_KEXPR, _Z_UNDECL_KEXPR, _Z_DECL_SUBSCRIBER, _Z_UNDECL_SUBSCRIBER, + _Z_DECL_QUERYABLE, _Z_UNDECL_QUERYABLE, _Z_DECL_TOKEN, _Z_UNDECL_TOKEN}; _z_declaration_t d; d._tag = decl[gen_uint8() % (sizeof(decl) / sizeof(uint8_t))]; @@ -998,6 +1081,12 @@ _z_declaration_t gen_declaration(void) { case _Z_UNDECL_QUERYABLE: { d._body._undecl_queryable = gen_forget_queryable_declaration(); } break; + case _Z_DECL_TOKEN: { + d._body._decl_token = gen_token_declaration(); + } break; + case _Z_UNDECL_TOKEN: { + d._body._undecl_token = gen_forget_token_declaration(); + } break; default: assert(false); } @@ -1020,6 +1109,9 @@ void assert_eq_declaration(const _z_declaration_t *left, const _z_declaration_t case _Z_DECL_QUERYABLE: assert_eq_queryable_declaration(&left->_body._decl_queryable, &right->_body._decl_queryable); break; + case _Z_DECL_TOKEN: + assert_eq_token_declaration(&left->_body._decl_token, &right->_body._decl_token); + break; case _Z_UNDECL_KEXPR: assert_eq_forget_resource_declaration(&left->_body._undecl_kexpr, &right->_body._undecl_kexpr); break; @@ -1029,6 +1121,9 @@ void assert_eq_declaration(const _z_declaration_t *left, const _z_declaration_t case _Z_UNDECL_QUERYABLE: assert_eq_forget_queryable_declaration(&left->_body._undecl_queryable, &right->_body._undecl_queryable); break; + case _Z_UNDECL_TOKEN: + assert_eq_forget_token_declaration(&left->_body._undecl_token, &right->_body._undecl_token); + break; default: assert(false); }