Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fragmentation #297

Merged
merged 21 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions .github/workflows/build-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
uses: actions/checkout@v4

- name: Run docker image
run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:master
run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:latest

- name: Build project
run: |
Expand Down Expand Up @@ -86,12 +86,35 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4

- name: Build project
- name: Build project and run test
run: |
sudo apt install -y ninja-build
CMAKE_GENERATOR=Ninja make
python3 ./build/tests/raweth.py --reth $Z_FEATURE_RAWETH_TRANSPORT
timeout-minutes: 5
env:
Z_FEATURE_RAWETH_TRANSPORT: ${{ matrix.feature_reth }}


fragment_test:
name: Test multicast and unicast fragmentation
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Run docker image
run: docker run --name zenoh_router --init --net host -d eclipse/zenoh:latest

- name: Build project and run test
run: |
sudo apt install -y ninja-build
cmake -DBATCH_UNICAST_SIZE=4096 -B build/ -G Ninja
CMAKE_GENERATOR=Ninja make
python3 ./build/tests/fragment.py
timeout-minutes: 5

- name: Stop docker image
if: always()
run: |
docker stop zenoh_router
docker rm zenoh_router
16 changes: 16 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ option(WITH_ZEPHYR "Build for Zephyr RTOS" OFF)
option(WITH_FREERTOS_PLUS_TCP "Build for FreeRTOS RTOS and FreeRTOS-Plus-TCP network stack" OFF)
set(ZENOH_DEBUG 0 CACHE STRING "Use this to set the ZENOH_DEBUG variable")
set(FRAG_MAX_SIZE 0 CACHE STRING "Use this to override the maximum size for fragmented messages")
set(BATCH_UNICAST_SIZE 0 CACHE STRING "Use this to override the maximum unicast batch size")
set(CMAKE_EXPORT_COMPILE_COMMANDS ON CACHE INTERNAL "")
if(CMAKE_EXPORT_COMPILE_COMMANDS)
set(CMAKE_CXX_STANDARD_INCLUDE_DIRECTORIES
Expand Down Expand Up @@ -106,6 +107,9 @@ add_definition(ZENOH_DEBUG=${ZENOH_DEBUG})
if(FRAG_MAX_SIZE)
add_definition(Z_FRAG_MAX_SIZE=${FRAG_MAX_SIZE})
endif()
if (BATCH_UNICAST_SIZE)
add_definition(Z_BATCH_UNICAST_SIZE=${BATCH_UNICAST_SIZE})
endif()

# Zenoh pico feature configuration options
set(Z_FEATURE_PUBLICATION 1 CACHE STRING "Toggle publication feature")
Expand All @@ -132,6 +136,9 @@ message(STATUS "Zenoh Level Log: ${ZENOH_DEBUG}")
if(FRAG_MAX_SIZE)
message(STATUS "Fragmented message max size: ${FRAG_MAX_SIZE}")
endif()
if(BATCH_UNICAST_SIZE)
message(STATUS "Unicast batch max size: ${BATCH_UNICAST_SIZE}")
endif()
message(STATUS "Build for Zephyr RTOS: ${WITH_ZEPHYR}")
message(STATUS "Build for FreeRTOS-Plus-TCP: ${WITH_FREERTOS_PLUS_TCP}")
message(STATUS "Configuring for ${CMAKE_SYSTEM_NAME}")
Expand Down Expand Up @@ -301,6 +308,10 @@ if(UNIX OR MSVC)
add_executable(z_keyexpr_test ${PROJECT_SOURCE_DIR}/tests/z_keyexpr_test.c)
add_executable(z_api_null_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_null_drop_test.c)
add_executable(z_api_double_drop_test ${PROJECT_SOURCE_DIR}/tests/z_api_double_drop_test.c)
add_executable(z_test_fragment_tx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_tx.c)
add_executable(z_test_fragment_rx ${PROJECT_SOURCE_DIR}/tests/z_test_fragment_rx.c)
add_executable(z_perf_tx ${PROJECT_SOURCE_DIR}/tests/z_perf_tx.c)
add_executable(z_perf_rx ${PROJECT_SOURCE_DIR}/tests/z_perf_rx.c)

target_link_libraries(z_data_struct_test ${Libname})
target_link_libraries(z_endpoint_test ${Libname})
Expand All @@ -309,9 +320,14 @@ if(UNIX OR MSVC)
target_link_libraries(z_keyexpr_test ${Libname})
target_link_libraries(z_api_null_drop_test ${Libname})
target_link_libraries(z_api_double_drop_test ${Libname})
target_link_libraries(z_test_fragment_tx ${Libname})
target_link_libraries(z_test_fragment_rx ${Libname})
target_link_libraries(z_perf_tx ${Libname})
target_link_libraries(z_perf_rx ${Libname})

configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/fragment.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/fragment.py COPYONLY)

enable_testing()
add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test)
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#define _Z_FLAG_Z_T 0x20 // 1 << 5 | QueryTarget if T==1 then the query target is present
#define _Z_FLAG_Z_X 0x00 // Unused flags are set to zero

