Skip to content

Commit

Permalink
Change declare interest to new packet format (#367)
Browse files Browse the repository at this point in the history
* feat: new declare interest packet format

* refactor: use flag macros

* fix: encode correct flag value

* fix: continue create infinite loop

* refactor: go through session_lock function

* feat: remove ke from unicast undeclares
  • Loading branch information
jean-roland authored Mar 18, 2024
1 parent 7e74400 commit 7fbce2f
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 173 deletions.
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/definitions/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,6 @@
#define _Z_FLAGS(h) (_Z_FLAGS_MASK & (h))
#define _Z_HAS_FLAG(h, f) (((h) & (f)) != 0)
#define _Z_SET_FLAG(h, f) (h |= f)
#define _Z_CLEAR_FLAG(h, f) (h &= ~(f))

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_CORE_H */
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ _z_undecl_token_t _z_undecl_token_null(void);
#define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1)
#define _Z_INTEREST_FLAG_QUERYABLES (1 << 2)
#define _Z_INTEREST_FLAG_TOKENS (1 << 3)
#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4)
#define _Z_INTEREST_FLAG_CURRENT (1 << 5)
#define _Z_INTEREST_FLAG_FUTURE (1 << 6)
#define _Z_INTEREST_FLAG_AGGREGATE (1 << 7)

typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
Expand Down
4 changes: 2 additions & 2 deletions src/net/filtering.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) {
}

int8_t _z_write_filter_create(_z_publisher_t *pub) {
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_CURRENT |
_Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_RESTRICTED |
_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE;
_z_writer_filter_ctx_t *ctx = (_z_writer_filter_ctx_t *)zp_malloc(sizeof(_z_writer_filter_ctx_t));

if (ctx == NULL) {
Expand Down
21 changes: 18 additions & 3 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,12 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) {
return _Z_ERR_ENTITY_UNKNOWN;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->in->val._key);
_z_declaration_t declaration;
if (sub->_zn.in->val._tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
declaration = _z_make_undecl_subscriber(sub->_entity_id, NULL);
} else {
declaration = _z_make_undecl_subscriber(sub->_entity_id, &s->in->val._key);
}
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(&sub->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
Expand Down Expand Up @@ -318,7 +323,12 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
return _Z_ERR_ENTITY_UNKNOWN;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_queryable(qle->_entity_id, &q->in->val._key);
_z_declaration_t declaration;
if (qle->_zn.in->val._tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
declaration = _z_make_undecl_queryable(qle->_entity_id, NULL);
} else {
declaration = _z_make_undecl_queryable(qle->_entity_id, &q->in->val._key);
}
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(&qle->_zn.in->val, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
Expand Down Expand Up @@ -461,7 +471,12 @@ int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id) {
return _Z_ERR_ENTITY_UNKNOWN;
}
// Build the declare message to send on the wire
_z_declaration_t declaration = _z_make_undecl_interest(sintr->in->val._id, &sintr->in->val._key);
_z_declaration_t declaration;
if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
declaration = _z_make_undecl_interest(sintr->in->val._id, NULL);
} else {
declaration = _z_make_undecl_interest(sintr->in->val._id, &sintr->in->val._key);
}
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration);
if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
return _Z_ERR_TRANSPORT_TX_FAILED;
Expand Down
76 changes: 71 additions & 5 deletions src/protocol/codec/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,38 @@ int8_t _z_undecl_token_encode(_z_wbuf_t *wbf, const _z_undecl_token_t *decl) {
}

int8_t _z_decl_interest_encode(_z_wbuf_t *wbf, const _z_decl_interest_t *decl) {
// Set header
uint8_t header = _Z_DECL_INTEREST_MID;
_Z_RETURN_IF_ERR(_z_decl_commons_encode(wbf, header, false, decl->_id, decl->_keyexpr));
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, decl->interest_flags));
if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT)) {
_Z_SET_FLAG(header, _Z_INTEREST_FLAG_CURRENT);
}
if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE)) {
_Z_SET_FLAG(header, _Z_INTEREST_FLAG_FUTURE);
}
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, header));
// Set id
_Z_RETURN_IF_ERR(_z_zint_encode(wbf, decl->_id));
// Copy flags but clear double use ones.
uint8_t interest_flags = decl->interest_flags;
_Z_CLEAR_FLAG(interest_flags, _Z_INTEREST_FLAG_CURRENT);
_Z_CLEAR_FLAG(interest_flags, _Z_INTEREST_FLAG_FUTURE);
// Process restricted flag
if (_Z_HAS_FLAG(interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) {
// Set Named & Mapping flags
_Bool has_kesuffix = _z_keyexpr_has_suffix(decl->_keyexpr);
if (has_kesuffix) {
_Z_SET_FLAG(interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N);
}
if (_z_keyexpr_is_local(&decl->_keyexpr)) {
_Z_SET_FLAG(interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M);
}
// Set decl flags & keyexpr
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, interest_flags));
_Z_RETURN_IF_ERR(_z_keyexpr_encode(wbf, has_kesuffix, &decl->_keyexpr));
} else {
// Set decl flags
_Z_RETURN_IF_ERR(_z_uint8_encode(wbf, interest_flags));
}
return _Z_RES_OK;
}

