Skip to content

Commit

Permalink
Implement liveliness support
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Nov 7, 2024
1 parent 0085e43 commit aff1ff4
Show file tree
Hide file tree
Showing 39 changed files with 2,161 additions and 149 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature")
set(Z_FEATURE_SUBSCRIPTION 1 CACHE STRING "Toggle subscription feature")
set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature")
set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature")
set(Z_FEATURE_LIVELINESS 1 CACHE STRING "Toggle liveliness feature")
set(Z_FEATURE_INTEREST 1 CACHE STRING "Toggle interests")
set(Z_FEATURE_FRAGMENTATION 1 CACHE STRING "Toggle fragmentation")
set(Z_FEATURE_ENCODING_VALUES 1 CACHE STRING "Toggle encoding values")
Expand All @@ -242,6 +243,7 @@ message(STATUS "Building with feature confing:\n\
* SUBSCRIPTION: ${Z_FEATURE_SUBSCRIPTION}\n\
* QUERY: ${Z_FEATURE_QUERY}\n\
* QUERYABLE: ${Z_FEATURE_QUERYABLE}\n\
* LIVELINESS: ${Z_FEATURE_LIVELINESS}\n\
* INTEREST: ${Z_FEATURE_INTEREST}\n\
* RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}")

Expand Down Expand Up @@ -536,10 +538,12 @@ if(UNIX OR MSVC)
add_executable(z_client_test ${PROJECT_SOURCE_DIR}/tests/z_client_test.c)
add_executable(z_api_alignment_test ${PROJECT_SOURCE_DIR}/tests/z_api_alignment_test.c)
add_executable(z_session_test ${PROJECT_SOURCE_DIR}/tests/z_session_test.c)
add_executable(z_api_liveliness_test ${PROJECT_SOURCE_DIR}/tests/z_api_liveliness_test.c)

target_link_libraries(z_client_test zenohpico::lib)
target_link_libraries(z_api_alignment_test zenohpico::lib)
target_link_libraries(z_session_test zenohpico::lib)
target_link_libraries(z_api_liveliness_test zenohpico::lib)

configure_file(${PROJECT_SOURCE_DIR}/tests/routed.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/routed.sh COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/api.sh ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh COPYONLY)
Expand All @@ -548,6 +552,7 @@ if(UNIX OR MSVC)
add_test(z_client_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/routed.sh z_client_test)
add_test(z_api_alignment_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_alignment_test)
add_test(z_session_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_session_test)
add_test(z_api_liveliness_test bash ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/api.sh z_api_liveliness_test)
endif()
endif()
endif()
Expand Down
26 changes: 26 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,32 @@ Functions
.. autocfunction:: serialization.h::ze_serialize_substr
Liveliness
========================
Types
-----
.. autoctype:: liveliness.h::z_liveliness_declaration_options_t
.. autoctype:: liveliness.h::z_liveliness_subscriber_options_t
.. autoctype:: liveliness.h::z_liveliness_get_options_t
Represents a Liveliness token entity.
See details at :ref:`owned_types_concept`
.. c:type:: z_owned_liveliness_token_t
.. c:type:: z_loaned_liveliness_token_t
.. c:type:: z_moved_liveliness_token_t
Functions
---------
.. autocfunction:: liveliness.h::z_liveliness_declaration_options_default
.. autocfunction:: liveliness.h::z_liveliness_declare_token
.. autocfunction:: liveliness.h::z_liveliness_undeclare_token
.. autocfunction:: liveliness.h::z_liveliness_subscriber_options_default
.. autocfunction:: liveliness.h::z_liveliness_declare_subscriber
.. autocfunction:: liveliness.h::z_liveliness_get
Others
======
Expand Down
3 changes: 2 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
"-DZ_FEATURE_SUBSCRIPTION=1",
"-DZ_FEATURE_QUERY=1",
"-DZ_FEATURE_QUERYABLE=1",
"-DZ_FEATURE_ENCODING_VALUES=1"
"-DZ_FEATURE_ENCODING_VALUES=1",
"-DZ_FEATURE_LIVELINESS=1",
]

