Skip to content

Commit

Permalink
Add raweth data length header field (#294)
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-roland authored Dec 15, 2023
1 parent f502cb7 commit a65a8ff
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 50 deletions.
9 changes: 9 additions & 0 deletions include/zenoh-pico/protocol/codec/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@
} \
}

#define _Z_CLEAN_RETURN_IF_ERR(base_expr, clean_expr) \
{ \
int8_t __res = base_expr; \
if (__res != _Z_RES_OK) { \
clean_expr; \
return __res; \
} \
}

/*------------------ Internal Zenoh-net Macros ------------------*/
int8_t _z_encoding_prefix_encode(_z_wbuf_t *wbf, z_encoding_prefix_t en);
int8_t _z_encoding_prefix_decode(z_encoding_prefix_t *en, _z_zbuf_t *zbf);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ typedef struct {
} _z_t_msg_fragment_t;
void _z_t_msg_fragment_clear(_z_t_msg_fragment_t *msg);

#define _Z_FRAGMENT_HEADER_SIZE 12

/*------------------ Transport Message ------------------*/
typedef union {
_z_t_msg_join_t _join;
Expand Down
6 changes: 5 additions & 1 deletion include/zenoh-pico/system/link/raweth.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@
#define _ZP_MAC_ADDR_LENGTH 6

// Max frame size
#define _ZP_MAX_ETH_FRAME_SIZE 1500
#define _ZP_MAX_ETH_FRAME_SIZE 1514

// Ethernet header structure type
typedef struct {
uint8_t dmac[_ZP_MAC_ADDR_LENGTH]; // Destination mac address
uint8_t smac[_ZP_MAC_ADDR_LENGTH]; // Source mac address
uint16_t ethtype; // Ethertype of frame
uint16_t data_length; // Payload length
} _zp_eth_header_t;

typedef struct {
Expand All @@ -44,6 +45,7 @@ typedef struct {
uint16_t vlan_type; // Vlan ethtype
uint16_t tag; // Vlan tag
uint16_t ethtype; // Ethertype of frame
uint16_t data_length; // Payload length
} _zp_eth_vlan_header_t;

typedef struct {
Expand All @@ -61,6 +63,8 @@ int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface);
size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len);
size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr);
int8_t _z_close_raweth(_z_sys_net_socket_t *sock);
size_t _z_raweth_ntohs(size_t val);
size_t _z_raweth_htons(size_t val);

#endif

Expand Down
4 changes: 4 additions & 0 deletions src/system/unix/link/raweth.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,9 @@ size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buf
return bytesRead;
}

size_t _z_raweth_ntohs(size_t val) { return ntohs(val); }

size_t _z_raweth_htons(size_t val) { return htons(val); }

#endif // defined(__linux)
#endif // Z_FEATURE_RAWETH_TRANSPORT == 1
2 changes: 1 addition & 1 deletion src/transport/multicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ int8_t _z_multicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_m
} else {
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true);
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);

ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
Expand Down
27 changes: 20 additions & 7 deletions src/transport/raweth/rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@

#if Z_FEATURE_RAWETH_TRANSPORT == 1

void print_buf(_z_zbuf_t *buf) {
printf("Buff info: %ld, %ld, %ld\n", buf->_ios._r_pos, buf->_ios._w_pos, buf->_ios._capacity);
}

static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z_bytes_t *addr) {
uint8_t *buff = _z_zbuf_get_wptr(zbf);
size_t rb = _z_receive_raweth(&link->_socket._raweth._sock, buff, _z_zbuf_space_left(zbf), addr);
Expand All @@ -48,14 +44,31 @@ static size_t _z_raweth_link_recv_zbuf(const _z_link_t *link, _z_zbuf_t *zbf, _z
if (has_vlan && (rb < sizeof(_zp_eth_vlan_header_t))) {
return SIZE_MAX;
}
// Update buffer but skip eth header
_z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + rb);
size_t data_length = 0;
if (has_vlan) {
_zp_eth_vlan_header_t *header = (_zp_eth_vlan_header_t *)buff;
// Retrieve data length
data_length = _z_raweth_ntohs(header->data_length);
if (rb < (data_length + sizeof(_zp_eth_vlan_header_t))) {
// Invalid data_length
return SIZE_MAX;
}
// Skip header
_z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_vlan_header_t) + data_length);
_z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_vlan_header_t));
} else {
_zp_eth_header_t *header = (_zp_eth_header_t *)buff;
// Retrieve data length
data_length = _z_raweth_ntohs(header->data_length);
if (rb < (data_length + sizeof(_zp_eth_header_t))) {
// Invalid data_length
return SIZE_MAX;
}
// Skip header
_z_zbuf_set_wpos(zbf, _z_zbuf_get_wpos(zbf) + sizeof(_zp_eth_header_t) + data_length);
_z_zbuf_set_rpos(zbf, _z_zbuf_get_rpos(zbf) + sizeof(_zp_eth_header_t));
}
return rb;
return data_length;
}

