Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add local query timeout #763

Merged
merged 5 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"

void _z_pending_query_process_timeout(_z_session_t *zn);

#if Z_FEATURE_QUERY == 1
/*------------------ Query ------------------*/
_z_zint_t _z_get_query_id(_z_session_t *zn);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ typedef struct {
_z_zint_t _id;
_z_reply_handler_t _callback;
_z_drop_handler_t _dropper;
z_clock_t _start_time;
uint64_t _timeout;
void *_arg;
_z_pending_reply_list_t *_pending_replies;
z_query_target_t _target;
Expand Down
9 changes: 7 additions & 2 deletions include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,12 @@ z_result_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliab
z_congestion_control_t cong_ctrl);
z_result_t _z_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl);

void _zp_session_lock_mutex(_z_session_t *zn);
void _zp_session_unlock_mutex(_z_session_t *zn);
#if Z_FEATURE_MULTI_THREAD == 1
static inline void _z_session_mutex_lock(_z_session_t *zn) { (void)_z_mutex_lock(&zn->_mutex_inner); }
static inline void _z_session_mutex_unlock(_z_session_t *zn) { (void)_z_mutex_unlock(&zn->_mutex_inner); }
#else
static inline void _z_session_mutex_lock(_z_session_t *zn) { _ZP_UNUSED(zn); }
static inline void _z_session_mutex_unlock(_z_session_t *zn) { _ZP_UNUSED(zn); }
#endif

#endif /* INCLUDE_ZENOH_PICO_SESSION_UTILS_H */
15 changes: 4 additions & 11 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1076,14 +1076,7 @@ z_result_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr
z_get_options_t opt;
z_get_options_default(&opt);
if (options != NULL) {
opt.consolidation = options->consolidation;
opt.target = options->target;
opt.encoding = options->encoding;
opt.payload = options->payload;
opt.attachment = options->attachment;
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt = *options;
}

