Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change declare interest to new packet format #367

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/definitions/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@
#define _Z_FLAGS(h) (_Z_FLAGS_MASK & (h))
#define _Z_HAS_FLAG(h, f) (((h) & (f)) != 0)
#define _Z_SET_FLAG(h, f) (h |= f)
#define _Z_CLEAR_FLAG(h, f) (h &= ~(f))

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_CORE_H */
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ _z_undecl_token_t _z_undecl_token_null(void);
#define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1)
#define _Z_INTEREST_FLAG_QUERYABLES (1 << 2)
#define _Z_INTEREST_FLAG_TOKENS (1 << 3)
#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4)
#define _Z_INTEREST_FLAG_CURRENT (1 << 5)
#define _Z_INTEREST_FLAG_FUTURE (1 << 6)
#define _Z_INTEREST_FLAG_AGGREGATE (1 << 7)

typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
Expand Down
4 changes: 2 additions & 2 deletions src/net/filtering.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) {
}

int8_t _z_write_filter_create(_z_publisher_t *pub) {
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_CURRENT |
_Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED |
_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
_z_writer_filter_ctx_t *ctx = (_z_writer_filter_ctx_t *)zp_malloc(sizeof(_z_writer_filter_ctx_t));

if (ctx == NULL) {
Expand Down
21 changes: 18 additions & 3 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,12 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
return _Z_ERR_ENTITY_UNKNOWN;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->in->val._key);
_z_declaration_t declaration;
if (sub->_zn.in->val._tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
declaration = _z_make_undecl_subscriber(sub->_entity_id, NULL);
} else {
declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->in->val._key);
}
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(&sub->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
Expand Down Expand Up @@ -318,7 +323,12 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
return _Z_ERR_ENTITY_UNKNOWN;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_queryable(qle->_entity_id, &q->in->val._key);
_z_declaration_t declaration;
if (qle->_zn.in->val._tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
declaration = _z_make_undecl_queryable(qle->_entity_id, NULL);
} else {
declaration = _z_make_undecl_queryable(qle->_entity_id, &q->in->val._key);
}
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(&qle->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
Expand Down Expand Up @@ -461,7 +471,12 @@ int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id) {
return _Z_ERR_ENTITY_UNKNOWN;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_interest(sintr->in->val._id, &sintr->in->val._key);
_z_declaration_t declaration;
if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
declaration = _z_make_undecl_interest(sintr->in->val._id, NULL);
} else {
declaration = _z_make_undecl_interest(sintr->in->val._id, &sintr->in->val._key);
}
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
Expand Down
76 changes: 71 additions & 5 deletions src/protocol/codec/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,38 @@ int8_t _z_undecl_token_encode(_z_wbuf_t *wbf, const _z_undecl_token_t *decl) {
}

int8_t _z_decl_interest_encode(_z_wbuf_t *wbf, const _z_decl_interest_t *decl) {
// Set header
uint8_t header = _Z_DECL_INTEREST_MID;
_Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, false, decl->_id, decl->_keyexpr));
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, decl->interest_flags));
if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT)) {
_Z_SET_FLAG(header, _Z_INTEREST_FLAG_CURRENT);
}
if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE)) {
_Z_SET_FLAG(header, _Z_INTEREST_FLAG_FUTURE);
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
// Set id
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, decl->_id));
// Copy flags but clear double use ones.
uint8_t interest_flags = decl->interest_flags;
_Z_CLEAR_FLAG(interest_flags, _Z_INTEREST_FLAG_CURRENT);
_Z_CLEAR_FLAG(interest_flags, _Z_INTEREST_FLAG_FUTURE);
// Process restricted flag
if (_Z_HAS_FLAG(interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) {
// Set Named & Mapping flags
_Bool has_kesuffix = _z_keyexpr_has_suffix(decl->_keyexpr);
if (has_kesuffix) {
_Z_SET_FLAG(interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N);
}
if (_z_keyexpr_is_local(&decl->_keyexpr)) {
_Z_SET_FLAG(interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M);
}
// Set decl flags & keyexpr
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, interest_flags));
_Z_RETURN_IF_ERR(_z_keyexpr_encode(wbf, has_kesuffix, &decl->_keyexpr));
} else {
// Set decl flags
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, interest_flags));
}
return _Z_RES_OK;
}

