Skip to content

Commit

Permalink
Improve query/reply perf (#781)
Browse files Browse the repository at this point in the history
* fix: remove superfluous rc init

* feat: lazify svec release

* refactor: rename slice_empty as slice_null

* feat: rework svec expand

* refactor: z_bytes_append_slice

* feat: lazify arc slice

* feat: improve stirng/slice move

* feat: lazify sample timestamp set

* feat: add non-reader decode functions

* feat: remove superfluous keyexpr clear

* feat: add bytes alias arc

* feat: add rx pool size config option

* feat: use transport arc slice pool for payload decode

* feat: n msg svec is now a transport resource pool

* fix: add offset to svec init

* feat: make svec use elem f a per call arg

* feat: skip ke suffix check

* fix: arc pool

* feat: improve sub perf and readability

* feat: add z_string_alias_slice function

* feat: align queryable with subscription

* feat: align reply with sub & queryables

* feat: nothing to clear in frame

* fix: reply clean up

* feat: streamline replies

* feat: query is not a rc and store a session rc

* fix: ci issues

* fix: attachment examples

* fix: keyexpr equals

* fix: flaky test
  • Loading branch information
jean-roland authored Nov 13, 2024
1 parent de430cb commit 0f1ce95
Show file tree
Hide file tree
Showing 62 changed files with 711 additions and 558 deletions.
7 changes: 3 additions & 4 deletions examples/unix/c11/z_get_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,11 @@ void reply_handler(z_loaned_reply_t *reply, void *ctx) {

// Check attachment
const z_loaned_bytes_t *attachment = z_sample_attachment(sample);
if (attachment == NULL) {
return;
}
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len);
if (ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len) < 0) {
return;
}
kv_pair_t *kvp = (kv_pair_t *)malloc(sizeof(kv_pair_t) * attachment_len);
for (size_t i = 0; i < attachment_len; ++i) {
ze_deserializer_deserialize_string(&deserializer, &kvp[i].key);
Expand Down
7 changes: 3 additions & 4 deletions examples/unix/c11/z_queryable_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ void query_handler(z_loaned_query_t *query, void *ctx) {

// Check attachment
const z_loaned_bytes_t *attachment = z_query_attachment(query);
if (attachment != NULL) {
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len);
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
if (ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len) == Z_OK) {
kv_pair_t *kvp = (kv_pair_t *)malloc(sizeof(kv_pair_t) * attachment_len);
for (size_t i = 0; i < attachment_len; ++i) {
ze_deserializer_deserialize_string(&deserializer, &kvp[i].key);
Expand Down
8 changes: 3 additions & 5 deletions examples/unix/c11/z_sub_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,10 @@ void data_handler(z_loaned_sample_t *sample, void *ctx) {
printf(" with timestamp: %" PRIu64 "\n", z_timestamp_ntp64_time(ts));
}
// Check attachment

const z_loaned_bytes_t *attachment = z_sample_attachment(sample);
if (attachment != NULL) {
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len);
ze_deserializer_t deserializer = ze_deserializer_from_bytes(attachment);
size_t attachment_len;
if (ze_deserializer_deserialize_sequence_length(&deserializer, &attachment_len) == Z_OK) {
kv_pair_t *kvp = (kv_pair_t *)malloc(sizeof(kv_pair_t) * attachment_len);
for (size_t i = 0; i < attachment_len; ++i) {
ze_deserializer_deserialize_string(&deserializer, &kvp[i].key);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ _Z_OWNED_TYPE_VALUE(_z_queryable_t, queryable)
/**
* Represents a Zenoh Query entity, received by Zenoh Queryable entities.
*/
_Z_OWNED_TYPE_RC(_z_query_rc_t, query)
_Z_OWNED_TYPE_VALUE(_z_query_t, query)

/**
* Represents the encoding of a payload, in a MIME-like format.
Expand Down
6 changes: 5 additions & 1 deletion include/zenoh-pico/collections/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ inline size_t _z_arc_slice_size(const _z_arc_slice_t *s) {
return sizeof(_z_arc_slice_t);
}
_Z_ELEM_DEFINE(_z_arc_slice, _z_arc_slice_t, _z_arc_slice_size, _z_arc_slice_drop, _z_arc_slice_copy, _z_arc_slice_move)
_Z_SVEC_DEFINE(_z_arc_slice, _z_arc_slice_t, true)
_Z_SVEC_DEFINE(_z_arc_slice, _z_arc_slice_t)

/*-------- Bytes --------*/
/**
Expand All @@ -44,13 +44,17 @@ typedef struct {

// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
static inline _z_bytes_t _z_bytes_null(void) { return (_z_bytes_t){0}; }
static inline void _z_bytes_alias_arc_slice(_z_bytes_t *dst, _z_arc_slice_t *s) {
dst->_slices = _z_arc_slice_svec_alias_element(s);
}
bool _z_bytes_check(const _z_bytes_t *bytes);
z_result_t _z_bytes_append_bytes(_z_bytes_t *dst, _z_bytes_t *src);
z_result_t _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s);
z_result_t _z_bytes_copy(_z_bytes_t *dst, const _z_bytes_t *src);
_z_bytes_t _z_bytes_duplicate(const _z_bytes_t *src);
void _z_bytes_move(_z_bytes_t *dst, _z_bytes_t *src);
void _z_bytes_drop(_z_bytes_t *bytes);
void _z_bytes_aliased_drop(_z_bytes_t *bytes);
void _z_bytes_free(_z_bytes_t **bs);
size_t _z_bytes_num_slices(const _z_bytes_t *bs);
_z_arc_slice_t *_z_bytes_get_slice(const _z_bytes_t *bs, size_t i);
Expand Down
27 changes: 10 additions & 17 deletions include/zenoh-pico/collections/refcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ size_t _z_simple_rc_strong_count(void *cnt);
return p; \
} \
static inline name##_rc_t name##_rc_clone(const name##_rc_t *p) { \
name##_rc_t c = name##_rc_null(); \
if (_z_rc_increase_strong(p->_cnt) == _Z_RES_OK) { \
c = *p; \
return *p; \
} \
return c; \
return name##_rc_null(); \
} \
static inline name##_rc_t *name##_rc_clone_as_ptr(const name##_rc_t *p) { \
name##_rc_t *c = (name##_rc_t *)z_malloc(sizeof(name##_rc_t)); \
Expand All @@ -93,12 +92,10 @@ size_t _z_simple_rc_strong_count(void *cnt);
return c; \
} \
static inline name##_weak_t name##_rc_clone_as_weak(const name##_rc_t *p) { \
name##_weak_t c = name##_weak_null(); \
if (_z_rc_increase_weak(p->_cnt) == _Z_RES_OK) { \
c._val = p->_val; \
c._cnt = p->_cnt; \
return (name##_weak_t){._val = p->_val, ._cnt = p->_cnt}; \
} \
return c; \
return name##_weak_null(); \
} \
static inline name##_weak_t *name##_rc_clone_as_weak_ptr(const name##_rc_t *p) { \
name##_weak_t *c = (name##_weak_t *)z_malloc(sizeof(name##_weak_t)); \
Expand Down Expand Up @@ -137,20 +134,17 @@ size_t _z_simple_rc_strong_count(void *cnt);
return res; \
} \
static inline name##_weak_t name##_weak_clone(const name##_weak_t *p) { \
name##_weak_t c = name##_weak_null(); \
if (_z_rc_increase_weak(p->_cnt) == _Z_RES_OK) { \
c = *p; \
return *p; \
} \
return c; \
return name##_weak_null(); \
} \
static inline void name##_weak_copy(name##_weak_t *dst, const name##_weak_t *p) { *dst = name##_weak_clone(p); } \
static inline name##_rc_t name##_weak_upgrade(const name##_weak_t *p) { \
name##_rc_t c = name##_rc_null(); \
if (_z_rc_weak_upgrade(p->_cnt) == _Z_RES_OK) { \
c._val = p->_val; \
c._cnt = p->_cnt; \
return (name##_rc_t){._val = p->_val, ._cnt = p->_cnt}; \
} \
return c; \
return name##_rc_null(); \
} \
static inline bool name##_weak_eq(const name##_weak_t *left, const name##_weak_t *right) { \
return (left->_val == right->_val); \
Expand Down Expand Up @@ -200,11 +194,10 @@ size_t _z_simple_rc_strong_count(void *cnt);
return p; \
} \
static inline name##_simple_rc_t name##_simple_rc_clone(const name##_simple_rc_t *p) { \
name##_simple_rc_t c = name##_simple_rc_null(); \
if (_z_simple_rc_increase(p->_cnt) == _Z_RES_OK) { \
c = *p; \
return *p; \
} \
return c; \
return name##_simple_rc_null(); \
} \
static inline name##_simple_rc_t *name##_simple_rc_clone_as_ptr(const name##_simple_rc_t *p) { \
name##_simple_rc_t *c = (name##_simple_rc_t *)z_malloc(sizeof(name##_simple_rc_t)); \
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/collections/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ typedef struct {
_z_delete_context_t _delete_context;
} _z_slice_t;

static inline _z_slice_t _z_slice_empty(void) { return (_z_slice_t){0}; }
static inline void _z_slice_reset(_z_slice_t *bs) { *bs = _z_slice_empty(); }
static inline _z_slice_t _z_slice_null(void) { return (_z_slice_t){0}; }
static inline void _z_slice_reset(_z_slice_t *bs) { *bs = _z_slice_null(); }
static inline bool _z_slice_is_empty(const _z_slice_t *bs) { return bs->len == 0; }
static inline bool _z_slice_check(const _z_slice_t *slice) { return slice->start != NULL; }
static inline _z_slice_t _z_slice_alias(const _z_slice_t bs) {
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/collections/string.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ static inline _z_string_t _z_string_alias(const _z_string_t str) {
_z_string_t _z_string_copy_from_str(const char *value);
_z_string_t _z_string_copy_from_substr(const char *value, size_t len);
_z_string_t *_z_string_copy_from_str_as_ptr(const char *value);
_z_string_t _z_string_alias_slice(const _z_slice_t *slice);
_z_string_t _z_string_alias_str(const char *value);
_z_string_t _z_string_alias_substr(const char *value, size_t len);
_z_string_t _z_string_from_str_custom_deleter(char *value, _z_delete_context_t c);
Expand All @@ -98,7 +99,7 @@ _z_string_t _z_string_convert_bytes(const _z_slice_t *bs);
_z_string_t _z_string_preallocate(const size_t len);

_Z_ELEM_DEFINE(_z_string, _z_string_t, _z_string_len, _z_string_clear, _z_string_copy, _z_string_move)
_Z_SVEC_DEFINE(_z_string, _z_string_t, true)
_Z_SVEC_DEFINE(_z_string, _z_string_t)
_Z_LIST_DEFINE(_z_string, _z_string_t)
_Z_INT_MAP_DEFINE(_z_string, _z_string_t)

Expand Down
Loading

0 comments on commit 0f1ce95

Please sign in to comment.