#define _Z_FRAG_BUFF_BASE_SIZE 128 // Arbitrary base size of the buffer to encode a fragment message header

// Flags:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 128bytes? If there is a reason for it, please document it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an arbitrary value we decided with @Mallets, that seemed not to big and not to small. The buffer being expandable if needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fragmentation, the zbuf is used to copy the header bytes and to append the pointer to the payload so as to have a linear reading of the bytes.
128 is a rules of thumb heuristic about the size of memory you need to serialize the header. If more is needed, another block can be allocated straight after and so on an so forth.

// - T: Timestamp If T==1 then the timestamp if present
// - E: Encoding If E==1 then the encoding is present
Expand Down
3 changes: 1 addition & 2 deletions src/protocol/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,7 @@ int8_t _z_bytes_val_encode(_z_wbuf_t *wbf, const _z_bytes_t *bs) {
int8_t ret = _Z_RES_OK;

if ((wbf->_expansion_step != 0) && (bs->len > Z_TSID_LENGTH)) {
// ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len);
ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len);
ret |= _z_wbuf_wrap_bytes(wbf, bs->start, 0, bs->len);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this line is being changed frequently. Last time it was from wrap to write in 5238f25 .

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yah that's one of the whole point of this PR, reverting this change from @p-avital that outlived its usefulness.

} else {
ret |= _z_wbuf_write_bytes(wbf, bs->start, 0, bs->len);
}
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/codec/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
int8_t _z_decl_ext_keyexpr_encode(_z_wbuf_t *wbf, _z_keyexpr_t ke, _Bool has_next_ext) {
uint8_t header = _Z_MSG_EXT_ENC_ZBUF | _Z_MSG_EXT_FLAG_M | 0x0f | (has_next_ext ? _Z_FLAG_Z_Z : 0);
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
uint32_t kelen = _z_keyexpr_has_suffix(ke) ? strlen(ke._suffix) : 0;
uint32_t kelen = (uint32_t)(_z_keyexpr_has_suffix(ke) ? strlen(ke._suffix) : 0);
header = (_z_keyexpr_is_local(&ke) ? 2 : 0) | (kelen != 0 ? 1 : 0);
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, 1 + kelen + _z_zint_len(ke._id)));
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/codec/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ int8_t _z_response_decode_extension(_z_msg_ext_t *extension, void *ctx) {
_z_n_msg_response_t *msg = (_z_n_msg_response_t *)ctx;
switch (_Z_EXT_FULL_ID(extension->_header)) {
case _Z_MSG_EXT_ENC_ZINT | 0x01: {
msg->_ext_qos._val = extension->_body._zint._val;
msg->_ext_qos._val = (uint8_t)extension->_body._zint._val;
break;
}
case _Z_MSG_EXT_ENC_ZBUF | 0x02: {
Expand Down Expand Up @@ -450,7 +450,7 @@ int8_t _z_declare_decode_extensions(_z_msg_ext_t *extension, void *ctx) {
_z_n_msg_declare_t *decl = (_z_n_msg_declare_t *)ctx;
switch (_Z_EXT_FULL_ID(extension->_header)) {
case _Z_MSG_EXT_ENC_ZINT | 0x01: {
decl->_ext_qos._val = extension->_body._zint._val;
decl->_ext_qos._val = (uint8_t)extension->_body._zint._val;
break;
}
case _Z_MSG_EXT_ENC_ZBUF | 0x02: {
Expand Down
2 changes: 1 addition & 1 deletion src/session/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint
case _Z_REQUEST_QUERY: {
#if Z_FEATURE_QUERYABLE == 1
_z_msg_query_t *query = &req._body._query;
ret = _z_trigger_queryables(zn, query, req._key, req._rid);
ret = _z_trigger_queryables(zn, query, req._key, (uint32_t)req._rid);
#else
_Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported\n");
#endif
Expand Down
2 changes: 1 addition & 1 deletion src/transport/multicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m
} else {
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);

ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
Expand Down
2 changes: 1 addition & 1 deletion src/transport/raweth/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
ztm->_transmitted = true;
} else { // The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);
// Encode the message on the expandable wbuf
_Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Fragment and send the message
Expand Down
16 changes: 12 additions & 4 deletions src/transport/unicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,21 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo
if (ret == _Z_RES_OK) {
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
size_t dbuf_size = 0;
size_t wbuf_size = 0;
size_t zbuf_size = 0;
_Bool expandable = false;

switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_STREAM:
// Add stream length field to buffer size
wbuf_size = mtu + _Z_MSG_LEN_ENC_SIZE;
zbuf_size = Z_BATCH_UNICAST_SIZE + _Z_MSG_LEN_ENC_SIZE;
expandable = true;
break;
case Z_LINK_CAP_FLOW_DATAGRAM:
default:
wbuf_size = mtu;
zbuf_size = Z_BATCH_UNICAST_SIZE;
expandable = false;
break;
}
Expand All @@ -67,16 +74,17 @@ int8_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl, _z_transpo
expandable = false;
dbuf_size = Z_FRAG_MAX_SIZE;
#endif
zt->_transport._unicast._wbuf = _z_wbuf_make(mtu, false);
zt->_transport._unicast._zbuf = _z_zbuf_make(Z_BATCH_UNICAST_SIZE);
// Initialize tx rx buffers
zt->_transport._unicast._wbuf = _z_wbuf_make(wbuf_size, false);
zt->_transport._unicast._zbuf = _z_zbuf_make(zbuf_size);

// Initialize the defragmentation buffers
zt->_transport._unicast._dbuf_reliable = _z_wbuf_make(dbuf_size, expandable);
zt->_transport._unicast._dbuf_best_effort = _z_wbuf_make(dbuf_size, expandable);

// Clean up the buffers if one of them failed to be allocated
if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != mtu) ||
(_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != Z_BATCH_UNICAST_SIZE) ||
if ((_z_wbuf_capacity(&zt->_transport._unicast._wbuf) != wbuf_size) ||
(_z_zbuf_capacity(&zt->_transport._unicast._zbuf) != zbuf_size) ||
#if Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION == 0
(_z_wbuf_capacity(&zt->_transport._unicast._dbuf_reliable) != dbuf_size) ||
(_z_wbuf_capacity(&zt->_transport._unicast._dbuf_best_effort) != dbuf_size)) {
Expand Down
2 changes: 1 addition & 1 deletion src/transport/unicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg
} else {
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
_z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true);

ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
Expand Down
132 changes: 132 additions & 0 deletions tests/fragment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import subprocess
import sys
import time

# Specify the directory for the binaries
DIR_TESTS = "build/tests"

def check_output(tx_status, tx_output, rx_status, rx_output):
test_status = 0

# Expected tx output & status
z_tx_expected_status = 0
z_tx_expected_output = "[tx]: Sending packet on test/zenoh-pico-fragment, len: 10000"
# Expected rx output & status
z_rx_expected_status = 0
z_rx_expected_output = (
"[rx]: Received packet on test/zenoh-pico-fragment, len: 10000, validity: 1")

# Check the exit status of tx
if tx_status == z_tx_expected_status:
print("z_tx status valid")
else:
print(f"z_tx status invalid, expected: {z_tx_expected_status}, received: {tx_status}")
test_status = 1

# Check output of tx
if z_tx_expected_output in tx_output:
print("z_tx output valid")
else:
print("z_tx output invalid:")
print(f"Expected: \"{z_tx_expected_output}\"")
print(f"Received: \"{tx_output}\"")
test_status = 1

# Check the exit status of z_rx
if rx_status == z_rx_expected_status:
print("z_rx status valid")
else:
print(f"z_rx status invalid, expected: {z_rx_expected_status}, received: {rx_status}")
test_status = 1

# Check output of z_rx
if z_rx_expected_output in rx_output:
print("z_rx output valid")
else:
print("z_rx output invalid:")
print(f"Expected: \"{z_rx_expected_output}\"")
print(f"Received: \"{rx_output}\"")
test_status = 1
# Return value
return test_status

def test_client():
# Start rx in the background
print("Start rx client")
z_rx_command = f"./{DIR_TESTS}/z_test_fragment_rx"
z_rx_process = subprocess.Popen(z_rx_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
# Introduce a delay to ensure rx starts
time.sleep(2)
# Start tx
print("Start tx client")
z_tx_command = f"./{DIR_TESTS}/z_test_fragment_tx"
z_tx_process = subprocess.Popen(z_tx_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
# Wait for tx to finish
z_tx_process.wait()
# Wait for rx to receive
time.sleep(1)
print("Stop rx")
if z_rx_process.poll() is None:
# Send "q" command to rx to stop it
z_rx_process.stdin.write("q\n")
z_rx_process.stdin.flush()
# Wait for rx to finish
z_rx_process.wait()
# Check output
return check_output(z_tx_process.returncode, z_tx_process.stdout.read(),
z_rx_process.returncode, z_rx_process.stdout.read())

def test_peer():
# Start rx in the background
print("Start rx peer")
z_rx_command = f"./{DIR_TESTS}/z_test_fragment_rx 1"
z_rx_process = subprocess.Popen(z_rx_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
# Introduce a delay to ensure rx starts
time.sleep(2)
# Start tx
print("Start tx peer")
z_tx_command = f"./{DIR_TESTS}/z_test_fragment_tx 1"
z_tx_process = subprocess.Popen(z_tx_command,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
# Wait for tx to finish
z_tx_process.wait()
# Wait for rx to receive
time.sleep(1)
print("Stop rx")
if z_rx_process.poll() is None:
# Send "q" command to rx to stop it
z_rx_process.stdin.write("q\n")
z_rx_process.stdin.flush()
# Wait for rx to finish
z_rx_process.wait()
# Check output
return check_output(z_tx_process.returncode, z_tx_process.stdout.read(),
z_rx_process.returncode, z_rx_process.stdout.read())

if __name__ == "__main__":
EXIT_STATUS = 0

# Run tests
if test_client() == 1:
EXIT_STATUS = 1
if test_peer() == 1:
EXIT_STATUS = 1
# Exit
sys.exit(EXIT_STATUS)
Loading
Loading