Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add manual batching mechanism #758

Merged
merged 21 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport")
set(Z_FEATURE_TCP_NODELAY 1 CACHE STRING "Toggle TCP_NODELAY")
set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")
set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching")

add_compile_definitions("Z_BUILD_DEBUG=$<CONFIG:Debug>")
message(STATUS "Building with feature confing:\n\
Expand Down
26 changes: 26 additions & 0 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -2056,6 +2056,32 @@ z_result_t z_declare_background_subscriber(const z_loaned_session_t *zs, const z
const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subscriber);
#endif

#if Z_FEATURE_BATCHING == 1
/**
* Activate the batching mechanism.
* Any message that would have been sent on the network by a subsequent api call (e.g z_put, z_get)
* will be instead stored until the batch is flushed with :c:func:`zp_batch_flush`.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will start batching messages.
*
* Return:
* ``0`` if batching started, ``negative value`` otherwise.
*/
z_result_t zp_batch_start(const z_loaned_session_t *zs);

/**
* Deactivate the batching mechanism and flush the messages that were stored.
*
* Parameters:
* zs: Pointer to a :c:type:`z_loaned_session_t` that will stop batching messages.
*
* Return:
* ``0`` if batching stopped and batch successfully sent, ``negative value`` otherwise.
*/
z_result_t zp_batch_flush(const z_loaned_session_t *zs);
#endif

/************* Multi Thread Tasks helpers **************/
/**
* Builds a :c:type:`zp_task_read_options_t` with default value.
Expand Down
3 changes: 3 additions & 0 deletions include/zenoh-pico/collections/vec.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ typedef struct {
void **_val;
} _z_vec_t;

static inline _z_vec_t _z_vec_null(void) { return (_z_vec_t){0}; }
_z_vec_t _z_vec_make(size_t capacity);
void _z_vec_copy(_z_vec_t *dst, const _z_vec_t *src, z_element_clone_f f);
void _z_vec_steal(_z_vec_t *dst, _z_vec_t *src);

size_t _z_vec_len(const _z_vec_t *v);
bool _z_vec_is_empty(const _z_vec_t *v);
Expand Down Expand Up @@ -59,6 +61,7 @@ void _z_vec_release(_z_vec_t *v);
static inline void name##_vec_copy(name##_vec_t *dst, const name##_vec_t *src) { \
_z_vec_copy(dst, src, name##_elem_clone); \
} \
static inline void name##_vec_steal(name##_vec_t *dst, name##_vec_t *src) { _z_vec_steal(dst, src); } \
static inline void name##_vec_reset(name##_vec_t *v) { _z_vec_reset(v, name##_elem_free); } \
static inline void name##_vec_clear(name##_vec_t *v) { _z_vec_clear(v, name##_elem_free); } \
static inline void name##_vec_free(name##_vec_t **v) { _z_vec_free(v, name##_elem_free); } \
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#define Z_FEATURE_TCP_NODELAY 1
#define Z_FEATURE_LOCAL_SUBSCRIBER 0
#define Z_FEATURE_PUBLISHER_SESSION_CHECK 1
#define Z_FEATURE_BATCHING 1
// End of CMake generation

/*------------------ Runtime configuration properties ------------------*/
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#define Z_FEATURE_TCP_NODELAY @Z_FEATURE_TCP_NODELAY@
#define Z_FEATURE_LOCAL_SUBSCRIBER @Z_FEATURE_LOCAL_SUBSCRIBER@
#define Z_FEATURE_PUBLISHER_SESSION_CHECK @Z_FEATURE_PUBLISHER_SESSION_CHECK@
#define Z_FEATURE_BATCHING @Z_FEATURE_BATCHING@
// End of CMake generation

/*------------------ Runtime configuration properties ------------------*/
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,5 +302,6 @@ _z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid);
_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, bool has_interest_id, uint32_t interest_id);
_z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_interest(_z_interest_t interest);
z_result_t _z_n_msg_copy(_z_network_message_t *dst, const _z_network_message_t *src);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_NETWORK_H */
1 change: 1 addition & 0 deletions include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ z_result_t _z_session_close(_z_session_t *zn, uint8_t reason);
z_result_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *z_msg, uint16_t local_peer_id);
z_result_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
z_result_t _z_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl);

