diff --git a/src/session/interest.c b/src/session/interest.c index 57b56a48e..07c17953b 100644 --- a/src/session/interest.c +++ b/src/session/interest.c @@ -82,6 +82,95 @@ _z_session_interest_rc_t *__unsafe_z_get_interest_by_id(_z_session_t *zn, const _z_session_interest_rc_list_t *__unsafe_z_get_interest_by_key(_z_session_t *zn, const _z_keyexpr_t key) { _z_session_interest_rc_list_t *intrs = zn->_local_interests; return __z_get_interest_by_key(intrs, key); + +static int8_t _z_send_resource_interest(_z_session_t *zn) { + _zp_session_lock_mutex(zn); + _z_resource_list_t *res_list = _z_resource_list_clone(zn->_local_resources); + _zp_session_unlock_mutex(zn); + _z_resource_list_t *xs = res_list; + while (xs != NULL) { + _z_resource_t *res = _z_resource_list_head(xs); + // Build the declare message to send on the wire + _z_keyexpr_t key = _z_keyexpr_alias(res->_key); + _z_declaration_t declaration = _z_make_decl_keyexpr(res->_id, &key); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + xs = _z_resource_list_tail(xs); + } + _z_resource_list_free(&res_list); + return _Z_RES_OK; +} + +#if Z_FEATURE_SUBSCRIPTION == 1 +static int8_t _z_send_subscriber_interest(_z_session_t *zn) { + _zp_session_lock_mutex(zn); + _z_subscription_rc_list_t *sub_list = _z_subscription_rc_list_clone(zn->_local_subscriptions); + _zp_session_unlock_mutex(zn); + _z_subscription_rc_list_t *xs = sub_list; + while (xs != NULL) { + _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); + // Build the declare message to send on the wire + _z_keyexpr_t key = _z_keyexpr_alias(sub->in->val._key); + _z_declaration_t declaration = + _z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE, + sub->in->val._info.mode == Z_SUBMODE_PULL); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + xs = _z_subscription_rc_list_tail(xs); + } + _z_subscription_rc_list_free(&sub_list); + return _Z_RES_OK; +} +#else +static int8_t _z_send_subscriber_interest(_z_session_t *zn) { + _ZP_UNUSED(zn); + return _Z_RES_OK; +} +#endif + +#if Z_FEATURE_QUERYABLE == 1 +static int8_t _z_send_queryable_interest(_z_session_t *zn) { + _zp_session_lock_mutex(zn); + _z_session_queryable_rc_list_t *qle_list = _z_session_queryable_rc_list_clone(zn->_local_queryable); + _zp_session_unlock_mutex(zn); + _z_session_queryable_rc_list_t *xs = qle_list; + while (xs != NULL) { + _z_session_queryable_rc_t *qle = _z_session_queryable_rc_list_head(xs); + // Build the declare message to send on the wire + _z_keyexpr_t key = _z_keyexpr_alias(qle->in->val._key); + _z_declaration_t declaration = + _z_make_decl_queryable(&key, qle->in->val._id, qle->in->val._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + xs = _z_subscription_rc_list_tail(xs); + } + _z_session_queryable_rc_list_free(&qle_list); + return _Z_RES_OK; +} +#else +static int8_t _z_send_queryable_interest(_z_session_t *zn) { + _ZP_UNUSED(zn); + return _Z_RES_OK; +} +#endif + +static int8_t _z_interest_send_final_interest(_z_session_t *zn, uint32_t id) { + _z_declaration_t decl = _z_make_final_interest(id); + _z_network_message_t n_msg = _z_n_msg_make_declare(decl); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + return _Z_RES_OK; } _z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id) { @@ -190,96 +279,6 @@ int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) { return _Z_RES_OK; } -static int8_t _z_send_resource_interest(_z_session_t *zn) { - _zp_session_lock_mutex(zn); - _z_resource_list_t *res_list = _z_resource_list_clone(zn->_local_resources); - _zp_session_unlock_mutex(zn); - _z_resource_list_t *xs = res_list; - while (xs != NULL) { - _z_resource_t *res = _z_resource_list_head(xs); - // Build the declare message to send on the wire - _z_keyexpr_t key = _z_keyexpr_alias(res->_key); - _z_declaration_t declaration = _z_make_decl_keyexpr(res->_id, &key); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - return _Z_ERR_TRANSPORT_TX_FAILED; - } - _z_n_msg_clear(&n_msg); - xs = _z_resource_list_tail(xs); - } - _z_resource_list_free(&res_list); - return _Z_RES_OK; -} - -#if Z_FEATURE_SUBSCRIPTION == 1 -static int8_t _z_send_subscriber_interest(_z_session_t *zn) { - _zp_session_lock_mutex(zn); - _z_subscription_rc_list_t *sub_list = _z_subscription_rc_list_clone(zn->_local_subscriptions); - _zp_session_unlock_mutex(zn); - _z_subscription_rc_list_t *xs = sub_list; - while (xs != NULL) { - _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); - // Build the declare message to send on the wire - _z_keyexpr_t key = _z_keyexpr_alias(sub->in->val._key); - _z_declaration_t declaration = - _z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE, - sub->in->val._info.mode == Z_SUBMODE_PULL); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - return _Z_ERR_TRANSPORT_TX_FAILED; - } - _z_n_msg_clear(&n_msg); - xs = _z_subscription_rc_list_tail(xs); - } - _z_subscription_rc_list_free(&sub_list); - return _Z_RES_OK; -} -#else -static int8_t _z_send_subscriber_interest(_z_session_t *zn) { - _ZP_UNUSED(zn); - return _Z_RES_OK; -} -#endif - -#if Z_FEATURE_QUERYABLE == 1 -static int8_t _z_send_queryable_interest(_z_session_t *zn) { - _zp_session_lock_mutex(zn); - _z_session_queryable_rc_list_t *qle_list = _z_session_queryable_rc_list_clone(zn->_local_queryable); - _zp_session_unlock_mutex(zn); - _z_session_queryable_rc_list_t *xs = qle_list; - while (xs != NULL) { - _z_session_queryable_rc_t *qle = _z_session_queryable_rc_list_head(xs); - // Build the declare message to send on the wire - _z_keyexpr_t key = _z_keyexpr_alias(qle->in->val._key); - _z_declaration_t declaration = - _z_make_decl_queryable(&key, qle->in->val._id, qle->in->val._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); - _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - return _Z_ERR_TRANSPORT_TX_FAILED; - } - _z_n_msg_clear(&n_msg); - xs = _z_subscription_rc_list_tail(xs); - } - _z_session_queryable_rc_list_free(&qle_list); - return _Z_RES_OK; -} -#else -static int8_t _z_send_queryable_interest(_z_session_t *zn) { - _ZP_UNUSED(zn); - return _Z_RES_OK; -} -#endif - -static int8_t _z_interest_send_final_interest(_z_session_t *zn, uint32_t id) { - _z_declaration_t decl = _z_make_final_interest(id); - _z_network_message_t n_msg = _z_n_msg_make_declare(decl); - if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - return _Z_ERR_TRANSPORT_TX_FAILED; - } - _z_n_msg_clear(&n_msg); - return _Z_RES_OK; -} - int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) { _ZP_UNUSED(key); _ZP_UNUSED(id);