diff --git a/include/zenoh-pico/api/liveliness.h b/include/zenoh-pico/api/liveliness.h index a4d6c8c9e..15c859fa6 100644 --- a/include/zenoh-pico/api/liveliness.h +++ b/include/zenoh-pico/api/liveliness.h @@ -87,7 +87,7 @@ z_result_t z_liveliness_undeclare_token(z_moved_liveliness_token_t *token); * The options for `z_liveliness_declare_subscriber()` */ typedef struct z_liveliness_subscriber_options_t { - uint8_t __dummy; + bool history; } z_liveliness_subscriber_options_t; /** diff --git a/include/zenoh-pico/protocol/keyexpr.h b/include/zenoh-pico/protocol/keyexpr.h index 580c9e63e..8d790f669 100644 --- a/include/zenoh-pico/protocol/keyexpr.h +++ b/include/zenoh-pico/protocol/keyexpr.h @@ -32,6 +32,7 @@ bool _z_keyexpr_suffix_equals(const _z_keyexpr_t *left, const _z_keyexpr_t *righ /*------------------ clone/Copy/Free helpers ------------------*/ _z_keyexpr_t _z_keyexpr_from_string(uint16_t rid, _z_string_t *str); _z_keyexpr_t _z_keyexpr_from_substr(uint16_t rid, const char *str, size_t len); +size_t _z_keyexpr_size(_z_keyexpr_t *p); z_result_t _z_keyexpr_copy(_z_keyexpr_t *dst, const _z_keyexpr_t *src); _z_keyexpr_t _z_keyexpr_duplicate(_z_keyexpr_t src); _z_keyexpr_t *_z_keyexpr_clone(const _z_keyexpr_t *src); diff --git a/include/zenoh-pico/session/liveliness.h b/include/zenoh-pico/session/liveliness.h index 55bbeb2b6..3f375f70a 100644 --- a/include/zenoh-pico/session/liveliness.h +++ b/include/zenoh-pico/session/liveliness.h @@ -48,6 +48,7 @@ void _z_liveliness_unregister_token(_z_session_t *zn, uint32_t id); z_result_t _z_liveliness_subscription_declare(_z_session_t *zn, uint32_t id, const _z_keyexpr_t keyexpr, const _z_timestamp_t *timestamp); z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, uint32_t id, const _z_timestamp_t *timestamp); +z_result_t _z_liveliness_subscription_trigger_history(_z_session_t *zn, _z_keyexpr_t keyexpr); #endif #if Z_FEATURE_QUERY == 1 diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index d096daf94..3c1d50dbe 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -50,11 +50,12 @@ bool _z_resource_eq(const _z_resource_t *one, const _z_resource_t *two); void _z_resource_clear(_z_resource_t *res); void _z_resource_copy(_z_resource_t *dst, const _z_resource_t *src); void _z_resource_free(_z_resource_t **res); +size_t _z_resource_size(_z_resource_t *p); -_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_resource_copy) +_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_resource_size, _z_resource_clear, _z_resource_copy) _Z_LIST_DEFINE(_z_resource, _z_resource_t) -_Z_ELEM_DEFINE(_z_keyexpr, _z_keyexpr_t, _z_noop_size, _z_keyexpr_clear, _z_keyexpr_copy) +_Z_ELEM_DEFINE(_z_keyexpr, _z_keyexpr_t, _z_keyexpr_size, _z_keyexpr_clear, _z_keyexpr_copy) _Z_INT_MAP_DEFINE(_z_keyexpr, _z_keyexpr_t) // Forward declaration to avoid cyclical include diff --git a/src/api/liveliness.c b/src/api/liveliness.c index 4d58ea0fa..4c671e070 100644 --- a/src/api/liveliness.c +++ b/src/api/liveliness.c @@ -71,17 +71,30 @@ z_result_t z_liveliness_undeclare_token(z_moved_liveliness_token_t *token) { #if Z_FEATURE_SUBSCRIPTION == 1 z_result_t z_liveliness_subscriber_options_default(z_liveliness_subscriber_options_t *options) { - options->__dummy = 0; + options->history = false; return _Z_RES_OK; } z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber_t *sub, const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback, z_liveliness_subscriber_options_t *options) { - _ZP_UNUSED(options); void *ctx = callback->_this._val.context; callback->_this._val.context = NULL; + z_liveliness_subscriber_options_t opt; + if (options == NULL) { + z_liveliness_subscriber_options_default(&opt); + } else { + opt = *options; + } + + if (opt.history) { + z_result_t ret = _z_liveliness_subscription_trigger_history(_Z_RC_IN_VAL(zs), *keyexpr); + if (ret != _Z_RES_OK) { + return ret; + } + } + _z_keyexpr_t key = _z_update_keyexpr_to_declared(_Z_RC_IN_VAL(zs), *keyexpr); _z_subscriber_t int_sub = diff --git a/src/collections/intmap.c b/src/collections/intmap.c index e7aee7993..5035cb74d 100644 --- a/src/collections/intmap.c +++ b/src/collections/intmap.c @@ -68,6 +68,9 @@ z_result_t _z_int_void_map_copy(_z_int_void_map_t *dst, const _z_int_void_map_t _z_int_void_map_t _z_int_void_map_clone(const _z_int_void_map_t *src, z_element_clone_f f_c, z_element_free_f f_f) { _z_int_void_map_t dst = {._capacity = src->_capacity, ._vals = NULL}; + if (src->_vals == NULL) { + return dst; + } // Lazily allocate and initialize to NULL all the pointers size_t len = dst._capacity * sizeof(_z_list_t *); dst._vals = (_z_list_t **)z_malloc(len); @@ -153,6 +156,9 @@ _z_int_void_map_iterator_t _z_int_void_map_iterator_make(const _z_int_void_map_t } bool _z_int_void_map_iterator_next(_z_int_void_map_iterator_t *iter) { + if (iter->_map->_vals == NULL) { + return false; + } while (iter->_idx < iter->_map->_capacity) { if (iter->_list_ptr == NULL) { iter->_list_ptr = iter->_map->_vals[iter->_idx]; diff --git a/src/net/liveliness.c b/src/net/liveliness.c index 2567c44ae..02ea4f3e7 100644 --- a/src/net/liveliness.c +++ b/src/net/liveliness.c @@ -30,17 +30,16 @@ z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_ z_result_t ret; uint32_t id = _z_get_entity_id(_Z_RC_IN_VAL(zn)); - _z_keyexpr_t key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr); - _z_declaration_t declaration = _z_make_decl_token(&key, id); + _z_declaration_t declaration = _z_make_decl_token(&keyexpr, id); _z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0); ret = _z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK); _z_n_msg_clear(&n_msg); - _z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, key); + _z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, keyexpr); ret_token->_id = id; - _z_keyexpr_move(&ret_token->_key, &key); + _z_keyexpr_move(&ret_token->_key, &keyexpr); ret_token->_zn = _z_session_rc_clone_as_weak(zn); return ret; } diff --git a/src/protocol/keyexpr.c b/src/protocol/keyexpr.c index 720ad0539..e27180f63 100644 --- a/src/protocol/keyexpr.c +++ b/src/protocol/keyexpr.c @@ -48,6 +48,11 @@ _z_keyexpr_t _z_keyexpr_from_substr(uint16_t rid, const char *str, size_t len) { }; } +size_t _z_keyexpr_size(_z_keyexpr_t *p) { + _ZP_UNUSED(p); + return sizeof(_z_keyexpr_t); +} + z_result_t _z_keyexpr_copy(_z_keyexpr_t *dst, const _z_keyexpr_t *src) { *dst = _z_keyexpr_null(); dst->_id = src->_id; @@ -79,11 +84,7 @@ _z_keyexpr_t _z_keyexpr_steal(_Z_MOVE(_z_keyexpr_t) src) { return stolen; } -void _z_keyexpr_move(_z_keyexpr_t *dst, _z_keyexpr_t *src) { - dst->_id = src->_id; - dst->_mapping = src->_mapping; - _z_string_move(&dst->_suffix, &src->_suffix); -} +void _z_keyexpr_move(_z_keyexpr_t *dst, _z_keyexpr_t *src) { *dst = _z_keyexpr_steal(src); } void _z_keyexpr_clear(_z_keyexpr_t *rk) { rk->_id = 0; diff --git a/src/session/liveliness.c b/src/session/liveliness.c index b6bda9650..8d489dfc8 100644 --- a/src/session/liveliness.c +++ b/src/session/liveliness.c @@ -36,6 +36,9 @@ z_result_t _z_liveliness_register_token(_z_session_t *zn, uint32_t id, const _z_keyexpr_t keyexpr) { z_result_t ret = _Z_RES_OK; + _Z_DEBUG("Register liveliness token (%i:%.*s)", (int)id, (int)_z_string_len(&keyexpr._suffix), + _z_string_data(&keyexpr._suffix)); + _zp_session_lock_mutex(zn); const _z_keyexpr_t *pkeyexpr = _z_keyexpr_intmap_get(&zn->_local_tokens, id); @@ -54,6 +57,8 @@ z_result_t _z_liveliness_register_token(_z_session_t *zn, uint32_t id, const _z_ void _z_liveliness_unregister_token(_z_session_t *zn, uint32_t id) { _zp_session_lock_mutex(zn); + _Z_DEBUG("Unregister liveliness token (%i)", (int)id); + _z_keyexpr_intmap_remove(&zn->_local_tokens, id); _zp_session_unlock_mutex(zn); @@ -65,7 +70,7 @@ void _z_liveliness_unregister_token(_z_session_t *zn, uint32_t id) { z_result_t _z_liveliness_subscription_declare(_z_session_t *zn, uint32_t id, const _z_keyexpr_t keyexpr, const _z_timestamp_t *timestamp) { z_result_t ret = _Z_RES_OK; - // TODO(sashacmc): What about history? Currently tokens decalred before subscription started, not processed + ret = _z_trigger_liveliness_subscriptions_declare(zn, keyexpr, timestamp); if (ret == _Z_RES_OK) { _zp_session_lock_mutex(zn); @@ -107,6 +112,28 @@ z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, uint32_t id, c return ret; } +z_result_t _z_liveliness_subscription_trigger_history(_z_session_t *zn, _z_keyexpr_t keyexpr) { + z_result_t ret = _Z_RES_OK; + + _zp_session_lock_mutex(zn); + _z_keyexpr_intmap_t token_list = _z_keyexpr_intmap_clone(&zn->_remote_tokens); + _zp_session_unlock_mutex(zn); + + _z_keyexpr_intmap_iterator_t iter = _z_keyexpr_intmap_iterator_make(&token_list); + _z_timestamp_t tm = _z_timestamp_null(); + while (_z_keyexpr_intmap_iterator_next(&iter)) { + _z_keyexpr_t key = *_z_keyexpr_intmap_iterator_value(&iter); + if (_z_keyexpr_suffix_intersects(&key, &keyexpr)) { + ret = _z_trigger_liveliness_subscriptions_declare(zn, key, &tm); + if (ret != _Z_RES_OK) { + break; + } + } + } + _z_keyexpr_intmap_clear(&token_list); + + return ret; +} #endif // Z_FEATURE_SUBSCRIPTION == 1 /**************** Liveliness Query ****************/ @@ -140,7 +167,7 @@ uint32_t _z_liveliness_get_query_id(_z_session_t *zn) { return zn->_liveliness_q z_result_t _z_liveliness_register_pending_query(_z_session_t *zn, uint32_t id, _z_liveliness_pending_query_t *pen_qry) { z_result_t ret = _Z_RES_OK; - _Z_DEBUG(">>> Allocating liveliness query for (%ju:%.*s)", (uintmax_t)pen_qry->_key._id, + _Z_DEBUG("Register liveliness query for (%ju:%.*s)", (uintmax_t)pen_qry->_key._id, (int)_z_string_len(&pen_qry->_key._suffix), _z_string_data(&pen_qry->_key._suffix)); _zp_session_lock_mutex(zn); @@ -181,7 +208,7 @@ z_result_t _z_liveliness_pending_query_reply(_z_session_t *zn, uint32_t interest _Z_DEBUG("Reply liveliness query for %d - %.*s", expanded_ke._id, (int)_z_string_len(&expanded_ke._suffix), _z_string_data(&expanded_ke._suffix)); - if (_z_keyexpr_suffix_intersects(&pq->_key, &expanded_ke) == false) { + if (!_z_keyexpr_suffix_intersects(&pq->_key, &expanded_ke)) { ret = _Z_ERR_QUERY_NOT_MATCH; } diff --git a/src/session/resource.c b/src/session/resource.c index b573e600a..ec8f90703 100644 --- a/src/session/resource.c +++ b/src/session/resource.c @@ -31,6 +31,11 @@ bool _z_resource_eq(const _z_resource_t *other, const _z_resource_t *this_) { re void _z_resource_clear(_z_resource_t *res) { _z_keyexpr_clear(&res->_key); } +size_t _z_resource_size(_z_resource_t *p) { + _ZP_UNUSED(p); + return sizeof(_z_resource_t); +} + void _z_resource_copy(_z_resource_t *dst, const _z_resource_t *src) { _z_keyexpr_copy(&dst->_key, &src->_key); dst->_id = src->_id; diff --git a/tests/z_api_liveliness_test.c b/tests/z_api_liveliness_test.c index 8c31d315f..134d4b407 100644 --- a/tests/z_api_liveliness_test.c +++ b/tests/z_api_liveliness_test.c @@ -70,7 +70,7 @@ void on_receive(z_loaned_sample_t* s, void* context) { } } -void test_liveliness_sub(void) { +void test_liveliness_sub(bool multicast, bool history) { const char* expr = "zenoh-pico/liveliness/test/*"; z_owned_session_t s1, s2; @@ -82,6 +82,14 @@ void test_liveliness_sub(void) { z_view_keyexpr_from_str(&k1, token1_expr); z_view_keyexpr_from_str(&k2, token2_expr); + if (multicast) { + zp_config_insert(z_loan_mut(c1), Z_CONFIG_MODE_KEY, "client"); + zp_config_insert(z_loan_mut(c1), Z_CONFIG_LISTEN_KEY, "udp/224.0.0.224:7447#iface=lo"); + + zp_config_insert(z_loan_mut(c2), Z_CONFIG_MODE_KEY, "peer"); + zp_config_insert(z_loan_mut(c2), Z_CONFIG_LISTEN_KEY, "udp/224.0.0.224:7447#iface=lo"); + } + assert_ok(z_open(&s1, z_config_move(&c1), NULL)); assert_ok(z_open(&s2, z_config_move(&c2), NULL)); @@ -90,18 +98,30 @@ void test_liveliness_sub(void) { assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL)); assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL)); + z_owned_liveliness_token_t t1, t2; + // In history mode we can declare token before subsribing + if (history) { + assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t1, z_view_keyexpr_loan(&k1), NULL)); + assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t2, z_view_keyexpr_loan(&k2), NULL)); + } + + z_sleep_s(1); z_owned_closure_sample_t closure; context_t context = {false, false, false, false}; z_closure_sample(&closure, on_receive, NULL, (void*)(&context)); z_owned_subscriber_t sub; + z_liveliness_subscriber_options_t sub_opt; + z_liveliness_subscriber_options_default(&sub_opt); + sub_opt.history = history; assert_ok(z_liveliness_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k), - z_closure_sample_move(&closure), NULL)); + z_closure_sample_move(&closure), &sub_opt)); z_sleep_s(1); - z_owned_liveliness_token_t t1, t2; - assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t1, z_view_keyexpr_loan(&k1), NULL)); - assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t2, z_view_keyexpr_loan(&k2), NULL)); + if (!history) { + assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t1, z_view_keyexpr_loan(&k1), NULL)); + assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t2, z_view_keyexpr_loan(&k2), NULL)); + } z_sleep_s(1); @@ -191,7 +211,8 @@ void test_liveliness_get(void) { int main(int argc, char** argv) { (void)argc; (void)argv; - test_liveliness_sub(); + test_liveliness_sub(false, false); + test_liveliness_sub(false, true); test_liveliness_get(); } diff --git a/tests/z_collections_test.c b/tests/z_collections_test.c index f7118aaa1..8bb54007f 100644 --- a/tests/z_collections_test.c +++ b/tests/z_collections_test.c @@ -318,25 +318,35 @@ void int_map_iterator_test(void) { _z_str_intmap_insert(&map, 30, "C"); _z_str_intmap_insert(&map, 40, "D"); - _z_str_intmap_iterator_t iter = _z_str_intmap_iterator_make(&map); - - assert(_z_str_intmap_iterator_next(&iter)); - assert(_z_str_intmap_iterator_key(&iter) == 20); - assert(strcmp(_z_str_intmap_iterator_value(&iter), "B") == 0); - - assert(_z_str_intmap_iterator_next(&iter)); - assert(_z_str_intmap_iterator_key(&iter) == 40); - assert(strcmp(_z_str_intmap_iterator_value(&iter), "D") == 0); - - assert(_z_str_intmap_iterator_next(&iter)); - assert(_z_str_intmap_iterator_key(&iter) == 10); - assert(strcmp(_z_str_intmap_iterator_value(&iter), "A") == 0); - - assert(_z_str_intmap_iterator_next(&iter)); - assert(_z_str_intmap_iterator_key(&iter) == 30); - assert(strcmp(_z_str_intmap_iterator_value(&iter), "C") == 0); - - assert(!_z_str_intmap_iterator_next(&iter)); +#define TEST_MAP(map) \ + { \ + _z_str_intmap_iterator_t iter = _z_str_intmap_iterator_make(&map); \ + assert(_z_str_intmap_iterator_next(&iter)); \ + assert(_z_str_intmap_iterator_key(&iter) == 20); \ + assert(strcmp(_z_str_intmap_iterator_value(&iter), "B") == 0); \ + \ + assert(_z_str_intmap_iterator_next(&iter)); \ + assert(_z_str_intmap_iterator_key(&iter) == 40); \ + assert(strcmp(_z_str_intmap_iterator_value(&iter), "D") == 0); \ + \ + assert(_z_str_intmap_iterator_next(&iter)); \ + assert(_z_str_intmap_iterator_key(&iter) == 10); \ + assert(strcmp(_z_str_intmap_iterator_value(&iter), "A") == 0); \ + \ + assert(_z_str_intmap_iterator_next(&iter)); \ + assert(_z_str_intmap_iterator_key(&iter) == 30); \ + assert(strcmp(_z_str_intmap_iterator_value(&iter), "C") == 0); \ + \ + assert(!_z_str_intmap_iterator_next(&iter)); \ + } + + TEST_MAP(map); + + _z_str_intmap_t map2 = _z_str_intmap_clone(&map); + + TEST_MAP(map2); + +#undef TEST_MAP } int main(void) {