diff --git a/.github/workflows/build-check.yaml b/.github/workflows/build-check.yaml index 8ea2af629..b5b96943c 100644 --- a/.github/workflows/build-check.yaml +++ b/.github/workflows/build-check.yaml @@ -75,3 +75,23 @@ jobs: run: | docker stop zenoh_router docker rm zenoh_router + + raweth_build: + name: Build and test raweth transport on ubuntu-latest + runs-on: ubuntu-latest + strategy: + matrix: + feature_reth: [1, 0] + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Build project + run: | + sudo apt install -y ninja-build + CMAKE_GENERATOR=Ninja make + python3 ./build/tests/raweth.py --reth $Z_FEATURE_RAWETH_TRANSPORT + timeout-minutes: 5 + env: + Z_FEATURE_RAWETH_TRANSPORT: ${{ matrix.feature_reth }} + \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 73db9fa37..88312b89c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -107,15 +107,18 @@ set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature") set(Z_FEATURE_SUBSCRIPTION 1 CACHE STRING "Toggle subscription feature") set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature") set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature") +set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport feature") add_definition(Z_FEATURE_PUBLICATION=${Z_FEATURE_PUBLICATION}) add_definition(Z_FEATURE_SUBSCRIPTION=${Z_FEATURE_SUBSCRIPTION}) add_definition(Z_FEATURE_QUERY=${Z_FEATURE_QUERY}) add_definition(Z_FEATURE_QUERYABLE=${Z_FEATURE_QUERYABLE}) +add_definition(Z_FEATURE_RAWETH_TRANSPORT=${Z_FEATURE_RAWETH_TRANSPORT}) message(STATUS "Building with feature confing:\n\ * PUBLICATION: ${Z_FEATURE_PUBLICATION}\n\ * SUBSCRIPTION: ${Z_FEATURE_SUBSCRIPTION}\n\ * QUERY: ${Z_FEATURE_QUERY}\n\ -* QUERYABLE: ${Z_FEATURE_QUERYABLE}") +* QUERYABLE: ${Z_FEATURE_QUERYABLE}\n\ +* RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}") # Print summary of CMAKE configurations message(STATUS "Building in ${CMAKE_BUILD_TYPE} mode") @@ -300,6 +303,7 @@ if(UNIX OR MSVC) target_link_libraries(z_api_double_drop_test ${Libname}) configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY) + configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY) enable_testing() add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test) diff --git a/GNUmakefile b/GNUmakefile index 67932a3bc..f6ad15055 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -51,6 +51,7 @@ Z_FEATURE_PUBLICATION?=1 Z_FEATURE_SUBSCRIPTION?=1 Z_FEATURE_QUERY?=1 Z_FEATURE_QUERYABLE?=1 +Z_FEATURE_RAWETH_TRANSPORT?=0 # zenoh-pico/ directory ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST)))) @@ -67,7 +68,7 @@ CROSSIMG_PREFIX=zenoh-pico_ CMAKE_OPT=-DZENOH_DEBUG=$(ZENOH_DEBUG) -DBUILD_EXAMPLES=$(BUILD_EXAMPLES) -DCMAKE_BUILD_TYPE=$(BUILD_TYPE) -DBUILD_TESTING=$(BUILD_TESTING) -DBUILD_MULTICAST=$(BUILD_MULTICAST)\ -DZ_FEATURE_PUBLICATION=$(Z_FEATURE_PUBLICATION) -DZ_FEATURE_SUBSCRIPTION=$(Z_FEATURE_SUBSCRIPTION) -DZ_FEATURE_QUERY=$(Z_FEATURE_QUERY) -DZ_FEATURE_QUERYABLE=$(Z_FEATURE_QUERYABLE)\ - -DBUILD_INTEGRATION=$(BUILD_INTEGRATION) -DBUILD_TOOLS=$(BUILD_TOOLS) -DBUILD_SHARED_LIBS=$(BUILD_SHARED_LIBS) -H. + -DZ_FEATURE_RAWETH_TRANSPORT=$(Z_FEATURE_RAWETH_TRANSPORT) -DBUILD_INTEGRATION=$(BUILD_INTEGRATION) -DBUILD_TOOLS=$(BUILD_TOOLS) -DBUILD_SHARED_LIBS=$(BUILD_SHARED_LIBS) -H. all: make diff --git a/include/zenoh-pico/link/config/raweth.h b/include/zenoh-pico/link/config/raweth.h index 6682805ad..996261307 100644 --- a/include/zenoh-pico/link/config/raweth.h +++ b/include/zenoh-pico/link/config/raweth.h @@ -20,13 +20,9 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/link/link.h" -#if Z_FEATURE_RAWETH_TRANSPORT == 1 - #define RAWETH_SCHEMA "reth" int8_t _z_endpoint_raweth_valid(_z_endpoint_t *endpoint); - int8_t _z_new_link_raweth(_z_link_t *zl, _z_endpoint_t endpoint); -#endif /* Z_FEATURE_RAWETH_TRANSPORT */ #endif /* ZENOH_PICO_LINK_CONFIG_RAWETH_H */ diff --git a/include/zenoh-pico/link/link.h b/include/zenoh-pico/link/link.h index 32c9ebb42..75e2a02a7 100644 --- a/include/zenoh-pico/link/link.h +++ b/include/zenoh-pico/link/link.h @@ -28,6 +28,10 @@ #include "zenoh-pico/system/link/udp.h" #endif +#if Z_FEATURE_RAWETH_TRANSPORT == 1 +#include "zenoh-pico/system/link/raweth.h" +#endif + #if Z_FEATURE_LINK_BLUETOOTH == 1 #include "zenoh-pico/system/link/bt.h" #endif @@ -52,6 +56,7 @@ typedef enum { Z_LINK_CAP_TRANSPORT_UNICAST = 0, Z_LINK_CAP_TRANSPORT_MULTICAST = 1, + Z_LINK_CAP_TRANSPORT_RAWETH = 2, } _z_link_cap_transport_t; /** @@ -73,7 +78,7 @@ typedef enum { * transport: 2 bits, see _z_link_cap_transport_t enum. * flow: 1 bit, see _z_link_cap_flow_t enum. * reliable: 1 bit, 1 if the link is reliable (network definition) - * reserved: 4 bits, reserved for futur use + * reserved: 4 bits, reserved for future use */ typedef struct _z_link_capabilities_t { uint8_t _transport : 2; @@ -111,6 +116,9 @@ typedef struct _z_link_t { #endif #if Z_FEATURE_LINK_WS == 1 _z_ws_socket_t _ws; +#endif +#if Z_FEATURE_RAWETH_TRANSPORT == 1 + _z_raweth_socket_t _raweth; #endif } _socket; diff --git a/include/zenoh-pico/system/platform/unix.h b/include/zenoh-pico/system/platform/unix.h index 8084e22db..577ef670e 100644 --- a/include/zenoh-pico/system/platform/unix.h +++ b/include/zenoh-pico/system/platform/unix.h @@ -35,7 +35,8 @@ typedef struct timeval z_time_t; typedef struct { union { -#if Z_FEATURE_LINK_TCP == 1 || Z_FEATURE_LINK_UDP_MULTICAST == 1 || Z_FEATURE_LINK_UDP_UNICAST == 1 +#if Z_FEATURE_LINK_TCP == 1 || Z_FEATURE_LINK_UDP_MULTICAST == 1 || Z_FEATURE_LINK_UDP_UNICAST == 1 || \ + Z_FEATURE_RAWETH_TRANSPORT == 1 int _fd; #endif }; diff --git a/include/zenoh-pico/transport/common/join.h b/include/zenoh-pico/transport/common/join.h deleted file mode 100644 index 8b8d5fc8b..000000000 --- a/include/zenoh-pico/transport/common/join.h +++ /dev/null @@ -1,22 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#ifndef ZENOH_PICO_TRANSPORT_JOIN_H -#define ZENOH_PICO_TRANSPORT_JOIN_H - -#include "zenoh-pico/transport/transport.h" - -int8_t _z_send_join(_z_transport_t *zt); - -#endif /* ZENOH_PICO_TRANSPORT_JOIN_H */ diff --git a/include/zenoh-pico/transport/common/lease.h b/include/zenoh-pico/transport/common/lease.h index f35ab2f09..8d85e7c17 100644 --- a/include/zenoh-pico/transport/common/lease.h +++ b/include/zenoh-pico/transport/common/lease.h @@ -17,7 +17,7 @@ #include "zenoh-pico/transport/transport.h" +int8_t _z_send_join(_z_transport_t *zt); int8_t _z_send_keep_alive(_z_transport_t *zt); -void *_zp_lease_task(void *zt_arg); // The argument is void* to avoid incompatible pointer types in tasks #endif /* ZENOH_PICO_TRANSPORT_LEASE_H */ diff --git a/include/zenoh-pico/transport/multicast/join.h b/include/zenoh-pico/transport/multicast/join.h deleted file mode 100644 index b7caaadd3..000000000 --- a/include/zenoh-pico/transport/multicast/join.h +++ /dev/null @@ -1,22 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#ifndef ZENOH_MULTICAST_JOIN_H -#define ZENOH_MULTICAST_JOIN_H - -#include "zenoh-pico/transport/transport.h" - -int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm); - -#endif /* ZENOH_MULTICAST_JOIN_H */ diff --git a/include/zenoh-pico/transport/multicast/lease.h b/include/zenoh-pico/transport/multicast/lease.h index aef7701ed..a7896a3f5 100644 --- a/include/zenoh-pico/transport/multicast/lease.h +++ b/include/zenoh-pico/transport/multicast/lease.h @@ -17,9 +17,10 @@ #include "zenoh-pico/transport/transport.h" +int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm); int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm); -int8_t _zp_multicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); -int8_t _zp_multicast_stop_lease_task(_z_transport_t *zt); +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task); +int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm); void *_zp_multicast_lease_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks #endif /* ZENOH_PICO_MULTICAST_LEASE_H */ diff --git a/include/zenoh-pico/transport/multicast/rx.h b/include/zenoh-pico/transport/multicast/rx.h index 1ffba60f0..5a74d11cf 100644 --- a/include/zenoh-pico/transport/multicast/rx.h +++ b/include/zenoh-pico/transport/multicast/rx.h @@ -18,7 +18,6 @@ #include "zenoh-pico/transport/transport.h" int8_t _z_multicast_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr); -int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr); int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr); diff --git a/include/zenoh-pico/transport/raweth/join.h b/include/zenoh-pico/transport/raweth/join.h deleted file mode 100644 index da8f875d9..000000000 --- a/include/zenoh-pico/transport/raweth/join.h +++ /dev/null @@ -1,22 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#ifndef ZENOH_RAWETH_JOIN_H -#define ZENOH_RAWETH_JOIN_H - -#include "zenoh-pico/transport/transport.h" - -int8_t _zp_raweth_send_join(_z_transport_multicast_t *ztm); - -#endif /* ZENOH_RAWETH_JOIN_H */ diff --git a/include/zenoh-pico/transport/raweth/lease.h b/include/zenoh-pico/transport/raweth/lease.h deleted file mode 100644 index 924fd38a0..000000000 --- a/include/zenoh-pico/transport/raweth/lease.h +++ /dev/null @@ -1,25 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#ifndef ZENOH_PICO_RAWETH_LEASE_H -#define ZENOH_PICO_RAWETH_LEASE_H - -#include "zenoh-pico/transport/transport.h" - -int8_t _zp_raweth_send_keep_alive(_z_transport_multicast_t *ztm); -int8_t _zp_raweth_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task); -int8_t _zp_raweth_stop_lease_task(_z_transport_t *zt); -void *_zp_raweth_lease_task(void *ztm_arg); // The argument is void* to avoid incompatible pointer types in tasks - -#endif /* ZENOH_PICO_RAWETH_LEASE_H */ diff --git a/include/zenoh-pico/transport/raweth/transport.h b/include/zenoh-pico/transport/raweth/transport.h deleted file mode 100644 index a91eba18a..000000000 --- a/include/zenoh-pico/transport/raweth/transport.h +++ /dev/null @@ -1,28 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#ifndef ZENOH_PICO_RAWETH_TRANSPORT_H -#define ZENOH_PICO_RAWETH_TRANSPORT_H - -#include "zenoh-pico/api/types.h" - -int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param); -int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid); -int8_t _z_raweth_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid); -int8_t _z_raweth_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only); -int8_t _z_raweth_transport_close(_z_transport_multicast_t *ztm, uint8_t reason); -void _z_raweth_transport_clear(_z_transport_t *zt); -#endif /* ZENOH_PICO_RAWETH_TRANSPORT_H */ diff --git a/include/zenoh-pico/transport/transport.h b/include/zenoh-pico/transport/transport.h index 00356c20b..2c2132786 100644 --- a/include/zenoh-pico/transport/transport.h +++ b/include/zenoh-pico/transport/transport.h @@ -53,6 +53,11 @@ _Z_LIST_DEFINE(_z_transport_peer_entry, _z_transport_peer_entry_t) _z_transport_peer_entry_list_t *_z_transport_peer_entry_list_insert(_z_transport_peer_entry_list_t *root, _z_transport_peer_entry_t *entry); +// Forward declaration to be used in _zp_f_send_tmsg* +typedef struct _z_transport_multicast_t _z_transport_multicast_t; +// Send function prototype +typedef int8_t (*_zp_f_send_tmsg)(_z_transport_multicast_t *self, const _z_transport_message_t *t_msg); + typedef struct { // Session associated to the transport @@ -93,7 +98,7 @@ typedef struct { volatile _Bool _transmitted; } _z_transport_unicast_t; -typedef struct { +typedef struct _z_transport_multicast_t { // Session associated to the transport void *_session; @@ -121,6 +126,9 @@ typedef struct { // Known valid peers _z_transport_peer_entry_list_t *_peers; + // T message send function + _zp_f_send_tmsg _send_f; + #if Z_FEATURE_MULTI_THREAD == 1 _z_task_t *_read_task; _z_task_t *_lease_task; @@ -135,11 +143,13 @@ typedef struct { union { _z_transport_unicast_t _unicast; _z_transport_multicast_t _multicast; + _z_transport_multicast_t _raweth; } _transport; enum { _Z_TRANSPORT_UNICAST_TYPE, _Z_TRANSPORT_MULTICAST_TYPE, + _Z_TRANSPORT_RAWETH_TYPE, } _type; } _z_transport_t; diff --git a/src/api/api.c b/src/api/api.c index 8e228fc45..9455a6a2e 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -547,6 +547,7 @@ int8_t z_info_peers_zid(const z_session_t zs, z_owned_closure_zid_t *callback) { // Call transport function switch (zs._val->_tp._type) { case _Z_TRANSPORT_MULTICAST_TYPE: + case _Z_TRANSPORT_RAWETH_TYPE: _zp_multicast_fetch_zid(&zs._val->_tp, callback); break; default: @@ -638,7 +639,7 @@ z_owned_publisher_t z_declare_publisher(z_session_t zs, z_keyexpr_t keyexpr, con // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic // resource declarations are only performed on unicast transports. - if (zs._val->_tp._type != _Z_TRANSPORT_MULTICAST_TYPE) { + if (zs._val->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); if (r == NULL) { uint16_t id = _z_declare_resource(zs._val, keyexpr); @@ -794,7 +795,7 @@ z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_o // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic // resource declarations are only performed on unicast transports. - if (zs._val->_tp._type != _Z_TRANSPORT_MULTICAST_TYPE) { + if (zs._val->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); if (r == NULL) { uint16_t id = _z_declare_resource(zs._val, keyexpr); @@ -897,7 +898,7 @@ z_owned_subscriber_t z_declare_subscriber(z_session_t zs, z_keyexpr_t keyexpr, z // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic // resource declarations are only performed on unicast transports. - if (zs._val->_tp._type != _Z_TRANSPORT_MULTICAST_TYPE) { + if (zs._val->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); if (r == NULL) { char *wild = strpbrk(keyexpr._suffix, "*$"); diff --git a/src/link/link.c b/src/link/link.c index 2888da55d..ff63ec689 100644 --- a/src/link/link.c +++ b/src/link/link.c @@ -17,6 +17,7 @@ #include #include "zenoh-pico/config.h" +#include "zenoh-pico/link/config/raweth.h" #include "zenoh-pico/link/manager.h" #include "zenoh-pico/utils/logging.h" @@ -56,7 +57,6 @@ int8_t _z_open_link(_z_link_t *zl, const char *locator) { { ret = _Z_ERR_CONFIG_LOCATOR_SCHEMA_UNKNOWN; } - if (ret == _Z_RES_OK) { // Open transport link for communication if (zl->_open_f(zl) != _Z_RES_OK) { @@ -92,10 +92,11 @@ int8_t _z_listen_link(_z_link_t *zl, const char *locator) { ret = _z_new_link_bt(zl, ep); } else #endif - { + if (_z_endpoint_raweth_valid(&ep) == _Z_RES_OK) { + ret = _z_new_link_raweth(zl, ep); + } else { ret = _Z_ERR_CONFIG_LOCATOR_SCHEMA_UNKNOWN; } - if (ret == _Z_RES_OK) { // Open transport link for listening if (zl->_listen_f(zl) != _Z_RES_OK) { diff --git a/src/net/session.c b/src/net/session.c index f0e2f3b30..b9c4c947d 100644 --- a/src/net/session.c +++ b/src/net/session.c @@ -24,12 +24,13 @@ #include "zenoh-pico/net/memory.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/session/utils.h" -#include "zenoh-pico/transport/common/join.h" #include "zenoh-pico/transport/common/lease.h" #include "zenoh-pico/transport/common/read.h" #include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/transport/multicast/lease.h" #include "zenoh-pico/transport/multicast/read.h" +#include "zenoh-pico/transport/raweth/read.h" +#include "zenoh-pico/transport/transport.h" #include "zenoh-pico/transport/unicast.h" #include "zenoh-pico/transport/unicast/lease.h" #include "zenoh-pico/transport/unicast/read.h" @@ -160,6 +161,7 @@ _z_config_t *_z_info(const _z_session_t *zn) { _zp_unicast_info_session(&zn->_tp, ps); break; case _Z_TRANSPORT_MULTICAST_TYPE: + case _Z_TRANSPORT_RAWETH_TYPE: _zp_multicast_info_session(&zn->_tp, ps); break; default: @@ -192,6 +194,9 @@ int8_t _zp_start_read_task(_z_session_t *zn, _z_task_attr_t *attr) { case _Z_TRANSPORT_MULTICAST_TYPE: ret = _zp_multicast_start_read_task(&zn->_tp, attr, task); break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _zp_raweth_start_read_task(&zn->_tp, attr, task); + break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; break; @@ -216,7 +221,10 @@ int8_t _zp_start_lease_task(_z_session_t *zn, _z_task_attr_t *attr) { ret = _zp_unicast_start_lease_task(&zn->_tp, attr, task); break; case _Z_TRANSPORT_MULTICAST_TYPE: - ret = _zp_multicast_start_lease_task(&zn->_tp, attr, task); + ret = _zp_multicast_start_lease_task(&zn->_tp._transport._multicast, attr, task); + break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _zp_multicast_start_lease_task(&zn->_tp._transport._raweth, attr, task); break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; @@ -239,6 +247,9 @@ int8_t _zp_stop_read_task(_z_session_t *zn) { case _Z_TRANSPORT_MULTICAST_TYPE: ret = _zp_multicast_stop_read_task(&zn->_tp); break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _zp_raweth_stop_read_task(&zn->_tp); + break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; break; @@ -254,7 +265,10 @@ int8_t _zp_stop_lease_task(_z_session_t *zn) { ret = _zp_unicast_stop_lease_task(&zn->_tp); break; case _Z_TRANSPORT_MULTICAST_TYPE: - ret = _zp_multicast_stop_lease_task(&zn->_tp); + ret = _zp_multicast_stop_lease_task(&zn->_tp._transport._multicast); + break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _zp_multicast_stop_lease_task(&zn->_tp._transport._raweth); break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; diff --git a/src/session/tx.c b/src/session/tx.c index b00c072f2..8f5b925bb 100644 --- a/src/session/tx.c +++ b/src/session/tx.c @@ -14,6 +14,7 @@ #include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/raweth/tx.h" #include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/utils/logging.h" @@ -29,6 +30,9 @@ int8_t _z_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reli case _Z_TRANSPORT_MULTICAST_TYPE: ret = _z_multicast_send_n_msg(zn, z_msg, reliability, cong_ctrl); break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _z_raweth_send_n_msg(zn, z_msg, reliability, cong_ctrl); + break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; break; diff --git a/src/session/utils.c b/src/session/utils.c index 1985c8f8a..32b685b78 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -89,6 +89,9 @@ int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) { case _Z_TRANSPORT_MULTICAST_TYPE: zn->_tp._transport._multicast._session = zn; break; + case _Z_TRANSPORT_RAWETH_TYPE: + zn->_tp._transport._raweth._session = zn; + break; default: break; } diff --git a/src/transport/common/join.c b/src/transport/common/join.c deleted file mode 100644 index 4d3c8498a..000000000 --- a/src/transport/common/join.c +++ /dev/null @@ -1,32 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#include "zenoh-pico/transport/common/join.h" - -#include "zenoh-pico/transport/multicast/join.h" - -int8_t _z_send_join(_z_transport_t *zt) { - int8_t ret = _Z_RES_OK; - // Join task only applies to multicast transports - switch (zt->_type) { - case _Z_TRANSPORT_MULTICAST_TYPE: - ret = _zp_multicast_send_join(&zt->_transport._multicast); - break; - default: - (void)zt; - ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; - break; - } - return ret; -} diff --git a/src/transport/common/lease.c b/src/transport/common/lease.c index 8e337f497..3079e9484 100644 --- a/src/transport/common/lease.c +++ b/src/transport/common/lease.c @@ -28,6 +28,9 @@ int8_t _z_send_keep_alive(_z_transport_t *zt) { case _Z_TRANSPORT_MULTICAST_TYPE: ret = _zp_multicast_send_keep_alive(&zt->_transport._multicast); break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _zp_multicast_send_keep_alive(&zt->_transport._raweth); + break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; break; @@ -35,18 +38,19 @@ int8_t _z_send_keep_alive(_z_transport_t *zt) { return ret; } -void *_zp_lease_task(void *zt_arg) { - void *ret = NULL; - _z_transport_t *zt = (_z_transport_t *)zt_arg; +int8_t _z_send_join(_z_transport_t *zt) { + int8_t ret = _Z_RES_OK; + // Join task only applies to multicast transports switch (zt->_type) { - case _Z_TRANSPORT_UNICAST_TYPE: - ret = _zp_unicast_lease_task(&zt->_transport._unicast); - break; case _Z_TRANSPORT_MULTICAST_TYPE: - ret = _zp_multicast_lease_task(&zt->_transport._multicast); + ret = _zp_multicast_send_join(&zt->_transport._multicast); + break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _zp_multicast_send_join(&zt->_transport._raweth); break; default: - ret = NULL; + _ZP_UNUSED(zt); + ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; break; } return ret; diff --git a/src/transport/common/read.c b/src/transport/common/read.c index a9e2485a4..d7db6c61d 100644 --- a/src/transport/common/read.c +++ b/src/transport/common/read.c @@ -17,6 +17,7 @@ #include #include "zenoh-pico/transport/multicast/read.h" +#include "zenoh-pico/transport/raweth/read.h" #include "zenoh-pico/transport/unicast/read.h" int8_t _z_read(_z_transport_t *zt) { @@ -28,6 +29,9 @@ int8_t _z_read(_z_transport_t *zt) { case _Z_TRANSPORT_MULTICAST_TYPE: ret = _zp_multicast_read(&zt->_transport._multicast); break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _zp_raweth_read(&zt->_transport._raweth); + break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; break; @@ -45,6 +49,9 @@ void *_zp_read_task(void *zt_arg) { case _Z_TRANSPORT_MULTICAST_TYPE: ret = _zp_multicast_read_task(&zt->_transport._multicast); break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _zp_raweth_read_task(&zt->_transport._raweth); + break; default: ret = NULL; break; diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 6f38ac365..3508493f4 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -12,12 +12,14 @@ // ZettaScale Zenoh Team, // -#include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/common/tx.h" #include "zenoh-pico/api/constants.h" #include "zenoh-pico/protocol/codec/core.h" #include "zenoh-pico/protocol/codec/transport.h" #include "zenoh-pico/protocol/definitions/transport.h" +#include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/raweth/tx.h" #include "zenoh-pico/transport/unicast/tx.h" #include "zenoh-pico/utils/logging.h" @@ -76,6 +78,9 @@ int8_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg) { case _Z_TRANSPORT_MULTICAST_TYPE: ret = _z_multicast_send_t_msg(&zt->_transport._multicast, t_msg); break; + case _Z_TRANSPORT_RAWETH_TYPE: + ret = _z_raweth_send_t_msg(&zt->_transport._raweth, t_msg); + break; default: ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE; break; diff --git a/src/transport/manager.c b/src/transport/manager.c index 7d8f79811..3d430465f 100644 --- a/src/transport/manager.c +++ b/src/transport/manager.c @@ -44,6 +44,7 @@ int8_t _z_new_transport_client(_z_transport_t *zt, char *locator, _z_id_t *local break; } // Multicast transport + case Z_LINK_CAP_TRANSPORT_RAWETH: case Z_LINK_CAP_TRANSPORT_MULTICAST: { _z_transport_multicast_establish_param_t tp_param; ret = _z_multicast_open_client(&tp_param, &zl, local_zid); @@ -82,6 +83,7 @@ int8_t _z_new_transport_peer(_z_transport_t *zt, char *locator, _z_id_t *local_z ret = _z_unicast_transport_create(zt, &zl, &tp_param); break; } + case Z_LINK_CAP_TRANSPORT_RAWETH: case Z_LINK_CAP_TRANSPORT_MULTICAST: { _z_transport_multicast_establish_param_t tp_param; ret = _z_multicast_open_peer(&tp_param, &zl, local_zid); diff --git a/src/transport/multicast/join.c b/src/transport/multicast/join.c deleted file mode 100644 index d577db1da..000000000 --- a/src/transport/multicast/join.c +++ /dev/null @@ -1,38 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#include "zenoh-pico/transport/multicast/join.h" - -#include "zenoh-pico/session/utils.h" -#include "zenoh-pico/transport/multicast/tx.h" - -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - -int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { - _z_conduit_sn_list_t next_sn; - next_sn._is_qos = false; - next_sn._val._plain._best_effort = ztm->_sn_tx_best_effort; - next_sn._val._plain._reliable = ztm->_sn_tx_reliable; - - _z_id_t zid = ((_z_session_t *)ztm->_session)->_local_zid; - _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); - - return _z_multicast_send_t_msg(ztm, &jsm); -} -#else -int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { - _ZP_UNUSED(ztm); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 diff --git a/src/transport/multicast/lease.c b/src/transport/multicast/lease.c index c30fe4c7a..4a56a9c39 100644 --- a/src/transport/multicast/lease.c +++ b/src/transport/multicast/lease.c @@ -18,13 +18,10 @@ #include "zenoh-pico/config.h" #include "zenoh-pico/session/utils.h" -#include "zenoh-pico/transport/common/join.h" -#include "zenoh-pico/transport/multicast/join.h" -#include "zenoh-pico/transport/multicast/transport.h" -#include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/common/lease.h" #include "zenoh-pico/utils/logging.h" -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 +#if Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 static _z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t local_lease) { _z_zint_t ret = local_lease; @@ -60,31 +57,39 @@ static _z_zint_t _z_get_next_lease(_z_transport_peer_entry_list_t *peers) { return ret; } -int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { - int8_t ret = _Z_RES_OK; +int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { + _z_conduit_sn_list_t next_sn; + next_sn._is_qos = false; + next_sn._val._plain._best_effort = ztm->_sn_tx_best_effort; + next_sn._val._plain._reliable = ztm->_sn_tx_reliable; - _z_transport_message_t t_msg = _z_t_msg_make_keep_alive(); - ret = _z_multicast_send_t_msg(ztm, &t_msg); + _z_id_t zid = ((_z_session_t *)ztm->_session)->_local_zid; + _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); - return ret; + return ztm->_send_f(ztm, &jsm); +} + +int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { + _z_transport_message_t t_msg = _z_t_msg_make_keep_alive(); + return ztm->_send_f(ztm, &t_msg); } -int8_t _zp_multicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task) { // Init memory (void)memset(task, 0, sizeof(_z_task_t)); // Attach task - zt->_transport._multicast._lease_task = task; - zt->_transport._multicast._lease_task_running = true; + ztm->_lease_task = task; + ztm->_lease_task_running = true; // Init task - if (_z_task_init(task, attr, _zp_multicast_lease_task, &zt->_transport._multicast) != _Z_RES_OK) { - zt->_transport._multicast._lease_task_running = false; + if (_z_task_init(task, attr, _zp_multicast_lease_task, ztm) != _Z_RES_OK) { + ztm->_lease_task_running = false; return _Z_ERR_SYSTEM_TASK_FAILED; } return _Z_RES_OK; } -int8_t _zp_multicast_stop_lease_task(_z_transport_t *zt) { - zt->_transport._multicast._lease_task_running = false; +int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm) { + ztm->_lease_task_running = false; return _Z_RES_OK; } @@ -184,20 +189,25 @@ void *_zp_multicast_lease_task(void *ztm_arg) { return 0; } #else +int8_t _zp_multicast_send_join(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} + int8_t _zp_multicast_send_keep_alive(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } -int8_t _zp_multicast_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - _ZP_UNUSED(zt); +int8_t _zp_multicast_start_lease_task(_z_transport_multicast_t *ztm, _z_task_attr_t *attr, _z_task_t *task) { + _ZP_UNUSED(ztm); _ZP_UNUSED(attr); _ZP_UNUSED(task); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } -int8_t _zp_multicast_stop_lease_task(_z_transport_t *zt) { - _ZP_UNUSED(zt); +int8_t _zp_multicast_stop_lease_task(_z_transport_multicast_t *ztm) { + _ZP_UNUSED(ztm); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } @@ -205,4 +215,4 @@ void *_zp_multicast_lease_task(void *ztm_arg) { _ZP_UNUSED(ztm_arg); return NULL; } -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/multicast/rx.c b/src/transport/multicast/rx.c index 5d43eb60c..cb8fdeeb7 100644 --- a/src/transport/multicast/rx.c +++ b/src/transport/multicast/rx.c @@ -28,27 +28,8 @@ #include "zenoh-pico/utils/logging.h" #if Z_FEATURE_MULTICAST_TRANSPORT == 1 - -_z_transport_peer_entry_t *_z_find_peer_entry(_z_transport_peer_entry_list_t *l, _z_bytes_t *remote_addr) { - _z_transport_peer_entry_t *ret = NULL; - - _z_transport_peer_entry_list_t *xs = l; - for (; xs != NULL; xs = _z_transport_peer_entry_list_tail(xs)) { - _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(xs); - if (val->_remote_addr.len != remote_addr->len) { - continue; - } - - if (memcmp(val->_remote_addr.start, remote_addr->start, remote_addr->len) == 0) { - ret = val; - } - } - - return ret; -} - -/*------------------ Reception helper ------------------*/ -int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { +static int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, + _z_bytes_t *addr) { _Z_DEBUG(">> recv session msg\n"); int8_t ret = _Z_RES_OK; @@ -110,6 +91,34 @@ int8_t _z_multicast_recv_t_msg_na(_z_transport_multicast_t *ztm, _z_transport_me int8_t _z_multicast_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { return _z_multicast_recv_t_msg_na(ztm, t_msg, addr); } +#else +int8_t _z_multicast_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { + _ZP_UNUSED(ztm); + _ZP_UNUSED(t_msg); + _ZP_UNUSED(addr); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 + +#if Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 + +static _z_transport_peer_entry_t *_z_find_peer_entry(_z_transport_peer_entry_list_t *l, _z_bytes_t *remote_addr) { + _z_transport_peer_entry_t *ret = NULL; + + _z_transport_peer_entry_list_t *xs = l; + for (; xs != NULL; xs = _z_transport_peer_entry_list_tail(xs)) { + _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(xs); + if (val->_remote_addr.len != remote_addr->len) { + continue; + } + + if (memcmp(val->_remote_addr.start, remote_addr->start, remote_addr->len) == 0) { + ret = val; + } + } + + return ret; +} int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { @@ -324,13 +333,6 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t return ret; } #else -int8_t _z_multicast_recv_t_msg(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { - _ZP_UNUSED(ztm); - _ZP_UNUSED(t_msg); - _ZP_UNUSED(addr); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_transport_message_t *t_msg, _z_bytes_t *addr) { _ZP_UNUSED(ztm); @@ -338,4 +340,4 @@ int8_t _z_multicast_handle_transport_message(_z_transport_multicast_t *ztm, _z_t _ZP_UNUSED(addr); return _Z_ERR_TRANSPORT_NOT_AVAILABLE; } -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/multicast/transport.c b/src/transport/multicast/transport.c index 51286de6b..2e5796399 100644 --- a/src/transport/multicast/transport.c +++ b/src/transport/multicast/transport.c @@ -24,31 +24,45 @@ #include "zenoh-pico/transport/multicast.h" #include "zenoh-pico/transport/multicast/rx.h" #include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/raweth/tx.h" #include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/utils.h" #include "zenoh-pico/utils/logging.h" -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 +#if Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 int8_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param) { int8_t ret = _Z_RES_OK; - - zt->_type = _Z_TRANSPORT_MULTICAST_TYPE; - + // Transport specific information + _z_transport_multicast_t *ztm = NULL; + switch (zl->_cap._transport) { + case Z_LINK_CAP_TRANSPORT_MULTICAST: + zt->_type = _Z_TRANSPORT_MULTICAST_TYPE; + ztm = &zt->_transport._multicast; + ztm->_send_f = _z_multicast_send_t_msg; + break; + case Z_LINK_CAP_TRANSPORT_RAWETH: + zt->_type = _Z_TRANSPORT_RAWETH_TYPE; + ztm = &zt->_transport._raweth; + ztm->_send_f = _z_raweth_send_t_msg; + break; + default: + return _Z_ERR_GENERIC; + } #if Z_FEATURE_MULTI_THREAD == 1 // Initialize the mutexes - ret = _z_mutex_init(&zt->_transport._multicast._mutex_tx); + ret = _z_mutex_init(&ztm->_mutex_tx); if (ret == _Z_RES_OK) { - ret = _z_mutex_init(&zt->_transport._multicast._mutex_rx); + ret = _z_mutex_init(&ztm->_mutex_rx); if (ret == _Z_RES_OK) { - ret = _z_mutex_init(&zt->_transport._multicast._mutex_peer); + ret = _z_mutex_init(&ztm->_mutex_peer); if (ret != _Z_RES_OK) { - _z_mutex_free(&zt->_transport._multicast._mutex_tx); - _z_mutex_free(&zt->_transport._multicast._mutex_rx); + _z_mutex_free(&ztm->_mutex_tx); + _z_mutex_free(&ztm->_mutex_rx); } } else { - _z_mutex_free(&zt->_transport._multicast._mutex_tx); + _z_mutex_free(&ztm->_mutex_tx); } } #endif // Z_FEATURE_MULTI_THREAD == 1 @@ -56,53 +70,51 @@ int8_t _z_multicast_transport_create(_z_transport_t *zt, _z_link_t *zl, // Initialize the read and write buffers if (ret == _Z_RES_OK) { uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE; - zt->_transport._multicast._wbuf = _z_wbuf_make(mtu, false); - zt->_transport._multicast._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); + ztm->_wbuf = _z_wbuf_make(mtu, false); + ztm->_zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); // Clean up the buffers if one of them failed to be allocated - if ((_z_wbuf_capacity(&zt->_transport._multicast._wbuf) != mtu) || - (_z_zbuf_capacity(&zt->_transport._multicast._zbuf) != Z_BATCH_MULTICAST_SIZE)) { + if ((_z_wbuf_capacity(&ztm->_wbuf) != mtu) || (_z_zbuf_capacity(&ztm->_zbuf) != Z_BATCH_MULTICAST_SIZE)) { ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; #if Z_FEATURE_MULTI_THREAD == 1 - _z_mutex_free(&zt->_transport._multicast._mutex_tx); - _z_mutex_free(&zt->_transport._multicast._mutex_rx); - _z_mutex_free(&zt->_transport._multicast._mutex_peer); + _z_mutex_free(&ztm->_mutex_tx); + _z_mutex_free(&ztm->_mutex_rx); + _z_mutex_free(&ztm->_mutex_peer); #endif // Z_FEATURE_MULTI_THREAD == 1 - _z_wbuf_clear(&zt->_transport._multicast._wbuf); - _z_zbuf_clear(&zt->_transport._multicast._zbuf); + _z_wbuf_clear(&ztm->_wbuf); + _z_zbuf_clear(&ztm->_zbuf); } } if (ret == _Z_RES_OK) { // Set default SN resolution - zt->_transport._multicast._sn_res = _z_sn_max(param->_seq_num_res); + ztm->_sn_res = _z_sn_max(param->_seq_num_res); // The initial SN at TX side - zt->_transport._multicast._sn_tx_reliable = param->_initial_sn_tx._val._plain._reliable; - zt->_transport._multicast._sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort; + ztm->_sn_tx_reliable = param->_initial_sn_tx._val._plain._reliable; + ztm->_sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort; // Initialize peer list - zt->_transport._multicast._peers = _z_transport_peer_entry_list_new(); + ztm->_peers = _z_transport_peer_entry_list_new(); #if Z_FEATURE_MULTI_THREAD == 1 // Tasks - zt->_transport._multicast._read_task_running = false; - zt->_transport._multicast._read_task = NULL; - zt->_transport._multicast._lease_task_running = false; - zt->_transport._multicast._lease_task = NULL; + ztm->_read_task_running = false; + ztm->_read_task = NULL; + ztm->_lease_task_running = false; + ztm->_lease_task = NULL; #endif // Z_FEATURE_MULTI_THREAD == 1 - zt->_transport._multicast._lease = Z_TRANSPORT_LEASE; + ztm->_lease = Z_TRANSPORT_LEASE; // Notifiers - zt->_transport._multicast._transmitted = false; + ztm->_transmitted = false; // Transport link for multicast - zt->_transport._multicast._link = *zl; + ztm->_link = *zl; } - return ret; } @@ -124,14 +136,22 @@ int8_t _z_multicast_open_peer(_z_transport_multicast_establish_param_t *param, c // Encode and send the message _Z_INFO("Sending Z_JOIN message\n"); - ret = _z_link_send_t_msg(zl, &jsm); + switch (zl->_cap._transport) { + case Z_LINK_CAP_TRANSPORT_MULTICAST: + ret = _z_link_send_t_msg(zl, &jsm); + break; + case Z_LINK_CAP_TRANSPORT_RAWETH: + ret = _z_raweth_link_send_t_msg(zl, &jsm); + break; + default: + return _Z_ERR_GENERIC; + } _z_t_msg_clear(&jsm); if (ret == _Z_RES_OK) { param->_seq_num_res = jsm._body._join._seq_num_res; param->_initial_sn_tx = next_sn; } - return ret; } @@ -149,7 +169,7 @@ int8_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _B int8_t ret = _Z_RES_OK; // Send and clear message _z_transport_message_t cm = _z_t_msg_make_close(reason, link_only); - ret = _z_multicast_send_t_msg(ztm, &cm); + ret = ztm->_send_f(ztm, &cm); _z_t_msg_clear(&cm); return ret; } @@ -170,7 +190,6 @@ void _z_multicast_transport_clear(_z_transport_t *zt) { _z_task_join(ztm->_lease_task); _z_task_free(&ztm->_lease_task); } - // Clean up the mutexes _z_mutex_free(&ztm->_mutex_tx); _z_mutex_free(&ztm->_mutex_rx); @@ -227,4 +246,4 @@ int8_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reaso } void _z_multicast_transport_clear(_z_transport_t *zt) { _ZP_UNUSED(zt); } -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/join.c b/src/transport/raweth/join.c deleted file mode 100644 index 7ac9e7506..000000000 --- a/src/transport/raweth/join.c +++ /dev/null @@ -1,38 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#include "zenoh-pico/transport/raweth/join.h" - -#include "zenoh-pico/session/utils.h" -#include "zenoh-pico/transport/raweth/tx.h" - -#if Z_FEATURE_RAWETH_TRANSPORT == 1 - -int8_t _zp_raweth_send_join(_z_transport_multicast_t *ztm) { - _z_conduit_sn_list_t next_sn; - next_sn._is_qos = false; - next_sn._val._plain._best_effort = ztm->_sn_tx_best_effort; - next_sn._val._plain._reliable = ztm->_sn_tx_reliable; - - _z_id_t zid = ((_z_session_t *)ztm->_session)->_local_zid; - _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); - - return _z_raweth_send_t_msg(ztm, &jsm); -} -#else -int8_t _zp_raweth_send_join(_z_transport_multicast_t *ztm) { - _ZP_UNUSED(ztm); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} -#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/lease.c b/src/transport/raweth/lease.c deleted file mode 100644 index 10fa08a8f..000000000 --- a/src/transport/raweth/lease.c +++ /dev/null @@ -1,208 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// - -#include "zenoh-pico/transport/raweth/lease.h" - -#include - -#include "zenoh-pico/config.h" -#include "zenoh-pico/session/utils.h" -#include "zenoh-pico/transport/common/join.h" -#include "zenoh-pico/transport/raweth/join.h" -#include "zenoh-pico/transport/raweth/transport.h" -#include "zenoh-pico/transport/raweth/tx.h" -#include "zenoh-pico/utils/logging.h" - -#if Z_FEATURE_RAWETH_TRANSPORT == 1 - -static _z_zint_t _z_get_minimum_lease(_z_transport_peer_entry_list_t *peers, _z_zint_t local_lease) { - _z_zint_t ret = local_lease; - - _z_transport_peer_entry_list_t *it = peers; - while (it != NULL) { - _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(it); - _z_zint_t lease = val->_lease; - if (lease < ret) { - ret = lease; - } - - it = _z_transport_peer_entry_list_tail(it); - } - - return ret; -} - -static _z_zint_t _z_get_next_lease(_z_transport_peer_entry_list_t *peers) { - _z_zint_t ret = SIZE_MAX; - - _z_transport_peer_entry_list_t *it = peers; - while (it != NULL) { - _z_transport_peer_entry_t *val = _z_transport_peer_entry_list_head(it); - _z_zint_t next_lease = val->_next_lease; - if (next_lease < ret) { - ret = next_lease; - } - - it = _z_transport_peer_entry_list_tail(it); - } - - return ret; -} - -int8_t _zp_raweth_send_keep_alive(_z_transport_multicast_t *ztm) { - int8_t ret = _Z_RES_OK; - - _z_transport_message_t t_msg = _z_t_msg_make_keep_alive(); - ret = _z_raweth_send_t_msg(ztm, &t_msg); - - return ret; -} - -int8_t _zp_raweth_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - // Init memory - (void)memset(task, 0, sizeof(_z_task_t)); - // Attach task - zt->_transport._raweth._lease_task = task; - zt->_transport._raweth._lease_task_running = true; - // Init task - if (_z_task_init(task, attr, _zp_raweth_lease_task, &zt->_transport._raweth) != _Z_RES_OK) { - zt->_transport._raweth._lease_task_running = false; - return _Z_ERR_SYSTEM_TASK_FAILED; - } - return _Z_RES_OK; -} - -int8_t _zp_raweth_stop_lease_task(_z_transport_t *zt) { - zt->_transport._raweth._lease_task_running = false; - return _Z_RES_OK; -} - -void *_zp_raweth_lease_task(void *ztm_arg) { -#if Z_FEATURE_MULTI_THREAD == 1 - _z_transport_multicast_t *ztm = (_z_transport_multicast_t *)ztm_arg; - ztm->_transmitted = false; - - // From all peers, get the next lease time (minimum) - _z_zint_t next_lease = _z_get_minimum_lease(ztm->_peers, ztm->_lease); - _z_zint_t next_keep_alive = (_z_zint_t)(next_lease / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); - _z_zint_t next_join = Z_JOIN_INTERVAL; - - _z_transport_peer_entry_list_t *it = NULL; - while (ztm->_lease_task_running == true) { - _z_mutex_lock(&ztm->_mutex_peer); - - if (next_lease <= 0) { - it = ztm->_peers; - while (it != NULL) { - _z_transport_peer_entry_t *entry = _z_transport_peer_entry_list_head(it); - if (entry->_received == true) { - // Reset the lease parameters - entry->_received = false; - entry->_next_lease = entry->_lease; - it = _z_transport_peer_entry_list_tail(it); - } else { - _Z_INFO("Remove peer from know list because it has expired after %zums\n", entry->_lease); - ztm->_peers = - _z_transport_peer_entry_list_drop_filter(ztm->_peers, _z_transport_peer_entry_eq, entry); - it = ztm->_peers; - } - } - } - - if (next_join <= 0) { - _zp_raweth_send_join(ztm); - ztm->_transmitted = true; - - // Reset the join parameters - next_join = Z_JOIN_INTERVAL; - } - - if (next_keep_alive <= 0) { - // Check if need to send a keep alive - if (ztm->_transmitted == false) { - if (_zp_raweth_send_keep_alive(ztm) < 0) { - // TODO: Handle retransmission or error - } - } - - // Reset the keep alive parameters - ztm->_transmitted = false; - next_keep_alive = - (_z_zint_t)(_z_get_minimum_lease(ztm->_peers, ztm->_lease) / Z_TRANSPORT_LEASE_EXPIRE_FACTOR); - } - - // Compute the target interval to sleep - _z_zint_t interval; - if (next_lease > 0) { - interval = next_lease; - if (next_keep_alive < interval) { - interval = next_keep_alive; - } - if (next_join < interval) { - interval = next_join; - } - } else { - interval = next_keep_alive; - if (next_join < interval) { - interval = next_join; - } - } - - _z_mutex_unlock(&ztm->_mutex_peer); - - // The keep alive and lease intervals are expressed in milliseconds - z_sleep_ms(interval); - - // Decrement all intervals - _z_mutex_lock(&ztm->_mutex_peer); - - it = ztm->_peers; - while (it != NULL) { - _z_transport_peer_entry_t *entry = _z_transport_peer_entry_list_head(it); - entry->_next_lease = entry->_next_lease - interval; - it = _z_transport_peer_entry_list_tail(it); - } - next_lease = _z_get_next_lease(ztm->_peers); - next_keep_alive = next_keep_alive - interval; - next_join = next_join - interval; - - _z_mutex_unlock(&ztm->_mutex_peer); - } -#endif // Z_FEATURE_MULTI_THREAD == 1 - - return 0; -} -#else -int8_t _zp_raweth_send_keep_alive(_z_transport_multicast_t *ztm) { - _ZP_UNUSED(ztm); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -int8_t _zp_raweth_start_lease_task(_z_transport_t *zt, _z_task_attr_t *attr, _z_task_t *task) { - _ZP_UNUSED(zt); - _ZP_UNUSED(attr); - _ZP_UNUSED(task); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -int8_t _zp_raweth_stop_lease_task(_z_transport_t *zt) { - _ZP_UNUSED(zt); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -void *_zp_raweth_lease_task(void *ztm_arg) { - _ZP_UNUSED(ztm_arg); - return NULL; -} -#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/transport.c b/src/transport/raweth/transport.c deleted file mode 100644 index 7dced540d..000000000 --- a/src/transport/raweth/transport.c +++ /dev/null @@ -1,227 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, - -#include "zenoh-pico/transport/raweth/transport.h" - -#include -#include -#include -#include -#include - -#include "zenoh-pico/link/link.h" -#include "zenoh-pico/transport/common/lease.h" -#include "zenoh-pico/transport/common/read.h" -#include "zenoh-pico/transport/common/tx.h" -#include "zenoh-pico/transport/raweth/rx.h" -#include "zenoh-pico/transport/raweth/tx.h" -#include "zenoh-pico/transport/utils.h" -#include "zenoh-pico/utils/logging.h" - -#if Z_FEATURE_RAWETH_TRANSPORT == 1 - -int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param) { - int8_t ret = _Z_RES_OK; - - zt->_type = _Z_TRANSPORT_RAWETH_TYPE; - -#if Z_FEATURE_MULTI_THREAD == 1 - // Initialize the mutexes - ret = _z_mutex_init(&zt->_transport._raweth._mutex_tx); - if (ret == _Z_RES_OK) { - ret = _z_mutex_init(&zt->_transport._raweth._mutex_rx); - if (ret == _Z_RES_OK) { - ret = _z_mutex_init(&zt->_transport._raweth._mutex_peer); - if (ret != _Z_RES_OK) { - _z_mutex_free(&zt->_transport._raweth._mutex_tx); - _z_mutex_free(&zt->_transport._raweth._mutex_rx); - } - } else { - _z_mutex_free(&zt->_transport._raweth._mutex_tx); - } - } -#endif // Z_FEATURE_MULTI_THREAD == 1 - - // Initialize the read and write buffers - if (ret == _Z_RES_OK) { - uint16_t mtu = (zl->_mtu < Z_BATCH_MULTICAST_SIZE) ? zl->_mtu : Z_BATCH_MULTICAST_SIZE; - zt->_transport._raweth._wbuf = _z_wbuf_make(mtu, false); - zt->_transport._raweth._zbuf = _z_zbuf_make(Z_BATCH_MULTICAST_SIZE); - - // Clean up the buffers if one of them failed to be allocated - if ((_z_wbuf_capacity(&zt->_transport._raweth._wbuf) != mtu) || - (_z_zbuf_capacity(&zt->_transport._raweth._zbuf) != Z_BATCH_MULTICAST_SIZE)) { - ret = _Z_ERR_SYSTEM_OUT_OF_MEMORY; - -#if Z_FEATURE_MULTI_THREAD == 1 - _z_mutex_free(&zt->_transport._raweth._mutex_tx); - _z_mutex_free(&zt->_transport._raweth._mutex_rx); - _z_mutex_free(&zt->_transport._raweth._mutex_peer); -#endif // Z_FEATURE_MULTI_THREAD == 1 - - _z_wbuf_clear(&zt->_transport._raweth._wbuf); - _z_zbuf_clear(&zt->_transport._raweth._zbuf); - } - } - - if (ret == _Z_RES_OK) { - // Set default SN resolution - zt->_transport._raweth._sn_res = _z_sn_max(param->_seq_num_res); - - // The initial SN at TX side - zt->_transport._raweth._sn_tx_reliable = param->_initial_sn_tx._val._plain._reliable; - zt->_transport._raweth._sn_tx_best_effort = param->_initial_sn_tx._val._plain._best_effort; - - // Initialize peer list - zt->_transport._raweth._peers = _z_transport_peer_entry_list_new(); - -#if Z_FEATURE_MULTI_THREAD == 1 - // Tasks - zt->_transport._raweth._read_task_running = false; - zt->_transport._raweth._read_task = NULL; - zt->_transport._raweth._lease_task_running = false; - zt->_transport._raweth._lease_task = NULL; -#endif // Z_FEATURE_MULTI_THREAD == 1 - - zt->_transport._raweth._lease = Z_TRANSPORT_LEASE; - - // Notifiers - zt->_transport._raweth._transmitted = false; - - // Transport link for raweth - zt->_transport._raweth._link = *zl; - } - - return ret; -} - -int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid) { - int8_t ret = _Z_RES_OK; - - _z_zint_t initial_sn_tx = 0; - z_random_fill(&initial_sn_tx, sizeof(initial_sn_tx)); - initial_sn_tx = initial_sn_tx & !_z_sn_modulo_mask(Z_SN_RESOLUTION); - - _z_conduit_sn_list_t next_sn; - next_sn._is_qos = false; - next_sn._val._plain._best_effort = initial_sn_tx; - next_sn._val._plain._reliable = initial_sn_tx; - - _z_id_t zid = *local_zid; - _z_transport_message_t jsm = _z_t_msg_make_join(Z_WHATAMI_PEER, Z_TRANSPORT_LEASE, zid, next_sn); - - // Encode and send the message - _Z_INFO("Sending Z_JOIN message\n"); - ret = _z_raweth_link_send_t_msg(zl, &jsm); - _z_t_msg_clear(&jsm); - - if (ret == _Z_RES_OK) { - param->_seq_num_res = jsm._body._join._seq_num_res; - param->_initial_sn_tx = next_sn; - } - - return ret; -} - -int8_t _z_raweth_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid) { - _ZP_UNUSED(param); - _ZP_UNUSED(zl); - _ZP_UNUSED(local_zid); - int8_t ret = _Z_ERR_CONFIG_UNSUPPORTED_CLIENT_MULTICAST; - // @TODO: not implemented - return ret; -} - -int8_t _z_raweth_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only) { - int8_t ret = _Z_RES_OK; - // Send and clear message - _z_transport_message_t cm = _z_t_msg_make_close(reason, link_only); - ret = _z_raweth_send_t_msg(ztm, &cm); - _z_t_msg_clear(&cm); - return ret; -} - -int8_t _z_raweth_transport_close(_z_transport_multicast_t *ztm, uint8_t reason) { - return _z_raweth_send_close(ztm, reason, false); -} - -void _z_raweth_transport_clear(_z_transport_t *zt) { - _z_transport_multicast_t *ztm = &zt->_transport._raweth; -#if Z_FEATURE_MULTI_THREAD == 1 - // Clean up tasks - if (ztm->_read_task != NULL) { - _z_task_join(ztm->_read_task); - _z_task_free(&ztm->_read_task); - } - if (ztm->_lease_task != NULL) { - _z_task_join(ztm->_lease_task); - _z_task_free(&ztm->_lease_task); - } - // Clean up the mutexes - _z_mutex_free(&ztm->_mutex_tx); - _z_mutex_free(&ztm->_mutex_rx); - _z_mutex_free(&ztm->_mutex_peer); -#endif // Z_FEATURE_MULTI_THREAD == 1 - - // Clean up the buffers - _z_wbuf_clear(&ztm->_wbuf); - _z_zbuf_clear(&ztm->_zbuf); - - // Clean up peer list - _z_transport_peer_entry_list_free(&ztm->_peers); - _z_link_clear(&ztm->_link); -} - -#else - -int8_t _z_raweth_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transport_multicast_establish_param_t *param) { - _ZP_UNUSED(zt); - _ZP_UNUSED(zl); - _ZP_UNUSED(param); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -int8_t _z_raweth_open_peer(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid) { - _ZP_UNUSED(param); - _ZP_UNUSED(zl); - _ZP_UNUSED(local_zid); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -int8_t _z_raweth_open_client(_z_transport_multicast_establish_param_t *param, const _z_link_t *zl, - const _z_id_t *local_zid) { - _ZP_UNUSED(param); - _ZP_UNUSED(zl); - _ZP_UNUSED(local_zid); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -int8_t _z_raweth_send_close(_z_transport_multicast_t *ztm, uint8_t reason, _Bool link_only) { - _ZP_UNUSED(ztm); - _ZP_UNUSED(reason); - _ZP_UNUSED(link_only); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; -} - -int8_t _z_raweth_transport_close(_z_transport_multicast_t *ztm, uint8_t reason) { - _ZP_UNUSED(ztm); - _ZP_UNUSED(reason); - return _Z_ERR_TRANSPORT_NOT_AVAILABLE; - ; -} - -void _z_raweth_transport_clear(_z_transport_t *zt) { _ZP_UNUSED(zt); } -#endif // Z_FEATURE_RAWETH_TRANSPORT == 1 diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index bba3153aa..e48f7640b 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -278,6 +278,8 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, // Mark the session that we have transmitted data ztm->_transmitted = true; } + // Clear the expandable buffer + _z_wbuf_clear(&fbf); } #if Z_FEATURE_MULTI_THREAD == 1 _z_mutex_unlock(&ztm->_mutex_tx); @@ -286,6 +288,11 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg, } #else +int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message_t *t_msg) { + _ZP_UNUSED(zl); + _ZP_UNUSED(t_msg); + return _Z_ERR_TRANSPORT_NOT_AVAILABLE; +} int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg) { _ZP_UNUSED(ztm); _ZP_UNUSED(t_msg); diff --git a/src/transport/transport.c b/src/transport/transport.c index 27170233c..a53b7e364 100644 --- a/src/transport/transport.c +++ b/src/transport/transport.c @@ -24,6 +24,8 @@ #include "zenoh-pico/transport/multicast/rx.h" #include "zenoh-pico/transport/multicast/transport.h" #include "zenoh-pico/transport/multicast/tx.h" +#include "zenoh-pico/transport/raweth/rx.h" +#include "zenoh-pico/transport/raweth/tx.h" #include "zenoh-pico/transport/unicast/rx.h" #include "zenoh-pico/transport/unicast/transport.h" #include "zenoh-pico/transport/unicast/tx.h" @@ -38,6 +40,7 @@ int8_t _z_send_close(_z_transport_t *zt, uint8_t reason, _Bool link_only) { ret = _z_unicast_send_close(&zt->_transport._unicast, reason, link_only); break; case _Z_TRANSPORT_MULTICAST_TYPE: + case _Z_TRANSPORT_RAWETH_TYPE: ret = _z_multicast_send_close(&zt->_transport._multicast, reason, link_only); break; default: @@ -55,6 +58,7 @@ void _z_transport_clear(_z_transport_t *zt) { _z_unicast_transport_clear(zt); break; case _Z_TRANSPORT_MULTICAST_TYPE: + case _Z_TRANSPORT_RAWETH_TYPE: _z_multicast_transport_clear(zt); break; default: diff --git a/tests/raweth.py b/tests/raweth.py new file mode 100644 index 000000000..8f14e3b78 --- /dev/null +++ b/tests/raweth.py @@ -0,0 +1,142 @@ +import argparse +import subprocess +import sys +import time + +# Specify the directory for the binaries +DIR_EXAMPLES = "build/examples" + +def pub_and_sub(args): + print("*** Pub & sub test ***") + test_status = 0 + + # Expected z_pub output & status + if args.reth == 1: + z_pub_expected_status = 0 + z_pub_expected_output = '''Opening session... +Declaring publisher for 'demo/example/zenoh-pico-pub'... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')... +Putting Data ('demo/example/zenoh-pico-pub': 'Pub from Pico!')...''' + else : + z_pub_expected_status = 255 + z_pub_expected_output = '''Opening session... +Unable to open session!''' + + # Expected z_sub output & status + if args.reth == 1: + z_sub_expected_status = 0 + z_sub_expected_output = '''Opening session... +Declaring Subscriber on 'demo/example/**'... +Enter 'q' to quit... +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!') +>> [Subscriber] Received ('demo/example/zenoh-pico-pub': 'Pub from Pico!')''' + else : + z_sub_expected_status = 255 + z_sub_expected_output = '''Opening session... +Unable to open session!''' + + print("Start subscriber") + # Start z_sub in the background + z_sub_command = f"sudo ./{DIR_EXAMPLES}/z_sub -m \"peer\" -l \"reth/0\"s" + z_sub_process = subprocess.Popen(z_sub_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, text=True) + + # Introduce a delay to ensure z_sub starts + time.sleep(2) + + print("Start publisher") + # Start z_pub + z_pub_command = f"sudo ./{DIR_EXAMPLES}/z_pub -m \"peer\" -l \"reth/0\"s" + z_pub_process = subprocess.Popen(z_pub_command, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True) + + # Wait for z_pub to finish + z_pub_process.wait() + + print("Stop subscriber") + if z_sub_process.poll() is None: + # Send "q" command to z_sub to stop it + z_sub_process.stdin.write("q\n") + z_sub_process.stdin.flush() + + # Wait for z_sub to finish + z_sub_process.wait() + + print("Check publisher status & output") + # Check the exit status of z_pub + z_pub_status = z_pub_process.returncode + if z_pub_status == z_pub_expected_status: + print("z_pub status valid") + else: + print(f"z_pub status invalid, expected: {z_pub_expected_status}, received: {z_pub_status}") + test_status = 1 + + # Check output of z_pub + z_pub_output = z_pub_process.stdout.read() + if z_pub_expected_output in z_pub_output: + print("z_pub output valid") + else: + print("z_pub output invalid:") + print(f"Expected: \"{z_pub_expected_output}\"") + print(f"Received: \"{z_pub_output}\"") + test_status = 1 + + print("Check subscriber status & output") + # Check the exit status of z_sub + z_sub_status = z_sub_process.returncode + if z_sub_status == z_sub_expected_status: + print("z_sub status valid") + else: + print(f"z_sub status invalid, expected: {z_sub_expected_status}, received: {z_sub_status}") + test_status = 1 + + # Check output of z_sub + z_sub_output = z_sub_process.stdout.read() + if z_sub_expected_output in z_sub_output: + print("z_sub output valid") + else: + print("z_sub output invalid:") + print(f"Expected: \"{z_sub_expected_output}\"") + print(f"Received: \"{z_sub_output}\"") + test_status = 1 + # Return value + return test_status + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="This script runs zenoh-pico examples" + " and checks them according to the given configuration") + parser.add_argument("--reth", type=int, choices=[0, 1], + help="Z_FEATURE_RAWETH_TRANSPORT (0 or 1)") + + EXIT_STATUS = 0 + prog_args = parser.parse_args() + print(f"Args value, reth:{prog_args.reth}") + + # Test pub and sub examples + if pub_and_sub(prog_args) == 1: + EXIT_STATUS = 1 + # Exit + sys.exit(EXIT_STATUS)