Skip to content

Commit

Permalink
Add local query timeout (eclipse-zenoh#763)
Browse files Browse the repository at this point in the history
* refactor: rename session mutex function

* fix: api options recopy

* feat: add local query timeout info

* feat: add local lazy query timeout check

* feat: centralize query timeout process
  • Loading branch information
jean-roland committed Nov 13, 2024
1 parent 614aedb commit 98e02e4
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 101 deletions.
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
extern "C" {
#endif

void _z_pending_query_process_timeout(_z_session_t *zn);

#if Z_FEATURE_QUERY == 1
/*------------------ Query ------------------*/
_z_zint_t _z_get_query_id(_z_session_t *zn);
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ typedef struct {
_z_zint_t _id;
_z_closure_reply_callback_t _callback;
_z_drop_handler_t _dropper;
z_clock_t _start_time;
uint64_t _timeout;
void *_arg;
_z_pending_reply_list_t *_pending_replies;
z_query_target_t _target;
Expand Down
9 changes: 7 additions & 2 deletions include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ z_result_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliab
z_congestion_control_t cong_ctrl);
z_result_t _z_send_n_batch(_z_session_t *zn, z_reliability_t reliability, z_congestion_control_t cong_ctrl);

void _zp_session_lock_mutex(_z_session_t *zn);
void _zp_session_unlock_mutex(_z_session_t *zn);
#if Z_FEATURE_MULTI_THREAD == 1
static inline void _z_session_mutex_lock(_z_session_t *zn) { (void)_z_mutex_lock(&zn->_mutex_inner); }
static inline void _z_session_mutex_unlock(_z_session_t *zn) { (void)_z_mutex_unlock(&zn->_mutex_inner); }
#else
static inline void _z_session_mutex_lock(_z_session_t *zn) { _ZP_UNUSED(zn); }
static inline void _z_session_mutex_unlock(_z_session_t *zn) { _ZP_UNUSED(zn); }
#endif

#ifdef __cplusplus
}
Expand Down
15 changes: 4 additions & 11 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -1075,14 +1075,7 @@ z_result_t z_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr
z_get_options_t opt;
z_get_options_default(&opt);
if (options != NULL) {
opt.consolidation = options->consolidation;
opt.target = options->target;
opt.encoding = options->encoding;
opt.payload = options->payload;
opt.attachment = options->attachment;
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
opt.is_express = options->is_express;
opt = *options;
}

if (opt.consolidation.mode == Z_CONSOLIDATION_MODE_AUTO) {
Expand Down Expand Up @@ -1172,7 +1165,7 @@ z_result_t z_declare_queryable(const z_loaned_session_t *zs, z_owned_queryable_t
z_queryable_options_t opt;
z_queryable_options_default(&opt);
if (options != NULL) {
opt.complete = options->complete;
opt = *options;
}

queryable->_val =
Expand Down Expand Up @@ -1474,7 +1467,7 @@ z_result_t zp_start_read_task(z_loaned_session_t *zs, const zp_task_read_options
zp_task_read_options_t opt;
zp_task_read_options_default(&opt);
if (options != NULL) {
opt.task_attributes = options->task_attributes;
opt = *options;
}
return _zp_start_read_task(_Z_RC_IN_VAL(zs), opt.task_attributes);
#else
Expand Down Expand Up @@ -1506,7 +1499,7 @@ z_result_t zp_start_lease_task(z_loaned_session_t *zs, const zp_task_lease_optio
zp_task_lease_options_t opt;
zp_task_lease_options_default(&opt);
if (options != NULL) {
opt.task_attributes = options->task_attributes;
opt = *options;
}
return _zp_start_lease_task(_Z_RC_IN_VAL(zs), opt.task_attributes);
#else
Expand Down
2 changes: 2 additions & 0 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,8 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete
pq->_dropper = dropper;
pq->_pending_replies = NULL;
pq->_arg = arg;
pq->_timeout = timeout_ms;
pq->_start_time = z_clock_now();

ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session
if (ret == _Z_RES_OK) {
Expand Down
44 changes: 22 additions & 22 deletions src/session/interest.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ static _z_session_interest_rc_list_t *__unsafe_z_get_interest_by_key_and_flags(_
}

static z_result_t _z_interest_send_decl_resource(_z_session_t *zn, uint32_t interest_id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_resource_list_t *res_list = _z_resource_list_clone(zn->_local_resources);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
_z_resource_list_t *xs = res_list;
while (xs != NULL) {
_z_resource_t *res = _z_resource_list_head(xs);
Expand All @@ -116,9 +116,9 @@ static z_result_t _z_interest_send_decl_resource(_z_session_t *zn, uint32_t inte

#if Z_FEATURE_SUBSCRIPTION == 1
static z_result_t _z_interest_send_decl_subscriber(_z_session_t *zn, uint32_t interest_id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_subscription_rc_list_t *sub_list = _z_subscription_rc_list_clone(zn->_local_subscriptions);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
_z_subscription_rc_list_t *xs = sub_list;
while (xs != NULL) {
_z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs);
Expand All @@ -145,9 +145,9 @@ static z_result_t _z_interest_send_decl_subscriber(_z_session_t *zn, uint32_t in

#if Z_FEATURE_QUERYABLE == 1
static z_result_t _z_interest_send_decl_queryable(_z_session_t *zn, uint32_t interest_id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_session_queryable_rc_list_t *qle_list = _z_session_queryable_rc_list_clone(zn->_local_queryable);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
_z_session_queryable_rc_list_t *xs = qle_list;
while (xs != NULL) {
_z_session_queryable_rc_t *qle = _z_session_queryable_rc_list_head(xs);
Expand Down Expand Up @@ -184,9 +184,9 @@ static z_result_t _z_interest_send_declare_final(_z_session_t *zn, uint32_t inte
}

_z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_session_interest_rc_t *intr = __unsafe_z_get_interest_by_id(zn, id);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return intr;
}

Expand All @@ -195,13 +195,13 @@ _z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_inte
(int)_z_string_len(&intr->_key._suffix), _z_string_data(&intr->_key._suffix));
_z_session_interest_rc_t *ret = NULL;

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
ret = (_z_session_interest_rc_t *)z_malloc(sizeof(_z_session_interest_rc_t));
if (ret != NULL) {
*ret = _z_session_interest_rc_new_from_val(intr);
zn->_local_interests = _z_session_interest_rc_list_push(zn->_local_interests, ret);
}
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return ret;
}

Expand Down Expand Up @@ -269,17 +269,17 @@ z_result_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
// Retrieve key
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key);
if (!_z_keyexpr_has_suffix(&key)) {
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
// Register declare
_unsafe_z_register_declare(zn, &key, msg.id, decl_type);
// Retrieve interests
_z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, &key);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
// Parse session_interest list
_z_session_interest_rc_list_t *xs = intrs;
while (xs != NULL) {
Expand Down Expand Up @@ -315,17 +315,17 @@ z_result_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration
default:
return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN;
}
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
// Retrieve declare data
_z_keyexpr_t key = _unsafe_z_get_key_from_declare(zn, msg.id, decl_type);
if (!_z_keyexpr_has_suffix(&key)) {
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return _Z_ERR_KEYEXPR_UNKNOWN;
}
_z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, &key);
// Remove declare
_unsafe_z_unregister_declare(zn, msg.id, decl_type);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

// Parse session_interest list
_z_session_interest_rc_list_t *xs = intrs;
Expand All @@ -343,25 +343,25 @@ z_result_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration
}

void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
zn->_local_interests =
_z_session_interest_rc_list_drop_filter(zn->_local_interests, _z_session_interest_rc_eq, intr);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
}

void _z_flush_interest(_z_session_t *zn) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_session_interest_rc_list_free(&zn->_local_interests);
_z_declare_data_list_free(&zn->_remote_declares);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
}

z_result_t _z_interest_process_declare_final(_z_session_t *zn, uint32_t id) {
_z_interest_msg_t msg = {.type = _Z_INTEREST_MSG_TYPE_FINAL, .id = id};
// Retrieve interest
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);
_z_session_interest_rc_t *intr = __unsafe_z_get_interest_by_id(zn, id);
_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
if (intr == NULL) {
return _Z_RES_OK;
}
Expand Down
52 changes: 38 additions & 14 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,23 @@ void _z_pending_query_clear(_z_pending_query_t *pen_qry) {

bool _z_pending_query_eq(const _z_pending_query_t *one, const _z_pending_query_t *two) { return one->_id == two->_id; }

static bool _z_pending_query_timeout(const _z_pending_query_t *foo, const _z_pending_query_t *pq) {
_ZP_UNUSED(foo);
bool result = z_clock_elapsed_ms((z_clock_t *)&pq->_start_time) >= pq->_timeout;
if (result) {
_Z_INFO("Dropping query because of timeout");
}
return result;
}

void _z_pending_query_process_timeout(_z_session_t *zn) {
// Lock session
_z_session_mutex_lock(zn);
// Drop all queries with timeout elapsed
zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_timeout, NULL);
_z_session_mutex_unlock(zn);
}

/*------------------ Query ------------------*/
_z_zint_t _z_get_query_id(_z_session_t *zn) { return zn->_query_id++; }

Expand Down Expand Up @@ -66,11 +83,11 @@ _z_pending_query_t *__unsafe__z_get_pending_query_by_id(_z_session_t *zn, const
}

_z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t id) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, id);

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
return pql;
}

Expand All @@ -80,7 +97,7 @@ z_result_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_q
_Z_DEBUG(">>> Allocating query for (%ju:%.*s)", (uintmax_t)pen_qry->_key._id,
(int)_z_string_len(&pen_qry->_key._suffix), _z_string_data(&pen_qry->_key._suffix));

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, pen_qry->_id);
if (pql == NULL) { // Register query only if a pending one with the same ID does not exist
Expand All @@ -89,7 +106,7 @@ z_result_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_q
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
}

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

return ret;
}
Expand All @@ -98,7 +115,7 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id,
_z_msg_put_t *msg, z_sample_kind_t kind) {
z_result_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
if ((ret == _Z_RES_OK) && (pen_qry == NULL)) {
Expand Down Expand Up @@ -162,7 +179,7 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id,
}
}

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