void _zp_session_lock_mutex(_z_session_t *zn);
void _zp_session_unlock_mutex(_z_session_t *zn);
Expand Down
31 changes: 30 additions & 1 deletion include/zenoh-pico/transport/multicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,33 @@ z_result_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *pa
z_result_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, bool link_only);
z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason);
void _z_multicast_transport_clear(_z_transport_t *zt);
#endif /* ZENOH_PICO_MULTICAST_TRANSPORT_H */

#if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
static inline z_result_t _z_multicast_tx_mutex_lock(_z_transport_multicast_t *ztm, bool block) {
if (block) {
_z_mutex_lock(&ztm->_mutex_tx);
return _Z_RES_OK;
} else {
return _z_mutex_try_lock(&ztm->_mutex_tx);
}
}
static inline void _z_multicast_tx_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_tx); }
static inline void _z_multicast_rx_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_rx); }
static inline void _z_multicast_rx_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_rx); }
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); }
static inline void _z_multicast_peer_mutex_unlock(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_peer); }

#else
static inline z_result_t _z_multicast_tx_mutex_lock(_z_transport_multicast_t *ztm, bool block) {
_ZP_UNUSED(ztm);
_ZP_UNUSED(block);
return _Z_RES_OK;
}
static inline void _z_multicast_tx_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_rx_mutex_lock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_rx_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
static inline void _z_multicast_peer_mutex_unlock(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }

#endif // (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
#endif /* ZENOH_PICO_MULTICAST_TRANSPORT_H */
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/multicast/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
z_result_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
z_result_t _z_multicast_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_message_t *t_msg);
z_result_t _z_multicast_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl);

#endif /* ZENOH_PICO_MULTICAST_TX_H */
22 changes: 22 additions & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ enum _z_dbuf_state_e {
_Z_DBUF_STATE_OVERFLOW = 2,
};

enum _z_batching_state_e {
_Z_BATCHING_IDLE = 0,
_Z_BATCHING_ACTIVE = 1,
};

typedef struct {
#if Z_FEATURE_FRAGMENTATION == 1
// Defragmentation buffers
Expand Down Expand Up @@ -105,6 +110,12 @@ typedef struct {
_z_zint_t _sn_rx_best_effort;
volatile _z_zint_t _lease;

// Transport batching
#if Z_FEATURE_BATCHING == 1
uint8_t _batch_state;
_z_network_message_vec_t _batch;
#endif

#if Z_FEATURE_MULTI_THREAD == 1
_z_task_t *_read_task;
_z_task_t *_lease_task;
Expand All @@ -129,6 +140,12 @@ typedef struct _z_transport_multicast_t {
_z_mutex_t _mutex_peer;
#endif // Z_FEATURE_MULTI_THREAD == 1

// Transport batching
#if Z_FEATURE_BATCHING == 1
uint8_t _batch_state;
_z_network_message_vec_t _batch;
#endif

_z_link_t _link;

// TX and RX buffers
Expand Down Expand Up @@ -192,4 +209,9 @@ z_result_t _z_transport_close(_z_transport_t *zt, uint8_t reason);
void _z_transport_clear(_z_transport_t *zt);
void _z_transport_free(_z_transport_t **zt);

#if Z_FEATURE_BATCHING == 1
bool _z_transport_start_batching(_z_transport_t *zt);
void _z_transport_stop_batching(_z_transport_t *zt);
#endif

#endif /* INCLUDE_ZENOH_PICO_TRANSPORT_TRANSPORT_H */
26 changes: 25 additions & 1 deletion include/zenoh-pico/transport/unicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,28 @@ z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, c
z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only);
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
void _z_unicast_transport_clear(_z_transport_t *zt);
#endif /* ZENOH_PICO_UNICAST_TRANSPORT_H */

#if Z_FEATURE_UNICAST_TRANSPORT == 1 && Z_FEATURE_MULTI_THREAD == 1
static inline z_result_t _z_unicast_tx_mutex_lock(_z_transport_unicast_t *ztu, bool block) {
if (block) {
_z_mutex_lock(&ztu->_mutex_tx);
return _Z_RES_OK;
} else {
return _z_mutex_try_lock(&ztu->_mutex_tx);
}
}
static inline void _z_unicast_tx_mutex_unlock(_z_transport_unicast_t *ztu) { _z_mutex_unlock(&ztu->_mutex_tx); }
static inline void _z_unicast_rx_mutex_lock(_z_transport_unicast_t *ztu) { _z_mutex_lock(&ztu->_mutex_rx); }
static inline void _z_unicast_rx_mutex_unlock(_z_transport_unicast_t *ztu) { _z_mutex_unlock(&ztu->_mutex_rx); }

#else
static inline z_result_t _z_unicast_tx_mutex_lock(_z_transport_unicast_t *ztu, bool block) {
_ZP_UNUSED(ztu);
_ZP_UNUSED(block);
return _Z_RES_OK;
}
static inline void _z_unicast_tx_mutex_unlock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
static inline void _z_unicast_rx_mutex_lock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
static inline void _z_unicast_rx_mutex_unlock(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
#endif // Z_FEATURE_UNICAST_TRANSPORT == 1 && Z_FEATURE_MULTI_THREAD == 1
#endif /* ZENOH_PICO_UNICAST_TRANSPORT_H */
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/unicast/tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
z_result_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);
z_result_t _z_unicast_send_t_msg(_z_transport_unicast_t *ztu, const _z_transport_message_t *t_msg);
z_result_t _z_unicast_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl);

#endif /* ZENOH_PICO_TRANSPORT_LINK_TX_H */
19 changes: 19 additions & 0 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,25 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *sub)
}
#endif

