From e23e6a58f5f6636b5df3be67a9428d3464436e3d Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Wed, 13 Dec 2023 17:21:15 +0100 Subject: [PATCH 01/21] fix: revert wrap bytes change --- src/protocol/codec.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/protocol/codec.c b/src/protocol/codec.c index 7b4be95f4..225253418 100644 --- a/src/protocol/codec.c +++ b/src/protocol/codec.c @@ -296,8 +296,7 @@ int8_t _z_bytes_val_encode(_z_wbuf_t *wbf, const _z_bytes_t *bs) { int8_t ret = _Z_RES_OK; if ((wbf->_expansion_step != 0) && (bs->len > Z_TSID_LENGTH)) { - // ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len); - ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len); + ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len); } else { ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len); } From 34361ee14b0a4613fd27831935a6ca7742f6e2a2 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 11:25:01 +0100 Subject: [PATCH 02/21] fix: switch to fixed buffer value --- include/zenoh-pico/protocol/definitions/message.h | 2 ++ src/transport/multicast/tx.c | 2 +- src/transport/raweth/tx.c | 2 +- src/transport/unicast/tx.c | 2 +- 4 files changed, 5 insertions(+), 3 deletions(-) diff --git a/include/zenoh-pico/protocol/definitions/message.h b/include/zenoh-pico/protocol/definitions/message.h index 703ed6172..cf9326103 100644 --- a/include/zenoh-pico/protocol/definitions/message.h +++ b/include/zenoh-pico/protocol/definitions/message.h @@ -45,6 +45,8 @@ #define _Z_FLAG_Z_T 0x20 // 1 << 5 | QueryTarget if T==1 then the query target is present #define _Z_FLAG_Z_X 0x00 // Unused flags are set to zero +#define _Z_FRAG_BUFF_BASE_SIZE 128 // Base size of the buffer to encode a fragment message header + // Flags: // - T: Timestamp If T==1 then the timestamp if present // - E: Encoding If E==1 then the encoding is present diff --git a/src/transport/multicast/tx.c b/src/transport/multicast/tx.c index 4b9d034b5..4b2d77c6c 100644 --- a/src/transport/multicast/tx.c +++ b/src/transport/multicast/tx.c @@ -116,7 +116,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); + _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf if (ret == _Z_RES_OK) { diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 2f9ca8ed5..c9984bc48 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -271,7 +271,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, ztm->_transmitted = true; } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); + _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); // Encode the message on the expandable wbuf _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _zp_raweth_unlock_tx_mutex(ztm)); // Fragment and send the message diff --git a/src/transport/unicast/tx.c b/src/transport/unicast/tx.c index 25caabee6..7fce59560 100644 --- a/src/transport/unicast/tx.c +++ b/src/transport/unicast/tx.c @@ -125,7 +125,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg } else { // The message does not fit in the current batch, let's fragment it // Create an expandable wbuf for fragmentation - _z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true); + _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf if (ret == _Z_RES_OK) { From 6ab2281ec3c44cb2d48ecb55153e398a2d6308c8 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 11:38:05 +0100 Subject: [PATCH 03/21] feat: expose batch unicast size in cmake --- CMakeLists.txt | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2fc9605b9..a611a6f26 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,7 @@ option(WITH_ZEPHYR "Build for Zephyr RTOS" OFF) option(WITH_FREERTOS_PLUS_TCP "Build for FreeRTOS RTOS and FreeRTOS-Plus-TCP network stack" OFF) set(ZENOH_DEBUG 0 CACHE STRING "Use this to set the ZENOH_DEBUG variable") set(FRAG_MAX_SIZE 0 CACHE STRING "Use this to override the maximum size for fragmented messages") +set(BATCH_UNICAST_SIZE 0 CACHE STRING "Use this to override the maximum unicast batch size") set(CMAKE_EXPORT_COMPILE_COMMANDS ON CACHE INTERNAL "") if(CMAKE_EXPORT_COMPILE_COMMANDS) set(CMAKE_CXX_STANDARD_INCLUDE_DIRECTORIES @@ -106,6 +107,9 @@ add_definition(ZENOH_DEBUG=${ZENOH_DEBUG}) if(FRAG_MAX_SIZE) add_definition(Z_FRAG_MAX_SIZE=${FRAG_MAX_SIZE}) endif() +if (BATCH_UNICAST_SIZE) + add_definition(Z_BATCH_UNICAST_SIZE=${BATCH_UNICAST_SIZE}) +endif() # Zenoh pico feature configuration options set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature") @@ -132,6 +136,9 @@ message(STATUS "Zenoh Level Log: ${ZENOH_DEBUG}") if(FRAG_MAX_SIZE) message(STATUS "Fragmented message max size: ${FRAG_MAX_SIZE}") endif() +if(BATCH_UNICAST_SIZE) + message(STATUS "Unicast batch max size: ${BATCH_UNICAST_SIZE}") +endif() message(STATUS "Build for Zephyr RTOS: ${WITH_ZEPHYR}") message(STATUS "Build for FreeRTOS-Plus-TCP: ${WITH_FREERTOS_PLUS_TCP}") message(STATUS "Configuring for ${CMAKE_SYSTEM_NAME}") From c8a08de37ad41af3b8cc56d0ba685c7f85ca2c8f Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 14:59:22 +0100 Subject: [PATCH 04/21] feat: add fragment test --- CMakeLists.txt | 5 ++ tests/fragment.py | 128 +++++++++++++++++++++++++++++++++++++ tests/z_test_fragment_rx.c | 92 ++++++++++++++++++++++++++ tests/z_test_fragment_tx.c | 79 +++++++++++++++++++++++ 4 files changed, 304 insertions(+) create mode 100644 tests/fragment.py create mode 100644 tests/z_test_fragment_rx.c create mode 100644 tests/z_test_fragment_tx.c diff --git a/CMakeLists.txt b/CMakeLists.txt index a611a6f26..4728c1f5b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -308,6 +308,8 @@ if(UNIX OR MSVC) add_executable(z_keyexpr_test ${PROJECT_SOURCE_DIR}/tests/z_keyexpr_test.c) add_executable(z_api_null_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_null_drop_test.c) add_executable(z_api_double_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_double_drop_test.c) + add_executable(z_test_fragment_tx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_tx.c) + add_executable(z_test_fragment_rx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_rx.c) target_link_libraries(z_data_struct_test ${Libname}) target_link_libraries(z_endpoint_test ${Libname}) @@ -316,9 +318,12 @@ if(UNIX OR MSVC) target_link_libraries(z_keyexpr_test ${Libname}) target_link_libraries(z_api_null_drop_test ${Libname}) target_link_libraries(z_api_double_drop_test ${Libname}) + target_link_libraries(z_test_fragment_tx ${Libname}) + target_link_libraries(z_test_fragment_rx ${Libname}) configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY) + configure_file(${PROJECT_SOURCE_DIR}/tests/fragment.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/fragment.py COPYONLY) enable_testing() add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test) diff --git a/tests/fragment.py b/tests/fragment.py new file mode 100644 index 000000000..bbe4d8019 --- /dev/null +++ b/tests/fragment.py @@ -0,0 +1,128 @@ +import subprocess +import sys +import time + +# Specify the directory for the binaries +DIR_TESTS = "build/tests" + +def check_output(tx_status, tx_output, rx_status, rx_output): + test_status = 0 + + # Expected tx output & status + z_tx_expected_status = 0 + z_tx_expected_output = "[tx]: Sending packet on test/zenoh-pico-fragment, len: 100000" + # Expected rx output & status + z_rx_expected_status = 0 + z_rx_expected_output = ( + "[rx]: Received packet on test/zenoh-pico-fragment, len: 100000, validity: 1") + + # Check the exit status of tx + if tx_status == z_tx_expected_status: + print("z_tx status valid") + else: + print(f"z_tx status invalid, expected: {z_tx_expected_status}, received: {tx_status}") + test_status = 1 + + # Check output of tx + if z_tx_expected_output in tx_output: + print("z_tx output valid") + else: + print("z_tx output invalid:") + print(f"Expected: \"{z_tx_expected_output}\"") + print(f"Received: \"{tx_output}\"") + test_status = 1 + + # Check the exit status of z_rx + if rx_status == z_rx_expected_status: + print("z_rx status valid") + else: + print(f"z_rx status invalid, expected: {z_rx_expected_status}, received: {rx_status}") + test_status = 1 + + # Check output of z_rx + if z_rx_expected_output in rx_output: + print("z_rx output valid") + else: + print("z_rx output invalid:") + print(f"Expected: \"{z_rx_expected_output}\"") + print(f"Received: \"{rx_output}\"") + test_status = 1 + # Return value + return test_status + +def test_client(): + # Start rx in the background + print("Start rx client") + z_rx_command = f"./{DIR_TESTS}/z_test_fragment_rx" + z_rx_process = subprocess.Popen(z_rx_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True) + # Introduce a delay to ensure rx starts + time.sleep(3) + # Start tx + print("Start tx client") + z_tx_command = f"./{DIR_TESTS}/z_test_fragment_tx" + z_tx_process = subprocess.Popen(z_tx_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + # Wait for tx to finish + z_tx_process.wait() + print("Stop rx") + if z_rx_process.poll() is None: + # Send "q" command to rx to stop it + z_rx_process.stdin.write("q\n") + z_rx_process.stdin.flush() + # Wait for rx to finish + z_rx_process.wait() + # Check output + return check_output(z_tx_process.returncode, z_tx_process.stdout.read(), + z_rx_process.returncode, z_rx_process.stdout.read()) + +def test_peer(): + # Start rx in the background + print("Start rx peer") + z_rx_command = f"./{DIR_TESTS}/z_test_fragment_rx 1" + z_rx_process = subprocess.Popen(z_rx_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True) + # Introduce a delay to ensure rx starts + time.sleep(2) + # Start tx + print("Start tx peer") + z_tx_command = f"./{DIR_TESTS}/z_test_fragment_tx 1" + z_tx_process = subprocess.Popen(z_tx_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + # Wait for tx to finish + z_tx_process.wait() + print("Stop rx") + if z_rx_process.poll() is None: + # Send "q" command to rx to stop it + z_rx_process.stdin.write("q\n") + z_rx_process.stdin.flush() + # Wait for rx to finish + z_rx_process.wait() + # Check output + return check_output(z_tx_process.returncode, z_tx_process.stdout.read(), + z_rx_process.returncode, z_rx_process.stdout.read()) + +if __name__ == "__main__": + EXIT_STATUS = 0 + + # Run tests + if test_client() == 1: + EXIT_STATUS = 1 + if test_peer() == 1: + EXIT_STATUS = 1 + # Exit + sys.exit(EXIT_STATUS) diff --git a/tests/z_test_fragment_rx.c b/tests/z_test_fragment_rx.c new file mode 100644 index 000000000..005d32c5e --- /dev/null +++ b/tests/z_test_fragment_rx.c @@ -0,0 +1,92 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include +#include +#include + +#if Z_FEATURE_SUBSCRIPTION == 1 +void data_handler(const z_sample_t *sample, void *ctx) { + (void)(ctx); + z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); + bool is_valid = true; + const uint8_t *data = sample->payload.start; + for (size_t i = 0; i < sample->payload.len; i++) { + if (data[i] != (uint8_t)i) { + is_valid = false; + break; + } + } + printf("[rx]: Received packet on %s, len: %d, validity: %d\n", z_loan(keystr), (int)sample->payload.len, is_valid); + z_drop(z_move(keystr)); +} + +int main(int argc, char **argv) { + const char *keyexpr = "test/**"; + const char *mode = "client"; + char *llocator = NULL; + (void)argv; + + // Check if peer mode + if (argc > 1) { + mode = "peer"; + llocator = "udp/224.0.0.224:7447#iface=lo"; + } + // Set config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + return -1; + } + // Declare subscriber + z_owned_closure_sample_t callback = z_closure(data_handler); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to declare subscriber.\n"); + return -1; + } + // Wait for termination + char c = '\0'; + while (c != 'q') { + fflush(stdin); + int ret = scanf("%c", &c); + (void)ret; // Remove unused result warning + } + // Clean up + z_undeclare_subscriber(z_move(sub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this test requires it.\n"); + return -2; +} +#endif diff --git a/tests/z_test_fragment_tx.c b/tests/z_test_fragment_tx.c new file mode 100644 index 000000000..5b2a99760 --- /dev/null +++ b/tests/z_test_fragment_tx.c @@ -0,0 +1,79 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include +#include +#include +#include + +#if Z_FEATURE_PUBLICATION == 1 +int main(int argc, char **argv) { + const char *keyexpr = "test/zenoh-pico-fragment"; + const char *mode = "client"; + char *llocator = NULL; + uint8_t *value = NULL; + size_t size = 100000; + (void)argv; + + // Check if peer mode + if (argc > 1) { + mode = "peer"; + llocator = "udp/224.0.0.224:7447#iface=lo"; + } + // Init value + value = malloc(size); + if (value == NULL) { + return -1; + } + for (size_t i = 0; i < size; i++) { + value[i] = (uint8_t)i; + } + // Set config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + return -1; + } + // Put data + printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size); + z_put_options_t options = z_put_options_default(); + options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); + if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) { + printf("Oh no! Put has failed...\n"); + } + // Clean up + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this test requires it.\n"); + return -2; +} +#endif From 092e60da01b63e87e991d480e5f6195e551d8603 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 14:59:38 +0100 Subject: [PATCH 05/21] ci: add fragment test --- .github/workflows/build-check.yaml | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index b5b96943c..0ccb69a32 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -86,7 +86,7 @@ jobs: - name: Checkout code uses: actions/checkout@v4 - - name: Build project + - name: Build project and run test run: | sudo apt install -y ninja-build CMAKE_GENERATOR=Ninja make @@ -94,4 +94,26 @@ jobs: timeout-minutes: 5 env: Z_FEATURE_RAWETH_TRANSPORT: ${{ matrix.feature_reth }} - \ No newline at end of file + + fragment_test: + name: Test multicast and unicast fragmentation + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Run docker image + run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:master + + - name: Build project and run test + run: | + sudo apt install -y ninja-build + CMAKE_GENERATOR=Ninja make + python3 ./build/tests/fragment.py + timeout-minutes: 5 + + - name: Stop docker image + if: always() + run: | + docker stop zenoh_router + docker rm zenoh_router \ No newline at end of file From 2503af1ecdf51910e175d9ba360f4db08e53d9ca Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 15:08:01 +0100 Subject: [PATCH 06/21] fix: remove unused include --- tests/z_test_fragment_rx.c | 2 -- tests/z_test_fragment_tx.c | 2 -- 2 files changed, 4 deletions(-) diff --git a/tests/z_test_fragment_rx.c b/tests/z_test_fragment_rx.c index 005d32c5e..f194644d3 100644 --- a/tests/z_test_fragment_rx.c +++ b/tests/z_test_fragment_rx.c @@ -12,11 +12,9 @@ // ZettaScale Zenoh Team, // -#include #include #include #include -#include #include #if Z_FEATURE_SUBSCRIPTION == 1 diff --git a/tests/z_test_fragment_tx.c b/tests/z_test_fragment_tx.c index 5b2a99760..3534ae1b9 100644 --- a/tests/z_test_fragment_tx.c +++ b/tests/z_test_fragment_tx.c @@ -12,11 +12,9 @@ // ZettaScale Zenoh Team, // -#include #include #include #include -#include #include #if Z_FEATURE_PUBLICATION == 1 From 87bfe0f24866c8199573d38bf9f6ff5ed7a26733 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 15:08:40 +0100 Subject: [PATCH 07/21] fix: add tempo for rx --- tests/fragment.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/fragment.py b/tests/fragment.py index bbe4d8019..d92588f6d 100644 --- a/tests/fragment.py +++ b/tests/fragment.py @@ -60,7 +60,7 @@ def test_client(): stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) # Introduce a delay to ensure rx starts - time.sleep(3) + time.sleep(2) # Start tx print("Start tx client") z_tx_command = f"./{DIR_TESTS}/z_test_fragment_tx" @@ -72,6 +72,8 @@ def test_client(): text=True) # Wait for tx to finish z_tx_process.wait() + # Wait for rx to receive + time.sleep(1) print("Stop rx") if z_rx_process.poll() is None: # Send "q" command to rx to stop it @@ -105,6 +107,8 @@ def test_peer(): text=True) # Wait for tx to finish z_tx_process.wait() + # Wait for rx to receive + time.sleep(1) print("Stop rx") if z_rx_process.poll() is None: # Send "q" command to rx to stop it From 84f3a96c90302670c6c7b10e3bc887c6d1b62cbe Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 15:30:24 +0100 Subject: [PATCH 08/21] feat: add multiple tries in test --- tests/z_test_fragment_tx.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/z_test_fragment_tx.c b/tests/z_test_fragment_tx.c index 3534ae1b9..681666246 100644 --- a/tests/z_test_fragment_tx.c +++ b/tests/z_test_fragment_tx.c @@ -57,11 +57,13 @@ int main(int argc, char **argv) { return -1; } // Put data - printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size); z_put_options_t options = z_put_options_default(); options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL); - if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) { - printf("Oh no! Put has failed...\n"); + for (int i = 0; i < 5; i++) { + printf("[tx]: Sending packet on %s, len: %d\n", keyexpr, (int)size); + if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, size, &options) < 0) { + printf("Oh no! Put has failed...\n"); + } } // Clean up zp_stop_read_task(z_loan(s)); From 33bdef52f4d75112f74f25ad67b639b2e65dd6fb Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 15:36:46 +0100 Subject: [PATCH 09/21] feat: limit tcp batch size --- .github/workflows/build-check.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index 0ccb69a32..1d0d07d9e 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -108,7 +108,8 @@ jobs: - name: Build project and run test run: | sudo apt install -y ninja-build - CMAKE_GENERATOR=Ninja make + cmake -DBATCH_UNICAST_SIZE=4096 -B build/ -G Ninja + ninja -C build/ python3 ./build/tests/fragment.py timeout-minutes: 5 From b5a063ae1656c6f22f7e2d4f48a57794dd7da430 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 15:54:31 +0100 Subject: [PATCH 10/21] fix: reduce message size --- .github/workflows/build-check.yaml | 4 ++-- tests/fragment.py | 4 ++-- tests/z_test_fragment_tx.c | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index 1d0d07d9e..d18f5cb2e 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -108,8 +108,8 @@ jobs: - name: Build project and run test run: | sudo apt install -y ninja-build - cmake -DBATCH_UNICAST_SIZE=4096 -B build/ -G Ninja - ninja -C build/ + # cmake -DBATCH_UNICAST_SIZE=4096 -B build/ -G Ninja + CMAKE_GENERATOR=Ninja make python3 ./build/tests/fragment.py timeout-minutes: 5 diff --git a/tests/fragment.py b/tests/fragment.py index d92588f6d..d29295f0f 100644 --- a/tests/fragment.py +++ b/tests/fragment.py @@ -10,11 +10,11 @@ def check_output(tx_status, tx_output, rx_status, rx_output): # Expected tx output & status z_tx_expected_status = 0 - z_tx_expected_output = "[tx]: Sending packet on test/zenoh-pico-fragment, len: 100000" + z_tx_expected_output = "[tx]: Sending packet on test/zenoh-pico-fragment, len: 10000" # Expected rx output & status z_rx_expected_status = 0 z_rx_expected_output = ( - "[rx]: Received packet on test/zenoh-pico-fragment, len: 100000, validity: 1") + "[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1") # Check the exit status of tx if tx_status == z_tx_expected_status: diff --git a/tests/z_test_fragment_tx.c b/tests/z_test_fragment_tx.c index 681666246..492930d03 100644 --- a/tests/z_test_fragment_tx.c +++ b/tests/z_test_fragment_tx.c @@ -23,7 +23,7 @@ int main(int argc, char **argv) { const char *mode = "client"; char *llocator = NULL; uint8_t *value = NULL; - size_t size = 100000; + size_t size = 10000; (void)argv; // Check if peer mode From 24ba9cccdf1f1b9dc14182c0b76fb939c9c12e7a Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 16:46:47 +0100 Subject: [PATCH 11/21] fix: free buffer --- tests/z_test_fragment_tx.c | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/z_test_fragment_tx.c b/tests/z_test_fragment_tx.c index 492930d03..bec1c673d 100644 --- a/tests/z_test_fragment_tx.c +++ b/tests/z_test_fragment_tx.c @@ -69,6 +69,7 @@ int main(int argc, char **argv) { zp_stop_read_task(z_loan(s)); zp_stop_lease_task(z_loan(s)); z_close(z_move(s)); + free(value); return 0; } #else From e8a5181b1c900985ed016306ba4e6c644c7b4a60 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Thu, 14 Dec 2023 16:46:56 +0100 Subject: [PATCH 12/21] fix: set keyexpr --- tests/z_test_fragment_rx.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/z_test_fragment_rx.c b/tests/z_test_fragment_rx.c index f194644d3..b236b1cfc 100644 --- a/tests/z_test_fragment_rx.c +++ b/tests/z_test_fragment_rx.c @@ -34,7 +34,7 @@ void data_handler(const z_sample_t *sample, void *ctx) { } int main(int argc, char **argv) { - const char *keyexpr = "test/**"; + const char *keyexpr = "test/zenoh-pico-fragment"; const char *mode = "client"; char *llocator = NULL; (void)argv; From 44fb7992e431a86fbcaf740f8606578a37b12a7a Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 15 Dec 2023 11:20:48 +0100 Subject: [PATCH 13/21] fix: missing end of fine newline --- .github/workflows/build-check.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index d18f5cb2e..50e408cc5 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -117,4 +117,4 @@ jobs: if: always() run: | docker stop zenoh_router - docker rm zenoh_router \ No newline at end of file + docker rm zenoh_router From 6df4e47c7890f28d025263b0be2bf33ff596c640 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Fri, 15 Dec 2023 11:40:16 +0100 Subject: [PATCH 14/21] fix: use latest docker image --- .github/workflows/build-check.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index 50e408cc5..01ab37196 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -56,7 +56,7 @@ jobs: uses: actions/checkout@v4 - name: Run docker image - run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:master + run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:latest - name: Build project run: | @@ -103,12 +103,12 @@ jobs: uses: actions/checkout@v4 - name: Run docker image - run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:master + run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:latest - name: Build project and run test run: | sudo apt install -y ninja-build - # cmake -DBATCH_UNICAST_SIZE=4096 -B build/ -G Ninja + cmake -DBATCH_UNICAST_SIZE=4096 -B build/ -G Ninja CMAKE_GENERATOR=Ninja make python3 ./build/tests/fragment.py timeout-minutes: 5 From 5ac84e7eada30a3f5003b3fe7987e8b4f07482cd Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 18 Dec 2023 09:59:30 +0100 Subject: [PATCH 15/21] doc: add info on buffer size value --- include/zenoh-pico/protocol/definitions/message.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/zenoh-pico/protocol/definitions/message.h b/include/zenoh-pico/protocol/definitions/message.h index cf9326103..eea54a70e 100644 --- a/include/zenoh-pico/protocol/definitions/message.h +++ b/include/zenoh-pico/protocol/definitions/message.h @@ -45,7 +45,7 @@ #define _Z_FLAG_Z_T 0x20 // 1 << 5 | QueryTarget if T==1 then the query target is present #define _Z_FLAG_Z_X 0x00 // Unused flags are set to zero -#define _Z_FRAG_BUFF_BASE_SIZE 128 // Base size of the buffer to encode a fragment message header +#define _Z_FRAG_BUFF_BASE_SIZE 128 // Arbitrary base size of the buffer to encode a fragment message header // Flags: // - T: Timestamp If T==1 then the timestamp if present From b805d319dd8f1f8a426a2903b1286854a39938c3 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 18 Dec 2023 11:22:54 +0100 Subject: [PATCH 16/21] feat: add source files for performance test --- tests/z_perf_rx.c | 124 ++++++++++++++++++++++++++++++++++++++++++++++ tests/z_perf_tx.c | 113 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 tests/z_perf_rx.c create mode 100644 tests/z_perf_tx.c diff --git a/tests/z_perf_rx.c b/tests/z_perf_rx.c new file mode 100644 index 000000000..b862d5ecf --- /dev/null +++ b/tests/z_perf_rx.c @@ -0,0 +1,124 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include +#include + +#include "zenoh-pico.h" + +typedef struct { + volatile unsigned long count; + size_t curr_len; + z_clock_t start; +} z_stats_t; + +static z_stats_t test_stats; +static volatile bool test_end; + +void z_stats_stop(z_stats_t *stats) { + // Ignore default value + if (stats->curr_len == 0) { + return; + } + // Print values + unsigned long elapsed_ms = z_clock_elapsed_ms(&stats->start); + printf("End test for pkt len: %lu, msg nb: %lu, time ms: %lu\n", stats->curr_len, stats->count, elapsed_ms); + stats->count = 0; +} + +void on_sample(const z_sample_t *sample, void *context) { + z_stats_t *stats = (z_stats_t *)context; + + if (stats->curr_len != sample->payload.len) { + // End previous measurement + z_stats_stop(stats); + // Check for end packet + stats->curr_len = sample->payload.len; + if (sample->payload.len == 1) { + test_end = true; + return; + } + // Start new measurement + printf("Starting test for pkt len: %lu\n", stats->curr_len); + stats->start = z_clock_now(); + } + stats->count++; +} + +int main(int argc, char **argv) { + char *keyexpr = "test/thr"; + const char *mode = "client"; + char *llocator = NULL; + char *clocator = NULL; + + // Get args + int opt; + while ((opt = getopt(argc, argv, "m:l:e:")) != -1) { + switch (opt) { + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'e': + clocator = optarg; + break; + default: + return -1; + } + } + // Set config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + if (clocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare Subscriber/resource + z_owned_closure_sample_t callback = z_closure(on_sample, NULL, (void *)&test_stats); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); + if (!z_check(sub)) { + printf("Unable to create subscriber.\n"); + exit(-1); + } + // Listen until stopped + printf("Start listening.\n"); + while (!test_end) { + } + // Wait for everything to settle + printf("End of test\n"); + z_sleep_s(1); + // Clean up + z_undeclare_subscriber(z_move(sub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + exit(0); +} diff --git a/tests/z_perf_tx.c b/tests/z_perf_tx.c new file mode 100644 index 000000000..a404b1ba0 --- /dev/null +++ b/tests/z_perf_tx.c @@ -0,0 +1,113 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include +#include + +#include "zenoh-pico.h" + +#define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) +#define TEST_DURATION_US 10000000 + +int send_packets(size_t pkt_len, z_owned_publisher_t *pub, uint8_t *value) { + z_clock_t test_start = z_clock_now(); + unsigned long elapsed_us = 0; + while (elapsed_us < TEST_DURATION_US) { + if (z_publisher_put(z_loan(*pub), (const uint8_t *)value, pkt_len, NULL) != 0) { + printf("Put failed for pkt len: %lu\n", pkt_len); + return -1; + } + elapsed_us = z_clock_elapsed_us(&test_start); + } + return 0; +} + +int main(int argc, char **argv) { + size_t len_array[] = {1048576, 524288, 262144, 131072, 65536, 32768, 16384, 8192, 4096, + 2048, 1024, 512, 256, 128, 64, 32, 16, 8}; // Biggest value first + uint8_t *value = (uint8_t *)malloc(len_array[0]); + memset(value, 1, len_array[0]); + char *keyexpr = "test/thr"; + const char *mode = "client"; + char *llocator = NULL; + char *clocator = NULL; + + // Get args + int opt; + while ((opt = getopt(argc, argv, "m:l:e:")) != -1) { + switch (opt) { + case 'm': + mode = optarg; + break; + case 'l': + llocator = optarg; + break; + case 'e': + clocator = optarg; + break; + default: + return -1; + } + } + // Set config + z_owned_config_t config = z_config_default(); + zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); + if (llocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); + } + if (clocator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); + } + // Open session + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + exit(-1); + } + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks"); + exit(-1); + } + // Declare publisher + z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL); + if (!z_check(pub)) { + printf("Unable to declare publisher for key expression!\n"); + exit(-1); + } + // Wait for joins + if (strcmp(mode, "peer") == 0) { + printf("Waiting for JOIN messages\n"); + z_sleep_s(3); + } + // Send packets + for (size_t i = 0; i < ARRAY_SIZE(len_array); i++) { + printf("Start sending pkt len: %lu\n", len_array[i]); + if (send_packets(len_array[i], &pub, value) != 0) { + break; + } + } + // Send end packet + printf("Sending end pkt\n"); + z_publisher_put(z_loan(pub), (const uint8_t *)value, 1, NULL); + // Clean up + z_undeclare_publisher(z_move(pub)); + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + z_close(z_move(s)); + free(value); + exit(0); +} From cd1aab1e856e3fa7034856ec6a04db97a7f969fa Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 18 Dec 2023 11:42:05 +0100 Subject: [PATCH 17/21] build: add performance test --- CMakeLists.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4728c1f5b..426f85cc6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -310,6 +310,8 @@ if(UNIX OR MSVC) add_executable(z_api_double_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_double_drop_test.c) add_executable(z_test_fragment_tx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_tx.c) add_executable(z_test_fragment_rx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_rx.c) + add_executable(z_perf_tx ${PROJECT_SOURCE_DIR}/tests/z_perf_tx.c) + add_executable(z_perf_rx ${PROJECT_SOURCE_DIR}/tests/z_perf_rx.c) target_link_libraries(z_data_struct_test ${Libname}) target_link_libraries(z_endpoint_test ${Libname}) @@ -320,6 +322,8 @@ if(UNIX OR MSVC) target_link_libraries(z_api_double_drop_test ${Libname}) target_link_libraries(z_test_fragment_tx ${Libname}) target_link_libraries(z_test_fragment_rx ${Libname}) + target_link_libraries(z_perf_tx ${Libname}) + target_link_libraries(z_perf_rx ${Libname}) configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY) configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY) From 479c1737e1a8ed599c6def2d6c99f978af4fd64b Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 18 Dec 2023 11:47:09 +0100 Subject: [PATCH 18/21] fix: increase buffer size with length field for stream links --- src/transport/unicast/transport.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/transport/unicast/transport.c b/src/transport/unicast/transport.c index 15f660bd8..b41a9ce55 100644 --- a/src/transport/unicast/transport.c +++ b/src/transport/unicast/transport.c @@ -51,14 +51,21 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo if (ret == _Z_RES_OK) { uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE; size_t dbuf_size = 0; + size_t wbuf_size = 0; + size_t zbuf_size = 0; _Bool expandable = false; switch (zl->_cap._flow) { case Z_LINK_CAP_FLOW_STREAM: + // Add stream length field to buffer size + wbuf_size = mtu + _Z_MSG_LEN_ENC_SIZE; + zbuf_size = Z_BATCH_UNICAST_SIZE + _Z_MSG_LEN_ENC_SIZE; expandable = true; break; case Z_LINK_CAP_FLOW_DATAGRAM: default: + wbuf_size = mtu; + zbuf_size = Z_BATCH_UNICAST_SIZE; expandable = false; break; } @@ -67,16 +74,17 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo expandable = false; dbuf_size = Z_FRAG_MAX_SIZE; #endif - zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, false); - zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE); + // Initialize tx rx buffers + zt->_transport._unicast._wbuf = _z_wbuf_make(wbuf_size, false); + zt->_transport._unicast._zbuf = _z_zbuf_make(zbuf_size); // Initialize the defragmentation buffers zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable); zt->_transport._unicast._dbuf_best_effort = _z_wbuf_make(dbuf_size, expandable); // Clean up the buffers if one of them failed to be allocated - if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != mtu) || - (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_UNICAST_SIZE) || + if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != wbuf_size) || + (_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != zbuf_size) || #if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0 (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) || (_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) { From f5972b01529cedb7bef210b271251f238f9ce84c Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 18 Dec 2023 12:07:18 +0100 Subject: [PATCH 19/21] fix: make perf tests cross compilable --- tests/z_perf_rx.c | 37 ++++++++++++++----------------------- tests/z_perf_tx.c | 39 +++++++++++++++------------------------ 2 files changed, 29 insertions(+), 47 deletions(-) diff --git a/tests/z_perf_rx.c b/tests/z_perf_rx.c index b862d5ecf..7a67e153a 100644 --- a/tests/z_perf_rx.c +++ b/tests/z_perf_rx.c @@ -15,19 +15,19 @@ #include #include #include -#include #include "zenoh-pico.h" typedef struct { volatile unsigned long count; - size_t curr_len; + unsigned long curr_len; z_clock_t start; } z_stats_t; static z_stats_t test_stats; static volatile bool test_end; +#if Z_FEATURE_SUBSCRIPTION == 1 void z_stats_stop(z_stats_t *stats) { // Ignore default value if (stats->curr_len == 0) { @@ -46,7 +46,7 @@ void on_sample(const z_sample_t *sample, void *context) { // End previous measurement z_stats_stop(stats); // Check for end packet - stats->curr_len = sample->payload.len; + stats->curr_len = (unsigned long)sample->payload.len; if (sample->payload.len == 1) { test_end = true; return; @@ -62,24 +62,12 @@ int main(int argc, char **argv) { char *keyexpr = "test/thr"; const char *mode = "client"; char *llocator = NULL; - char *clocator = NULL; + (void)argv; - // Get args - int opt; - while ((opt = getopt(argc, argv, "m:l:e:")) != -1) { - switch (opt) { - case 'm': - mode = optarg; - break; - case 'l': - llocator = optarg; - break; - case 'e': - clocator = optarg; - break; - default: - return -1; - } + // Check if peer mode + if (argc > 1) { + mode = "peer"; + llocator = "udp/224.0.0.224:7447#iface=lo"; } // Set config z_owned_config_t config = z_config_default(); @@ -87,9 +75,6 @@ int main(int argc, char **argv) { if (llocator != NULL) { zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); } - if (clocator != NULL) { - zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); - } // Open session z_owned_session_t s = z_open(z_move(config)); if (!z_check(s)) { @@ -122,3 +107,9 @@ int main(int argc, char **argv) { z_close(z_move(s)); exit(0); } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this test requires it.\n"); + return -2; +} +#endif diff --git a/tests/z_perf_tx.c b/tests/z_perf_tx.c index a404b1ba0..454d72de2 100644 --- a/tests/z_perf_tx.c +++ b/tests/z_perf_tx.c @@ -15,14 +15,14 @@ #include #include #include -#include #include "zenoh-pico.h" #define ARRAY_SIZE(array) (sizeof(array) / sizeof(array[0])) #define TEST_DURATION_US 10000000 -int send_packets(size_t pkt_len, z_owned_publisher_t *pub, uint8_t *value) { +#if Z_FEATURE_PUBLICATION == 1 +int send_packets(unsigned long pkt_len, z_owned_publisher_t *pub, uint8_t *value) { z_clock_t test_start = z_clock_now(); unsigned long elapsed_us = 0; while (elapsed_us < TEST_DURATION_US) { @@ -36,31 +36,19 @@ int send_packets(size_t pkt_len, z_owned_publisher_t *pub, uint8_t *value) { } int main(int argc, char **argv) { - size_t len_array[] = {1048576, 524288, 262144, 131072, 65536, 32768, 16384, 8192, 4096, - 2048, 1024, 512, 256, 128, 64, 32, 16, 8}; // Biggest value first + unsigned long len_array[] = {1048576, 524288, 262144, 131072, 65536, 32768, 16384, 8192, 4096, + 2048, 1024, 512, 256, 128, 64, 32, 16, 8}; // Biggest value first uint8_t *value = (uint8_t *)malloc(len_array[0]); memset(value, 1, len_array[0]); char *keyexpr = "test/thr"; const char *mode = "client"; char *llocator = NULL; - char *clocator = NULL; + (void)argv; - // Get args - int opt; - while ((opt = getopt(argc, argv, "m:l:e:")) != -1) { - switch (opt) { - case 'm': - mode = optarg; - break; - case 'l': - llocator = optarg; - break; - case 'e': - clocator = optarg; - break; - default: - return -1; - } + // Check if peer mode + if (argc > 1) { + mode = "peer"; + llocator = "udp/224.0.0.224:7447#iface=lo"; } // Set config z_owned_config_t config = z_config_default(); @@ -68,9 +56,6 @@ int main(int argc, char **argv) { if (llocator != NULL) { zp_config_insert(z_loan(config), Z_CONFIG_LISTEN_KEY, z_string_make(llocator)); } - if (clocator != NULL) { - zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(clocator)); - } // Open session z_owned_session_t s = z_open(z_move(config)); if (!z_check(s)) { @@ -111,3 +96,9 @@ int main(int argc, char **argv) { free(value); exit(0); } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_PUBLICATION but this test requires it.\n"); + return -2; +} +#endif From 97a4f8b238a287b79329b255752c838a110552e3 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 18 Dec 2023 12:21:44 +0100 Subject: [PATCH 20/21] fix: clear windows conversion warnings --- src/protocol/codec/declarations.c | 2 +- src/protocol/codec/network.c | 4 ++-- src/session/rx.c | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/protocol/codec/declarations.c b/src/protocol/codec/declarations.c index 7e474fb6a..e78a94e2a 100644 --- a/src/protocol/codec/declarations.c +++ b/src/protocol/codec/declarations.c @@ -36,7 +36,7 @@ int8_t _z_decl_ext_keyexpr_encode(_z_wbuf_t *wbf, _z_keyexpr_t ke, _Bool has_next_ext) { uint8_t header = _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 0x0f | (has_next_ext ? _Z_FLAG_Z_Z : 0); _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); - uint32_t kelen = _z_keyexpr_has_suffix(ke) ? strlen(ke._suffix) : 0; + uint32_t kelen = (uint32_t)(_z_keyexpr_has_suffix(ke) ? strlen(ke._suffix) : 0); header = (_z_keyexpr_is_local(&ke) ? 2 : 0) | (kelen != 0 ? 1 : 0); _Z_RETURN_IF_ERR(_z_zint_encode(wbf, 1 + kelen + _z_zint_len(ke._id))); _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header)); diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index 28abb51d1..bdbbdfb84 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -326,7 +326,7 @@ int8_t _z_response_decode_extension(_z_msg_ext_t *extension, void *ctx) { _z_n_msg_response_t *msg = (_z_n_msg_response_t *)ctx; switch (_Z_EXT_FULL_ID(extension->_header)) { case _Z_MSG_EXT_ENC_ZINT | 0x01: { - msg->_ext_qos._val = extension->_body._zint._val; + msg->_ext_qos._val = (uint8_t)extension->_body._zint._val; break; } case _Z_MSG_EXT_ENC_ZBUF | 0x02: { @@ -450,7 +450,7 @@ int8_t _z_declare_decode_extensions(_z_msg_ext_t *extension, void *ctx) { _z_n_msg_declare_t *decl = (_z_n_msg_declare_t *)ctx; switch (_Z_EXT_FULL_ID(extension->_header)) { case _Z_MSG_EXT_ENC_ZINT | 0x01: { - decl->_ext_qos._val = extension->_body._zint._val; + decl->_ext_qos._val = (uint8_t)extension->_body._zint._val; break; } case _Z_MSG_EXT_ENC_ZBUF | 0x02: { diff --git a/src/session/rx.c b/src/session/rx.c index 28d225827..00e708a14 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -92,7 +92,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint case _Z_REQUEST_QUERY: { #if Z_FEATURE_QUERYABLE == 1 _z_msg_query_t *query = &req._body._query; - ret = _z_trigger_queryables(zn, query, req._key, req._rid); + ret = _z_trigger_queryables(zn, query, req._key, (uint32_t)req._rid); #else _Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported\n"); #endif From 2e277938def1a6bc414a3a01f3f6f4bd2d86d592 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 18 Dec 2023 12:26:48 +0100 Subject: [PATCH 21/21] chore: dummy commit to trigger CI --- zenohpico.pc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zenohpico.pc b/zenohpico.pc index e08e8cce2..023120da0 100644 --- a/zenohpico.pc +++ b/zenohpico.pc @@ -3,6 +3,6 @@ prefix=/usr/local Name: zenohpico Description: URL: -Version: 0.11.20231031dev +Version: 0.11.20231218dev Cflags: -I${prefix}/include Libs: -L${prefix}/lib -lzenohpico