/*------------------ Reception helper ------------------*/
Expand Down
99 changes: 59 additions & 40 deletions src/transport/raweth/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,13 @@

#if Z_FEATURE_RAWETH_TRANSPORT == 1

int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) {
#if Z_FEATURE_MULTI_THREAD == 1
static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _z_mutex_unlock(&ztm->_mutex_tx); }
#else
static void _zp_raweth_unlock_tx_mutex(_z_transport_multicast_t *ztm) { _ZP_UNUSED(ztm); }
#endif

static int8_t _zp_raweth_set_socket(const _z_keyexpr_t *keyexpr, _z_raweth_socket_t *sock) {
int8_t ret = _Z_RES_OK;

if (_ZP_RAWETH_CFG_SIZE < 1) {
Expand Down Expand Up @@ -85,33 +91,51 @@ static _z_zint_t __unsafe_z_raweth_get_sn(_z_transport_multicast_t *ztm, z_relia
return sn;
}

static void __unsafe_z_raweth_prepare_header(_z_link_t *zl, _z_wbuf_t *wbf) {
_z_raweth_socket_t *resocket = &zl->_socket._raweth;
// Reserve eth header in buffer
if (resocket->_has_vlan) {
_z_wbuf_set_wpos(wbf, sizeof(_zp_eth_vlan_header_t));
} else {
_z_wbuf_set_wpos(wbf, sizeof(_zp_eth_header_t));
}
}

/**
* This function is unsafe because it operates in potentially concurrent data.
* Make sure that the following mutexes are locked before calling this function:
* - ztm->_mutex_inner
*/
static int8_t __unsafe_z_raweth_write_header(_z_link_t *zl, _z_wbuf_t *wbf) {
_z_raweth_socket_t *resocket = &zl->_socket._raweth;

// Save and reset buffer position
size_t wpos = _z_wbuf_len(wbf);
_z_wbuf_set_wpos(wbf, 0);
// Write eth header in buffer
if (resocket->_has_vlan) {
_zp_eth_vlan_header_t header;
// Set header
memset(&header, 0, sizeof(header));
memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH);
memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH);
header.vlan_type = _ZP_ETH_TYPE_VLAN;
header.tag = resocket->_vlan;
header.ethtype = _ZP_RAWETH_CFG_ETHTYPE;
header.data_length = _z_raweth_htons(wpos - sizeof(header));
// Write header
_Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header)));
} else {
_zp_eth_header_t header;
// Set header
memcpy(&header.dmac, &resocket->_dmac, _ZP_MAC_ADDR_LENGTH);
memcpy(&header.smac, &resocket->_smac, _ZP_MAC_ADDR_LENGTH);
header.ethtype = _ZP_RAWETH_CFG_ETHTYPE;
header.data_length = _z_raweth_htons(wpos - sizeof(header));
// Write header
_Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, (uint8_t *)&header, 0, sizeof(header)));
}
// Restore wpos
_z_wbuf_set_wpos(wbf, wpos);
return _Z_RES_OK;
}

