Skip to content

Commit

Permalink
Implement liveliness token support
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Nov 4, 2024
1 parent 35f60d6 commit 6c5a0a2
Show file tree
Hide file tree
Showing 34 changed files with 1,831 additions and 138 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature")
set(Z_FEATURE_SUBSCRIPTION 1 CACHE STRING "Toggle subscription feature")
set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature")
set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature")
set(Z_FEATURE_LIVELINESS 1 CACHE STRING "Toggle liveliness feature")
set(Z_FEATURE_INTEREST 1 CACHE STRING "Toggle interests")
set(Z_FEATURE_FRAGMENTATION 1 CACHE STRING "Toggle fragmentation")
set(Z_FEATURE_ENCODING_VALUES 1 CACHE STRING "Toggle encoding values")
Expand All @@ -242,6 +243,7 @@ message(STATUS "Building with feature confing:\n\
* SUBSCRIPTION: ${Z_FEATURE_SUBSCRIPTION}\n\
* QUERY: ${Z_FEATURE_QUERY}\n\
* QUERYABLE: ${Z_FEATURE_QUERYABLE}\n\
* LIVELINESS: ${Z_FEATURE_LIVELINESS}\n\
* INTEREST: ${Z_FEATURE_INTEREST}\n\
* RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}")

Expand Down Expand Up @@ -471,6 +473,7 @@ if(UNIX OR MSVC)
add_executable(z_bytes_test ${PROJECT_SOURCE_DIR}/tests/z_bytes_test.c)
add_executable(z_api_bytes_test ${PROJECT_SOURCE_DIR}/tests/z_api_bytes_test.c)
add_executable(z_api_encoding_test ${PROJECT_SOURCE_DIR}/tests/z_api_encoding_test.c)
add_executable(z_api_liveliness_test ${PROJECT_SOURCE_DIR}/tests/z_api_liveliness_test.c)
add_executable(z_refcount_test ${PROJECT_SOURCE_DIR}/tests/z_refcount_test.c)

target_link_libraries(z_data_struct_test zenohpico::lib)
Expand All @@ -489,6 +492,7 @@ if(UNIX OR MSVC)
target_link_libraries(z_bytes_test zenohpico::lib)
target_link_libraries(z_api_bytes_test zenohpico::lib)
target_link_libraries(z_api_encoding_test zenohpico::lib)
target_link_libraries(z_api_liveliness_test zenohpico::lib)
target_link_libraries(z_refcount_test zenohpico::lib)

configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY)
Expand All @@ -512,6 +516,7 @@ if(UNIX OR MSVC)
add_test(z_bytes_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_bytes_test)
add_test(z_api_bytes_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_bytes_test)
add_test(z_api_encoding_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_encoding_test)
add_test(z_api_liveliness_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_liveliness_test)
add_test(z_refcount_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_refcount_test)
endif()

Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ if(UNIX)
add_example(z_sub_channel unix/c11/z_sub_channel.c)
add_example(z_sub_st unix/c11/z_sub_st.c)
add_example(z_sub_attachment unix/c11/z_sub_attachment.c)
add_example(z_sub_liveliness unix/c11/z_sub_liveliness.c)
add_example(z_pull unix/c11/z_pull.c)
add_example(z_get unix/c11/z_get.c)
add_example(z_get_channel unix/c11/z_get_channel.c)
add_example(z_get_attachment unix/c11/z_get_attachment.c)
add_example(z_get_liveliness unix/c11/z_get_liveliness.c)
add_example(z_queryable unix/c11/z_queryable.c)
add_example(z_queryable_channel unix/c11/z_queryable_channel.c)
add_example(z_queryable_attachment unix/c11/z_queryable_attachment.c)
Expand All @@ -55,6 +57,7 @@ if(UNIX)
add_example(z_pub_thr unix/c11/z_pub_thr.c)
add_example(z_sub_thr unix/c11/z_sub_thr.c)
add_example(z_bytes unix/c11/z_bytes.c)
add_example(z_liveliness unix/c11/z_liveliness.c)
endif()
elseif(MSVC)
add_example(z_put windows/z_put.c)
Expand Down
117 changes: 117 additions & 0 deletions examples/unix/c11/z_get_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_LIVELINESS == 1

int main(int argc, char **argv) {
const char *keyexpr = "group1/**";
const char *mode = "client";
const char *clocator = NULL;
const char *llocator = NULL;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:l:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (clocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator);
}
if (llocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator);
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config), NULL) < 0) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_session_drop(z_session_move(&s));
return -1;
}

z_view_keyexpr_t ke;
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression", keyexpr);
return -1;
}

printf("Sending liveliness query '%s'...\n", keyexpr);
z_owned_fifo_handler_reply_t handler;
z_owned_closure_reply_t closure;
z_fifo_channel_reply_new(&closure, &handler, 16);
if (z_liveliness_get(z_loan(s), z_loan(ke), z_move(closure), NULL) < 0) {
printf("Liveliness query failed");
return -1;
}
z_owned_reply_t reply;
for (z_result_t res = z_recv(z_loan(handler), &reply); res == Z_OK; res = z_recv(z_loan(handler), &reply)) {
if (z_reply_is_ok(z_loan(reply))) {
const z_loaned_sample_t *sample = z_reply_ok(z_loan(reply));
z_view_string_t key_str;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_str);
printf(">> Alive token ('%.*s')\n", (int)z_string_len(z_loan(key_str)), z_string_data(z_loan(key_str)));
} else {
printf("Received an error\n");
}
}

z_drop(z_move(reply));
z_drop(z_move(handler));
z_drop(z_move(s));
return 0;
}
#else
int main(void) {
printf(
"ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires "
"them.\n");
return -2;
}
#endif
112 changes: 112 additions & 0 deletions examples/unix/c11/z_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>

#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_LIVELINESS == 1

int main(int argc, char **argv) {
const char *keyexpr = "group1/zenoh-pico";
const char *mode = "client";
const char *clocator = NULL;
const char *llocator = NULL;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:l:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (clocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator);
}
if (llocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator);
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config), NULL) < 0) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_session_drop(z_session_move(&s));
return -1;
}

z_view_keyexpr_t ke;
if (z_view_keyexpr_from_str(&ke, keyexpr) < 0) {
printf("%s is not a valid key expression", keyexpr);
return -1;
}

printf("Declaring liveliness token '%s'...\n", keyexpr);
z_owned_liveliness_token_t token;
if (z_liveliness_declare_token(z_loan(s), &token, z_loan(ke), NULL) < 0) {
printf("Unable to create liveliness token!\n");
exit(-1);
}

printf("Press CTRL-C to undeclare liveliness token and quit...\n");
while (1) {
z_sleep_s(1);
}

// LivelinessTokens are automatically closed when dropped
// Use the code below to manually undeclare it if needed
printf("Undeclaring liveliness token...\n");
z_drop(z_move(token));

z_drop(z_move(s));
return 0;
}
#else
int main(void) {
printf(
"ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY or Z_FEATURE_MULTI_THREAD but this example requires "
"them.\n");
return -2;
}
#endif
Loading

0 comments on commit 6c5a0a2

Please sign in to comment.