if (opt.consolidation.mode == Z_CONSOLIDATION_MODE_AUTO) {
Expand Down Expand Up @@ -1170,7 +1163,7 @@ z_result_t z_declare_queryable(const z_loaned_session_t *zs, z_owned_queryable_t
z_queryable_options_t opt;
z_queryable_options_default(&opt);
if (options != NULL) {
opt.complete = options->complete;
opt = *options;
}

queryable->_val =
Expand Down Expand Up @@ -1472,7 +1465,7 @@ z_result_t zp_start_read_task(z_loaned_session_t *zs, const zp_task_read_options
zp_task_read_options_t opt;
zp_task_read_options_default(&opt);
if (options != NULL) {
opt.task_attributes = options->task_attributes;
opt = *options;
}
return _zp_start_read_task(_Z_RC_IN_VAL(zs), opt.task_attributes);
#else
Expand Down Expand Up @@ -1504,7 +1497,7 @@ z_result_t zp_start_lease_task(z_loaned_session_t *zs, const zp_task_lease_optio
zp_task_lease_options_t opt;
zp_task_lease_options_default(&opt);
if (options != NULL) {
opt.task_attributes = options->task_attributes;
opt = *options;
}
return _zp_start_lease_task(_Z_RC_IN_VAL(zs), opt.task_attributes);
#else
Expand Down
2 changes: 2 additions & 0 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete
pq->_dropper = dropper;
pq->_pending_replies = NULL;
pq->_arg = arg;
pq->_timeout = timeout_ms;
pq->_start_time = z_clock_now();

ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session
if (ret == _Z_RES_OK) {
Expand Down
44 changes: 22 additions & 22 deletions src/session/interest.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ static _z_session_interest_rc_list_t *__unsafe_z_get_interest_by_key_and_flags(_
}

static z_result_t _z_interest_send_decl_resource(_z_session_t *zn, uint32_t interest_id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_resource_list_t *res_list = _z_resource_list_clone(zn->_local_resources);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
_z_resource_list_t *xs = res_list;
while (xs != NULL) {
_z_resource_t *res = _z_resource_list_head(xs);
Expand All @@ -116,9 +116,9 @@ static z_result_t _z_interest_send_decl_resource(_z_session_t *zn, uint32_t inte

#if Z_FEATURE_SUBSCRIPTION == 1
static z_result_t _z_interest_send_decl_subscriber(_z_session_t *zn, uint32_t interest_id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_subscription_rc_list_t *sub_list = _z_subscription_rc_list_clone(zn->_local_subscriptions);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
_z_subscription_rc_list_t *xs = sub_list;
while (xs != NULL) {
_z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs);
Expand All @@ -145,9 +145,9 @@ static z_result_t _z_interest_send_decl_subscriber(_z_session_t *zn, uint32_t in

#if Z_FEATURE_QUERYABLE == 1
static z_result_t _z_interest_send_decl_queryable(_z_session_t *zn, uint32_t interest_id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_session_queryable_rc_list_t *qle_list = _z_session_queryable_rc_list_clone(zn->_local_queryable);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
_z_session_queryable_rc_list_t *xs = qle_list;
while (xs != NULL) {
_z_session_queryable_rc_t *qle = _z_session_queryable_rc_list_head(xs);
Expand Down Expand Up @@ -184,9 +184,9 @@ static z_result_t _z_interest_send_declare_final(_z_session_t *zn, uint32_t inte
}

_z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_session_interest_rc_t *intr = __unsafe_z_get_interest_by_id(zn, id);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return intr;
}

Expand All @@ -195,13 +195,13 @@ _z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_inte
(int)_z_string_len(&intr->_key._suffix), _z_string_data(&intr->_key._suffix));
_z_session_interest_rc_t *ret = NULL;

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
ret = (_z_session_interest_rc_t *)z_malloc(sizeof(_z_session_interest_rc_t));
if (ret != NULL) {
*ret = _z_session_interest_rc_new_from_val(intr);
zn->_local_interests = _z_session_interest_rc_list_push(zn->_local_interests, ret);
}
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return ret;
}

Expand Down Expand Up @@ -269,17 +269,17 @@ z_result_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
// Retrieve key
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key);
if (!_z_keyexpr_has_suffix(&key)) {
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
// Register declare
_unsafe_z_register_declare(zn, &key, msg.id, decl_type);
// Retrieve interests
_z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, &key);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
// Parse session_interest list
_z_session_interest_rc_list_t *xs = intrs;
while (xs != NULL) {
Expand Down Expand Up @@ -315,17 +315,17 @@ z_result_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration
default:
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
// Retrieve declare data
_z_keyexpr_t key = _unsafe_z_get_key_from_declare(zn, msg.id, decl_type);
if (!_z_keyexpr_has_suffix(&key)) {
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
_z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, &key);
// Remove declare
_unsafe_z_unregister_declare(zn, msg.id, decl_type);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

// Parse session_interest list
_z_session_interest_rc_list_t *xs = intrs;
Expand All @@ -343,25 +343,25 @@ z_result_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration
}

void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
zn->_local_interests =
_z_session_interest_rc_list_drop_filter(zn->_local_interests, _z_session_interest_rc_eq, intr);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
}

void _z_flush_interest(_z_session_t *zn) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_session_interest_rc_list_free(&zn->_local_interests);
_z_declare_data_list_free(&zn->_remote_declares);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
}

z_result_t _z_interest_process_declare_final(_z_session_t *zn, uint32_t id) {
_z_interest_msg_t msg = {.type = _Z_INTEREST_MSG_TYPE_FINAL, .id = id};
// Retrieve interest
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_session_interest_rc_t *intr = __unsafe_z_get_interest_by_id(zn, id);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
if (intr == NULL) {
return _Z_RES_OK;
}
Expand Down
52 changes: 38 additions & 14 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ void _z_pending_query_clear(_z_pending_query_t *pen_qry) {

bool _z_pending_query_eq(const _z_pending_query_t *one, const _z_pending_query_t *two) { return one->_id == two->_id; }

static bool _z_pending_query_timeout(const _z_pending_query_t *foo, const _z_pending_query_t *pq) {
_ZP_UNUSED(foo);
bool result = z_clock_elapsed_ms((z_clock_t *)&pq->_start_time) >= pq->_timeout;
if (result) {
_Z_INFO("Dropping query because of timeout");
}
return result;
}

void _z_pending_query_process_timeout(_z_session_t *zn) {
// Lock session
_z_session_mutex_lock(zn);
// Drop all queries with timeout elapsed
zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_timeout, NULL);
_z_session_mutex_unlock(zn);
}

/*------------------ Query ------------------*/
_z_zint_t _z_get_query_id(_z_session_t *zn) { return zn->_query_id++; }

Expand Down Expand Up @@ -66,11 +83,11 @@ _z_pending_query_t *__unsafe__z_get_pending_query_by_id(_z_session_t *zn, const
}

_z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, id);

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return pql;
}

Expand All @@ -80,7 +97,7 @@ z_result_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_q
_Z_DEBUG(">>> Allocating query for (%ju:%.*s)", (uintmax_t)pen_qry->_key._id,
(int)_z_string_len(&pen_qry->_key._suffix), _z_string_data(&pen_qry->_key._suffix));

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, pen_qry->_id);
if (pql == NULL) { // Register query only if a pending one with the same ID does not exist
Expand All @@ -89,7 +106,7 @@ z_result_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_q
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
}

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

return ret;
}
Expand All @@ -98,7 +115,7 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id,
_z_msg_put_t *msg, z_sample_kind_t kind) {
z_result_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
if ((ret == _Z_RES_OK) && (pen_qry == NULL)) {
Expand Down Expand Up @@ -162,7 +179,7 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id,
}
}

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

// Trigger the user callback
if ((ret == _Z_RES_OK) && (pen_qry->_consolidation != Z_CONSOLIDATION_MODE_LATEST)) {
Expand All @@ -183,7 +200,7 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id,
z_result_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *msg) {
z_result_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
if ((ret == _Z_RES_OK) && (pen_qry == NULL)) {
Expand All @@ -193,7 +210,7 @@ z_result_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err
// Build the reply
_z_reply_t reply = _z_reply_err_create(msg->_payload, &msg->_encoding);

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

// Trigger the user callback
if (ret == _Z_RES_OK) {
Expand All @@ -213,7 +230,7 @@ z_result_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err
z_result_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
z_result_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

// Final reply received for unknown query id
_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
Expand All @@ -239,24 +256,31 @@ z_result_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);
}

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

return ret;
}

void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
}

void _z_flush_pending_queries(_z_session_t *zn) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_list_free(&zn->_pending_queries);

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
}
#else

void _z_pending_query_process_timeout(_z_session_t *zn) {
_ZP_UNUSED(zn);
return;
}

#endif
Loading
Loading