Expand Down Expand Up @@ -337,11 +366,48 @@ int8_t _z_undecl_token_decode(_z_undecl_token_t *decl, _z_zbuf_t *zbf, uint8_t h
return _z_undecl_trivial_decode(zbf, &decl->_ext_keyexpr, &decl->_id, header);
}
int8_t _z_decl_interest_decode(_z_decl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header) {
_Bool has_ext;
*decl = _z_decl_interest_null();
_Z_RETURN_IF_ERR(_z_decl_commons_decode(zbf, header, &has_ext, &decl->_id, &decl->_keyexpr));
// Decode id
_Z_RETURN_IF_ERR(_z_zint32_decode(&decl->_id, zbf));
// Decode interest flags
_Z_RETURN_IF_ERR(_z_uint8_decode(&decl->interest_flags, zbf));
if (has_ext) {
// Process restricted flag
if (_Z_HAS_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_RESTRICTED)) {
uint16_t mapping = _Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M)
? _Z_KEYEXPR_MAPPING_UNKNOWN_REMOTE
: _Z_KEYEXPR_MAPPING_LOCAL;
// Decode ke id
_Z_RETURN_IF_ERR(_z_zint16_decode(&decl->_keyexpr._id, zbf));
// Decode ke suffix
if (_Z_HAS_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N)) {
_z_zint_t len;
_Z_RETURN_IF_ERR(_z_zint_decode(&len, zbf));
if (_z_zbuf_len(zbf) < len) {
return _Z_ERR_MESSAGE_DESERIALIZATION_FAILED;
}
decl->_keyexpr._suffix = zp_malloc(len + 1);
if (decl->_keyexpr._suffix == NULL) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, true);
_z_zbuf_read_bytes(zbf, (uint8_t *)decl->_keyexpr._suffix, 0, len);
decl->_keyexpr._suffix[len] = 0;
} else {
decl->_keyexpr._suffix = NULL;
decl->_keyexpr._mapping = _z_keyexpr_mapping(mapping, false);
}
}
// Replace named & mapping by current & future flags
_Z_CLEAR_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_M);
_Z_CLEAR_FLAG(decl->interest_flags, _Z_DECL_SUBSCRIBER_FLAG_N);
if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_CURRENT)) {
_Z_SET_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_CURRENT);
}
if (_Z_HAS_FLAG(header, _Z_INTEREST_FLAG_FUTURE)) {
_Z_SET_FLAG(decl->interest_flags, _Z_INTEREST_FLAG_FUTURE);
}
// Decode extention
if (_Z_HAS_FLAG(header, _Z_FLAG_Z_Z)) {
_Z_RETURN_IF_ERR(_z_msg_ext_skip_non_mandatories(zbf, 0x13));
}
return _Z_RES_OK;
Expand Down
26 changes: 18 additions & 8 deletions src/protocol/definitions/declarations.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,14 @@ _z_declaration_t _z_make_decl_subscriber(_Z_MOVE(_z_keyexpr_t) key, uint32_t id,
._keyexpr = _z_keyexpr_steal(key),
._ext_subinfo = {._pull_mode = pull_mode, ._reliable = reliable}}}};
}

_z_declaration_t _z_make_undecl_subscriber(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) {
return (_z_declaration_t){._tag = _Z_UNDECL_SUBSCRIBER,
._body = {._undecl_subscriber = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}};
return (_z_declaration_t){
._tag = _Z_UNDECL_SUBSCRIBER,
._body = {._undecl_subscriber = {
._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}};
}

_z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint32_t distance, uint8_t complete) {
return (_z_declaration_t){
._tag = _Z_DECL_QUERYABLE,
Expand All @@ -88,8 +92,10 @@ _z_declaration_t _z_make_decl_queryable(_Z_MOVE(_z_keyexpr_t) key, uint32_t id,
._ext_queryable_info = {._complete = complete, ._distance = distance}}}};
}
_z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) {
return (_z_declaration_t){._tag = _Z_UNDECL_QUERYABLE,
._body = {._undecl_queryable = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}};
return (_z_declaration_t){
._tag = _Z_UNDECL_QUERYABLE,
._body = {._undecl_queryable = {
._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}};
}
_z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id) {
return (_z_declaration_t){._tag = _Z_DECL_TOKEN,
Expand All @@ -99,8 +105,10 @@ _z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id) {
}}};
}
_z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) {
return (_z_declaration_t){._tag = _Z_UNDECL_TOKEN,
._body = {._undecl_token = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}};
return (_z_declaration_t){
._tag = _Z_UNDECL_TOKEN,
._body = {._undecl_token = {._id = id,
._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}};
}
_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags) {
return (_z_declaration_t){._tag = _Z_DECL_INTEREST,
Expand All @@ -111,8 +119,10 @@ _z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, u
}}};
}
_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) {
return (_z_declaration_t){._tag = _Z_UNDECL_INTEREST,
._body = {._undecl_interest = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}};
return (_z_declaration_t){
._tag = _Z_UNDECL_INTEREST,
._body = {._undecl_interest = {
._id = id, ._ext_keyexpr = (key == NULL) ? _z_keyexpr_null() : _z_keyexpr_duplicate(*key)}}};
}
_z_declaration_t _z_make_final_interest(uint32_t id) {
return (_z_declaration_t){._tag = _Z_FINAL_INTEREST, ._body = {._final_interest = {._id = id}}};
Expand Down
22 changes: 11 additions & 11 deletions src/session/interest.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,11 @@ static _z_session_interest_rc_list_t *__z_get_interest_by_key_and_flags(_z_sessi
_z_session_interest_rc_list_t *xs = intrs;
while (xs != NULL) {
_z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs);
if ((intr->in->val._flags & flags) == 0) {
continue;
}
if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix,
strlen(key._suffix)) == true) {
ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr));
if ((intr->in->val._flags & flags) != 0) {
if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix,
strlen(key._suffix)) == true) {
ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr));
}
}
xs = _z_session_interest_rc_list_tail(xs);
}
Expand Down Expand Up @@ -291,28 +290,29 @@ int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) {
}