Expand Down Expand Up @@ -337,11 +366,48 @@ int8_t _z_undecl_token_decode(_z_undecl_token_t *decl, _z_zbuf_t *zbf, uint8_t h
return _z_undecl_trivial_decode(zbf, &decl->_ext_keyexpr, &decl->_id, header);
}
int8_t _z_decl_interest_decode(_z_decl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header) {
_Bool has_ext;
*decl = _z_decl_interest_null();
_Z_RETURN_IF_ERR(_z_decl_commons_decode(zbf, header, &has_ext, &decl->_id, &decl->_keyexpr));
// Decode id
_Z_RETURN_IF_ERR(_z_zint32_decode(&decl->_id, zbf));
// Decode interest flags
_Z_RETURN_IF_ERR(_z_uint8_decode(&decl->interest_flags, zbf));
if (has_ext) {
// Process restricted flag
if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) {
uint16_t mapping = _Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M)
? _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE
: _Z_KEYEXPR_MAPPING_LOCAL;
// Decode ke id
_Z_RETURN_IF_ERR(_z_zint16_decode(&decl->_keyexpr._id, zbf));
// Decode ke suffix
if (_Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N)) {
_z_zint_t len;
_Z_RETURN_IF_ERR(_z_zint_decode(&len, zbf));
if (_z_zbuf_len(zbf) < len) {
return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED;
}
decl->_keyexpr._suffix = zp_malloc(len + 1);
if (decl->_keyexpr._suffix == NULL) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, true);
_z_zbuf_read_bytes(zbf, (uint8_t *)decl->_keyexpr._suffix, 0, len);
decl->_keyexpr._suffix[len] = 0;
} else {
decl->_keyexpr._suffix = NULL;
decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, false);
}
}
// Replace named & mapping by current & future flags
_Z_CLEAR_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M);
_Z_CLEAR_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N);
if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_CURRENT)) {
_Z_SET_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT);
}
if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_FUTURE)) {
_Z_SET_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE);
}
// Decode extention
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x13));
}
return _Z_RES_OK;
Expand Down
26 changes: 18 additions & 8 deletions src/protocol/definitions/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,14 @@ _z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id,
._keyexpr = _z_keyexpr_steal(key),
._ext_subinfo = {._pull_mode = pull_mode, ._reliable = reliable}}}};
}

_z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) {
return (_z_declaration_t){._tag = _Z_UNDECL_SUBSCRIBER,
._body = {._undecl_subscriber = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}};
return (_z_declaration_t){
._tag = _Z_UNDECL_SUBSCRIBER,
._body = {._undecl_subscriber = {
._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}};
}

_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint32_t distance, uint8_t complete) {
return (_z_declaration_t){
._tag = _Z_DECL_QUERYABLE,
Expand All @@ -88,8 +92,10 @@ _z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id,
._ext_queryable_info = {._complete = complete, ._distance = distance}}}};
}
_z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) {
return (_z_declaration_t){._tag = _Z_UNDECL_QUERYABLE,
._body = {._undecl_queryable = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}};
return (_z_declaration_t){
._tag = _Z_UNDECL_QUERYABLE,
._body = {._undecl_queryable = {
._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}};
}
_z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id) {
return (_z_declaration_t){._tag = _Z_DECL_TOKEN,
Expand All @@ -99,8 +105,10 @@ _z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id) {
}}};
}
_z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) {
return (_z_declaration_t){._tag = _Z_UNDECL_TOKEN,
._body = {._undecl_token = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}};
return (_z_declaration_t){
._tag = _Z_UNDECL_TOKEN,
._body = {._undecl_token = {._id = id,
._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}};
}
_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags) {
return (_z_declaration_t){._tag = _Z_DECL_INTEREST,
Expand All @@ -111,8 +119,10 @@ _z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, u
}}};
}
_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) {
return (_z_declaration_t){._tag = _Z_UNDECL_INTEREST,
._body = {._undecl_interest = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}};
return (_z_declaration_t){
._tag = _Z_UNDECL_INTEREST,
._body = {._undecl_interest = {
._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}};
}
_z_declaration_t _z_make_final_interest(uint32_t id) {
return (_z_declaration_t){._tag = _Z_FINAL_INTEREST, ._body = {._final_interest = {._id = id}}};
Expand Down
22 changes: 11 additions & 11 deletions src/session/interest.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ static _z_session_interest_rc_list_t *__z_get_interest_by_key_and_flags(_z_sessi
_z_session_interest_rc_list_t *xs = intrs;
while (xs != NULL) {
_z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs);
if ((intr->in->val._flags & flags) == 0) {
continue;
}
if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix,
strlen(key._suffix)) == true) {
ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr));
if ((intr->in->val._flags & flags) != 0) {
if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix,
strlen(key._suffix)) == true) {
ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr));
}
}
xs = _z_session_interest_rc_list_tail(xs);
}
Expand Down Expand Up @@ -291,28 +290,29 @@ int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) {
}