# -- Options for HTML output -------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ if(UNIX)
add_example(z_sub_channel unix/c11/z_sub_channel.c)
add_example(z_sub_st unix/c11/z_sub_st.c)
add_example(z_sub_attachment unix/c11/z_sub_attachment.c)
add_example(z_sub_liveliness unix/c11/z_sub_liveliness.c)
add_example(z_pull unix/c11/z_pull.c)
add_example(z_get unix/c11/z_get.c)
add_example(z_get_channel unix/c11/z_get_channel.c)
add_example(z_get_attachment unix/c11/z_get_attachment.c)
add_example(z_get_liveliness unix/c11/z_get_liveliness.c)
add_example(z_queryable unix/c11/z_queryable.c)
add_example(z_queryable_channel unix/c11/z_queryable_channel.c)
add_example(z_queryable_attachment unix/c11/z_queryable_attachment.c)
Expand All @@ -55,6 +57,7 @@ if(UNIX)
add_example(z_pub_thr unix/c11/z_pub_thr.c)
add_example(z_sub_thr unix/c11/z_sub_thr.c)
add_example(z_bytes unix/c11/z_bytes.c)
add_example(z_liveliness unix/c11/z_liveliness.c)
endif()
elseif(MSVC)
add_example(z_put windows/z_put.c)
Expand Down
116 changes: 116 additions & 0 deletions examples/unix/c11/z_get_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>

#include <stddef.h>
#include <stdio.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_LIVELINESS == 1 && Z_FEATURE_QUERY == 1

int main(int argc, char **argv) {
const char *keyexpr = "group1/**";
const char *mode = "client";
const char *clocator = NULL;
const char *llocator = NULL;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:l:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (clocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator);
}
if (llocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator);
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config), NULL) < 0) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_session_drop(z_session_move(&s));
return -1;
}

z_view_keyexpr_t ke;
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression", keyexpr);
return -1;
}

printf("Sending liveliness query '%s'...\n", keyexpr);
z_owned_fifo_handler_reply_t handler;
z_owned_closure_reply_t closure;
z_fifo_channel_reply_new(&closure, &handler, 16);
if (z_liveliness_get(z_loan(s), z_loan(ke), z_move(closure), NULL) < 0) {
printf("Liveliness query failed");
return -1;
}
z_owned_reply_t reply;
for (z_result_t res = z_recv(z_loan(handler), &reply); res == Z_OK; res = z_recv(z_loan(handler), &reply)) {
if (z_reply_is_ok(z_loan(reply))) {
const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply));
z_view_string_t key_str;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str);
printf(">> Alive token ('%.*s')\n", (int)z_string_len(z_loan(key_str)), z_string_data(z_loan(key_str)));
} else {
printf("Received an error\n");
}
}

z_drop(z_move(reply));
z_drop(z_move(handler));
z_drop(z_move(s));
return 0;
}
#else
int main(void) {
printf(
"ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_LIVELINESS but this example requires "
"them.\n");
return -2;
}
#endif
121 changes: 121 additions & 0 deletions examples/unix/c11/z_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>

#include <signal.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_LIVELINESS == 1

static volatile int keepRunning = 1;

void intHandler(int dummy) {
(void)dummy;
keepRunning = 0;
}

int main(int argc, char **argv) {
const char *keyexpr = "group1/zenoh-pico";
const char *mode = "client";
const char *clocator = NULL;
const char *llocator = NULL;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:l:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (clocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator);
}
if (llocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator);
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config), NULL) < 0) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_session_drop(z_session_move(&s));
return -1;
}

z_view_keyexpr_t ke;
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression", keyexpr);
return -1;
}

printf("Declaring liveliness token '%s'...\n", keyexpr);
z_owned_liveliness_token_t token;
if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) {
printf("Unable to create liveliness token!\n");
exit(-1);
}

printf("Press CTRL-C to undeclare liveliness token and quit...\n");
signal(SIGINT, intHandler);
while (keepRunning) {
z_sleep_s(1);
}

// LivelinessTokens are automatically closed when dropped
// Use the code below to manually undeclare it if needed
printf("Undeclaring liveliness token...\n");
z_drop(z_move(token));

z_drop(z_move(s));
return 0;
}
#else
int main(void) {
printf(
"ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires "
"them.\n");
return -2;
}
#endif
Loading

0 comments on commit aff1ff4

Please sign in to comment.