int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) {
// TODO process restricted flag & key
_ZP_UNUSED(key);
_ZP_UNUSED(id);
// Check transport type
if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) {
return _Z_RES_OK; // Nothing to do on unicast
}
// Current flags process
if ((flags & _Z_INTEREST_FLAG_CURRENT) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_CURRENT)) {
// Send all declare
if ((flags & _Z_INTEREST_FLAG_KEYEXPRS) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_KEYEXPRS)) {
_Z_DEBUG("Sending declare resources");
_Z_RETURN_IF_ERR(_z_send_resource_interest(zn));
}
if ((flags & _Z_INTEREST_FLAG_SUBSCRIBERS) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_SUBSCRIBERS)) {
_Z_DEBUG("Sending declare subscribers");
_Z_RETURN_IF_ERR(_z_send_subscriber_interest(zn));
}
if ((flags & _Z_INTEREST_FLAG_QUERYABLES) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_QUERYABLES)) {
_Z_DEBUG("Sending declare queryables");
_Z_RETURN_IF_ERR(_z_send_queryable_interest(zn));
}
if ((flags & _Z_INTEREST_FLAG_TOKENS) != 0) {
if (_Z_HAS_FLAG(flags, _Z_INTEREST_FLAG_TOKENS)) {
// Zenoh pico doesn't support liveliness token for now
}
// Send final declare
Expand Down
49 changes: 13 additions & 36 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "zenoh-pico/net/memory.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/utils/logging.h"

#if Z_FEATURE_QUERY == 1
Expand Down Expand Up @@ -103,15 +104,11 @@ _z_pending_query_t *__unsafe__z_get_pending_query_by_id(_z_session_t *zn, const
}

_z_pending_query_t *_z_get_pending_query_by_id(_z_session_t *zn, const _z_zint_t id) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, id);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
return pql;
}

Expand All @@ -121,9 +118,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
_Z_DEBUG(">>> Allocating query for (%ju:%s,%s)", (uintmax_t)pen_qry->_key._id, pen_qry->_key._suffix,
pen_qry->_parameters);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_t *pql = __unsafe__z_get_pending_query_by_id(zn, pen_qry->_id);
if (pql == NULL) { // Register query only if a pending one with the same ID does not exist
Expand All @@ -132,9 +127,7 @@ int8_t _z_register_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry)
ret = _Z_ERR_ENTITY_DECLARATION_FAILED;
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return ret;
}
Expand All @@ -144,9 +137,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
const _z_timestamp_t timestamp) {
int8_t ret = _Z_RES_OK;

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
if ((ret == _Z_RES_OK) && (pen_qry == NULL)) {
Expand Down Expand Up @@ -216,9 +207,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
}
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

// Trigger the user callback
if ((ret == _Z_RES_OK) && (pen_qry->_consolidation != Z_CONSOLIDATION_MODE_LATEST)) {
Expand All @@ -235,9 +224,7 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, const _z_zint_t id, cons
int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
int8_t ret = _Z_RES_OK;

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

// Final reply received for unknown query id
_z_pending_query_t *pen_qry = __unsafe__z_get_pending_query_by_id(zn, id);
Expand All @@ -261,34 +248,24 @@ int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {
zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);
}

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);

return ret;
}

void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pen_qry) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

zn->_pending_queries = _z_pending_query_list_drop_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}

void _z_flush_pending_queries(_z_session_t *zn) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_lock_mutex(zn);

_z_pending_query_list_free(&zn->_pending_queries);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&zn->_mutex_inner);
#endif // Z_FEATURE_MULTI_THREAD == 1
_zp_session_unlock_mutex(zn);
}
#endif
Loading

0 comments on commit 7fbce2f

Please sign in to comment.