Expand Down Expand Up @@ -141,32 +165,18 @@ int8_t _z_raweth_link_send_t_msg(const _z_link_t *zl, const _z_transport_message
uint16_t mtu = (zl->_mtu < Z_BATCH_UNICAST_SIZE) ? zl->_mtu : Z_BATCH_UNICAST_SIZE;
_z_wbuf_t wbf = _z_wbuf_make(mtu, false);

switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}
// Discard const qualifier
_z_link_t *mzl = (_z_link_t *)zl;
// Set socket info
_Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &mzl->_socket._raweth));
// Prepare buff
__unsafe_z_raweth_prepare_header(mzl, &wbf);
// Encode the session message
_Z_RETURN_IF_ERR(_z_transport_message_encode(&wbf, t_msg));
// Write the message header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(mzl, &wbf));
// Encode the session message
ret = _z_transport_message_encode(&wbf, t_msg);
if (ret == _Z_RES_OK) {
switch (zl->_cap._flow) {
case Z_LINK_CAP_FLOW_DATAGRAM:
break;
default:
ret = _Z_ERR_GENERIC;
break;
}
// Send the wbuf on the socket
ret = _z_raweth_link_send_wbuf(zl, &wbf);
}
// Send the wbuf on the socket
ret = _z_raweth_link_send_wbuf(zl, &wbf);
_z_wbuf_clear(&wbf);

return ret;
Expand All @@ -182,13 +192,15 @@ int8_t _z_raweth_send_t_msg(_z_transport_multicast_t *ztm, const _z_transport_me
// Reset wbuf
_z_wbuf_reset(&ztm->_wbuf);
// Set socket info
_Z_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth));
// Write the message header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(NULL, &ztm->_link._socket._raweth), _zp_raweth_unlock_tx_mutex(ztm));
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf);
// Encode the session message
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg));
_Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, t_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Write the message header
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Mark the session that we have transmitted data
ztm->_transmitted = true;

Expand Down Expand Up @@ -239,26 +251,29 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
// Reset wbuf
_z_wbuf_reset(&ztm->_wbuf);
// Set socket info
_Z_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth));
// Write the eth header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_zp_raweth_set_socket(keyexpr, &ztm->_link._socket._raweth),
_zp_raweth_unlock_tx_mutex(ztm));
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf);
// Set the frame header
_z_zint_t sn = __unsafe_z_raweth_get_sn(ztm, reliability);
_z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability);
// Encode the frame header
_Z_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg));
_Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_wbuf, &t_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Encode the network message
ret = _z_network_message_encode(&ztm->_wbuf, n_msg);
if (ret == _Z_RES_OK) {
if (_z_network_message_encode(&ztm->_wbuf, n_msg) == _Z_RES_OK) {
// Write the eth header
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf),
_zp_raweth_unlock_tx_mutex(ztm));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Mark the session that we have transmitted data
ztm->_transmitted = true;
} else { // The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - 12, true);
_z_wbuf_t fbf = _z_wbuf_make(ztm->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);
// Encode the message on the expandable wbuf
_Z_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg));
_Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _zp_raweth_unlock_tx_mutex(ztm));
// Fragment and send the message
_Bool is_first = true;
while (_z_wbuf_len(&fbf) > 0) {
Expand All @@ -269,12 +284,16 @@ int8_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg,
is_first = false;
// Reset wbuf
_z_wbuf_reset(&ztm->_wbuf);
// Write the eth header
_Z_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf));
// Prepare buff
__unsafe_z_raweth_prepare_header(&ztm->_link, &ztm->_wbuf);
// Serialize one fragment
_Z_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn));
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_serialize_zenoh_fragment(&ztm->_wbuf, &fbf, reliability, sn),
_zp_raweth_unlock_tx_mutex(ztm));
// Write the eth header
_Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_link, &ztm->_wbuf),
_zp_raweth_unlock_tx_mutex(ztm));
// Send the wbuf on the socket
_Z_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf));
_Z_CLEAN_RETURN_IF_ERR(_z_raweth_link_send_wbuf(&ztm->_link, &ztm->_wbuf), _zp_raweth_unlock_tx_mutex(ztm));
// Mark the session that we have transmitted data
ztm->_transmitted = true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/transport/unicast/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ int8_t _z_unicast_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_msg
} else {
// The message does not fit in the current batch, let's fragment it
// Create an expandable wbuf for fragmentation
_z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - 12, true);
_z_wbuf_t fbf = _z_wbuf_make(ztu->_wbuf._capacity - _Z_FRAGMENT_HEADER_SIZE, true);

ret = _z_network_message_encode(&fbf, n_msg); // Encode the message on the expandable wbuf
if (ret == _Z_RES_OK) {
Expand Down

0 comments on commit a65a8ff

Please sign in to comment.