int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) {
// TODO process restricted flag & key
_ZP_UNUSED(key);
_ZP_UNUSED(id);
// Check transport type
if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
return _Z_RES_OK; // Nothing to do on unicast
}
// Current flags process
if ((flags & _Z_INTEREST_FLAG_CURRENT) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_CURRENT)) {
Fixed Show fixed Hide fixed
// Send all declare
if ((flags & _Z_INTEREST_FLAG_KEYEXPRS) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_KEYEXPRS)) {
Fixed Show fixed Hide fixed
_Z_DEBUG("Sending declare resources");
_Z_RETURN_IF_ERR(_z_send_resource_interest(zn));
}
if ((flags & _Z_INTEREST_FLAG_SUBSCRIBERS) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_SUBSCRIBERS)) {
Fixed Show fixed Hide fixed
_Z_DEBUG("Sending declare subscribers");
_Z_RETURN_IF_ERR(_z_send_subscriber_interest(zn));
}
if ((flags & _Z_INTEREST_FLAG_QUERYABLES) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_QUERYABLES)) {
Fixed Show fixed Hide fixed
_Z_DEBUG("Sending declare queryables");
_Z_RETURN_IF_ERR(_z_send_queryable_interest(zn));
}
if ((flags & _Z_INTEREST_FLAG_TOKENS) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_TOKENS)) {
Fixed Show fixed Hide fixed
// Zenoh pico doesn't support liveliness token for now
}
// Send final declare
Expand Down
49 changes: 13 additions & 36 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "zenoh-pico/net/memory.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_QUERY == 1
Expand Down Expand Up @@ -103,15 +104,11 @@ _z_pending_query_t *__unsafe__z_get_pending_query_by_id(_z_session_t *zn, const
}

_z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t id) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, id);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
return pql;
}

Expand All @@ -121,9 +118,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
_Z_DEBUG(">>> Allocating query for (%ju:%s,%s)", (uintmax_t)pen_qry->_key._id, pen_qry->_key._suffix,
pen_qry->_parameters);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, pen_qry->_id);
if (pql == NULL) { // Register query only if a pending one with the same ID does not exist
Expand All @@ -132,9 +127,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return ret;
}
Expand All @@ -144,9 +137,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
const _z_timestamp_t timestamp) {
int8_t ret = _Z_RES_OK;

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
if ((ret == _Z_RES_OK) && (pen_qry == NULL)) {
Expand Down Expand Up @@ -216,9 +207,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
}
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

// Trigger the user callback
if ((ret == _Z_RES_OK) && (pen_qry->_consolidation != Z_CONSOLIDATION_MODE_LATEST)) {
Expand All @@ -235,9 +224,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
int8_t ret = _Z_RES_OK;

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

// Final reply received for unknown query id
_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
Expand All @@ -261,34 +248,24 @@ int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return ret;
}

void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}

void _z_flush_pending_queries(_z_session_t *zn) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_list_free(&zn->_pending_queries);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}
#endif
Loading
Loading