#if Z_FEATURE_BATCHING == 1
z_result_t zp_batch_start(const z_loaned_session_t *zs) {
if (_Z_RC_IS_NULL(zs)) {
return _Z_ERR_SESSION_CLOSED;
}
_z_session_t *session = _Z_RC_IN_VAL(zs);
return _z_transport_start_batching(&session->_tp) ? _Z_RES_OK : _Z_ERR_GENERIC;
}

z_result_t zp_batch_flush(const z_loaned_session_t *zs) {
_z_session_t *session = _Z_RC_IN_VAL(zs);
if (_Z_RC_IS_NULL(zs)) {
return _Z_ERR_SESSION_CLOSED;
}
_z_transport_stop_batching(&session->_tp);
return _z_send_n_batch(session, Z_RELIABILITY_DEFAULT, Z_CONGESTION_CONTROL_DEFAULT);
}
#endif

/**************** Tasks ****************/
void zp_task_read_options_default(zp_task_read_options_t *options) {
#if Z_FEATURE_MULTI_THREAD == 1
Expand Down
5 changes: 5 additions & 0 deletions src/collections/vec.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ void _z_vec_copy(_z_vec_t *dst, const _z_vec_t *src, z_element_clone_f d_f) {
}
}

void _z_vec_steal(_z_vec_t *dst, _z_vec_t *src) {
*dst = *src;
*src = _z_vec_null();
}

void _z_vec_reset(_z_vec_t *v, z_element_free_f free_f) {
for (size_t i = 0; i < v->_len; i++) {
free_f(&v->_val[i]);
Expand Down
1 change: 0 additions & 1 deletion src/protocol/codec/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ z_result_t _z_declaration_encode(_z_wbuf_t *wbf, const _z_declaration_t *decl) {
case _Z_DECL_FINAL: {
ret = _z_decl_final_encode(wbf);
} break;
;
}
return ret;
}
Expand Down
1 change: 1 addition & 0 deletions src/protocol/codec/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,7 @@ z_result_t _z_frame_decode(_z_t_msg_frame_t *msg, _z_zbuf_t *zbf, uint8_t header
_z_network_message_vec_append(&msg->_messages, nm);
} else {
_z_n_msg_free(&nm);
_z_network_message_vec_clear(&msg->_messages);

_z_zbuf_set_rpos(zbf, r_pos); // Restore the reading position of the iobfer

Expand Down
Loading
Loading