// Trigger the user callback
if ((ret == _Z_RES_OK) && (pen_qry->_consolidation != Z_CONSOLIDATION_MODE_LATEST)) {
Expand All @@ -183,7 +200,7 @@ z_result_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id,
z_result_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err_t *msg) {
z_result_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
if ((ret == _Z_RES_OK) && (pen_qry == NULL)) {
Expand All @@ -193,7 +210,7 @@ z_result_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err
// Build the reply
_z_reply_t reply = _z_reply_err_create(msg->_payload, &msg->_encoding);

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

// Trigger the user callback
if (ret == _Z_RES_OK) {
Expand All @@ -213,7 +230,7 @@ z_result_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err
z_result_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
z_result_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

// Final reply received for unknown query id
_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
Expand All @@ -239,24 +256,31 @@ z_result_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);
}

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);

return ret;
}

void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
}

void _z_flush_pending_queries(_z_session_t *zn) {
_zp_session_lock_mutex(zn);
_z_session_mutex_lock(zn);

_z_pending_query_list_free(&zn->_pending_queries);

_zp_session_unlock_mutex(zn);
_z_session_mutex_unlock(zn);
}
#else

void _z_pending_query_process_timeout(_z_session_t *zn) {
_ZP_UNUSED(zn);
return;
}

#endif
Loading

0 comments on commit 98e02e4

Please sign in to comment.