Skip to content

Commit

Permalink
Liveness history support, bug fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Nov 6, 2024
1 parent 56d862a commit 4580eb0
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 42 deletions.
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/liveliness.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ z_result_t z_liveliness_undeclare_token(z_moved_liveliness_token_t *token);
* The options for `z_liveliness_declare_subscriber()`
*/
typedef struct z_liveliness_subscriber_options_t {
uint8_t __dummy;
bool history;
} z_liveliness_subscriber_options_t;

/**
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/protocol/keyexpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ bool _z_keyexpr_suffix_equals(const _z_keyexpr_t *left, const _z_keyexpr_t *righ
/*------------------ clone/Copy/Free helpers ------------------*/
_z_keyexpr_t _z_keyexpr_from_string(uint16_t rid, _z_string_t *str);
_z_keyexpr_t _z_keyexpr_from_substr(uint16_t rid, const char *str, size_t len);
size_t _z_keyexpr_size(_z_keyexpr_t *p);
z_result_t _z_keyexpr_copy(_z_keyexpr_t *dst, const _z_keyexpr_t *src);
_z_keyexpr_t _z_keyexpr_duplicate(_z_keyexpr_t src);
_z_keyexpr_t *_z_keyexpr_clone(const _z_keyexpr_t *src);
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/session/liveliness.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ void _z_liveliness_unregister_token(_z_session_t *zn, uint32_t id);
z_result_t _z_liveliness_subscription_declare(_z_session_t *zn, uint32_t id, const _z_keyexpr_t keyexpr,
const _z_timestamp_t *timestamp);
z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, uint32_t id, const _z_timestamp_t *timestamp);
z_result_t _z_liveliness_subscription_trigger_history(_z_session_t *zn, _z_keyexpr_t keyexpr);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
5 changes: 3 additions & 2 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ bool _z_resource_eq(const _z_resource_t *one, const _z_resource_t *two);
void _z_resource_clear(_z_resource_t *res);
void _z_resource_copy(_z_resource_t *dst, const _z_resource_t *src);
void _z_resource_free(_z_resource_t **res);
size_t _z_resource_size(_z_resource_t *p);

_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_resource_copy)
_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_resource_size, _z_resource_clear, _z_resource_copy)
_Z_LIST_DEFINE(_z_resource, _z_resource_t)

_Z_ELEM_DEFINE(_z_keyexpr, _z_keyexpr_t, _z_noop_size, _z_keyexpr_clear, _z_keyexpr_copy)
_Z_ELEM_DEFINE(_z_keyexpr, _z_keyexpr_t, _z_keyexpr_size, _z_keyexpr_clear, _z_keyexpr_copy)
_Z_INT_MAP_DEFINE(_z_keyexpr, _z_keyexpr_t)

// Forward declaration to avoid cyclical include
Expand Down
17 changes: 15 additions & 2 deletions src/api/liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,30 @@ z_result_t z_liveliness_undeclare_token(z_moved_liveliness_token_t *token) {

#if Z_FEATURE_SUBSCRIPTION == 1
z_result_t z_liveliness_subscriber_options_default(z_liveliness_subscriber_options_t *options) {
options->__dummy = 0;
options->history = false;
return _Z_RES_OK;
}

z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber_t *sub,
const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback,
z_liveliness_subscriber_options_t *options) {
_ZP_UNUSED(options);
void *ctx = callback->_this._val.context;
callback->_this._val.context = NULL;

z_liveliness_subscriber_options_t opt;
if (options == NULL) {
z_liveliness_subscriber_options_default(&opt);
} else {
opt = *options;
}

if (opt.history) {
z_result_t ret = _z_liveliness_subscription_trigger_history(_Z_RC_IN_VAL(zs), *keyexpr);
if (ret != _Z_RES_OK) {
return ret;
}
}

_z_keyexpr_t key = _z_update_keyexpr_to_declared(_Z_RC_IN_VAL(zs), *keyexpr);

_z_subscriber_t int_sub =
Expand Down
6 changes: 6 additions & 0 deletions src/collections/intmap.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ z_result_t _z_int_void_map_copy(_z_int_void_map_t *dst, const _z_int_void_map_t

_z_int_void_map_t _z_int_void_map_clone(const _z_int_void_map_t *src, z_element_clone_f f_c, z_element_free_f f_f) {
_z_int_void_map_t dst = {._capacity = src->_capacity, ._vals = NULL};
if (src->_vals == NULL) {
return dst;
}
// Lazily allocate and initialize to NULL all the pointers
size_t len = dst._capacity * sizeof(_z_list_t *);
dst._vals = (_z_list_t **)z_malloc(len);
Expand Down Expand Up @@ -153,6 +156,9 @@ _z_int_void_map_iterator_t _z_int_void_map_iterator_make(const _z_int_void_map_t
}

bool _z_int_void_map_iterator_next(_z_int_void_map_iterator_t *iter) {
if (iter->_map->_vals == NULL) {
return false;
}
while (iter->_idx < iter->_map->_capacity) {
if (iter->_list_ptr == NULL) {
iter->_list_ptr = iter->_map->_vals[iter->_idx];
Expand Down
7 changes: 3 additions & 4 deletions src/net/liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,16 @@ z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_
z_result_t ret;

uint32_t id = _z_get_entity_id(_Z_RC_IN_VAL(zn));
_z_keyexpr_t key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr);

_z_declaration_t declaration = _z_make_decl_token(&key, id);
_z_declaration_t declaration = _z_make_decl_token(&keyexpr, id);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
ret = _z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
_z_n_msg_clear(&n_msg);

_z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, key);
_z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, keyexpr);

ret_token->_id = id;
_z_keyexpr_move(&ret_token->_key, &key);
_z_keyexpr_move(&ret_token->_key, &keyexpr);
ret_token->_zn = _z_session_rc_clone_as_weak(zn);
return ret;
}
Expand Down
11 changes: 6 additions & 5 deletions src/protocol/keyexpr.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ _z_keyexpr_t _z_keyexpr_from_substr(uint16_t rid, const char *str, size_t len) {
};
}

size_t _z_keyexpr_size(_z_keyexpr_t *p) {
_ZP_UNUSED(p);
return sizeof(_z_keyexpr_t);
}

z_result_t _z_keyexpr_copy(_z_keyexpr_t *dst, const _z_keyexpr_t *src) {
*dst = _z_keyexpr_null();
dst->_id = src->_id;
Expand Down Expand Up @@ -79,11 +84,7 @@ _z_keyexpr_t _z_keyexpr_steal(_Z_MOVE(_z_keyexpr_t) src) {
return stolen;
}

void _z_keyexpr_move(_z_keyexpr_t *dst, _z_keyexpr_t *src) {
dst->_id = src->_id;
dst->_mapping = src->_mapping;
_z_string_move(&dst->_suffix, &src->_suffix);
}
void _z_keyexpr_move(_z_keyexpr_t *dst, _z_keyexpr_t *src) { *dst = _z_keyexpr_steal(src); }

void _z_keyexpr_clear(_z_keyexpr_t *rk) {
rk->_id = 0;
Expand Down
33 changes: 30 additions & 3 deletions src/session/liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
z_result_t _z_liveliness_register_token(_z_session_t *zn, uint32_t id, const _z_keyexpr_t keyexpr) {
z_result_t ret = _Z_RES_OK;

_Z_DEBUG("Register liveliness token (%i:%.*s)", (int)id, (int)_z_string_len(&keyexpr._suffix),
_z_string_data(&keyexpr._suffix));

_zp_session_lock_mutex(zn);

const _z_keyexpr_t *pkeyexpr = _z_keyexpr_intmap_get(&zn->_local_tokens, id);
Expand All @@ -54,6 +57,8 @@ z_result_t _z_liveliness_register_token(_z_session_t *zn, uint32_t id, const _z_
void _z_liveliness_unregister_token(_z_session_t *zn, uint32_t id) {
_zp_session_lock_mutex(zn);

_Z_DEBUG("Unregister liveliness token (%i)", (int)id);

_z_keyexpr_intmap_remove(&zn->_local_tokens, id);

_zp_session_unlock_mutex(zn);
Expand All @@ -65,7 +70,7 @@ void _z_liveliness_unregister_token(_z_session_t *zn, uint32_t id) {
z_result_t _z_liveliness_subscription_declare(_z_session_t *zn, uint32_t id, const _z_keyexpr_t keyexpr,
const _z_timestamp_t *timestamp) {
z_result_t ret = _Z_RES_OK;
// TODO(sashacmc): What about history? Currently tokens decalred before subscription started, not processed

ret = _z_trigger_liveliness_subscriptions_declare(zn, keyexpr, timestamp);
if (ret == _Z_RES_OK) {
_zp_session_lock_mutex(zn);
Expand Down Expand Up @@ -107,6 +112,28 @@ z_result_t _z_liveliness_subscription_undeclare(_z_session_t *zn, uint32_t id, c
return ret;
}

z_result_t _z_liveliness_subscription_trigger_history(_z_session_t *zn, _z_keyexpr_t keyexpr) {
z_result_t ret = _Z_RES_OK;

_zp_session_lock_mutex(zn);
_z_keyexpr_intmap_t token_list = _z_keyexpr_intmap_clone(&zn->_remote_tokens);
_zp_session_unlock_mutex(zn);

_z_keyexpr_intmap_iterator_t iter = _z_keyexpr_intmap_iterator_make(&token_list);
_z_timestamp_t tm = _z_timestamp_null();
while (_z_keyexpr_intmap_iterator_next(&iter)) {
_z_keyexpr_t key = *_z_keyexpr_intmap_iterator_value(&iter);
if (_z_keyexpr_suffix_intersects(&key, &keyexpr)) {
ret = _z_trigger_liveliness_subscriptions_declare(zn, key, &tm);
if (ret != _Z_RES_OK) {
break;
}
}
}
_z_keyexpr_intmap_clear(&token_list);

return ret;
}
#endif // Z_FEATURE_SUBSCRIPTION == 1

/**************** Liveliness Query ****************/
Expand Down Expand Up @@ -140,7 +167,7 @@ uint32_t _z_liveliness_get_query_id(_z_session_t *zn) { return zn->_liveliness_q
z_result_t _z_liveliness_register_pending_query(_z_session_t *zn, uint32_t id, _z_liveliness_pending_query_t *pen_qry) {
z_result_t ret = _Z_RES_OK;

_Z_DEBUG(">>> Allocating liveliness query for (%ju:%.*s)", (uintmax_t)pen_qry->_key._id,
_Z_DEBUG("Register liveliness query for (%ju:%.*s)", (uintmax_t)pen_qry->_key._id,
(int)_z_string_len(&pen_qry->_key._suffix), _z_string_data(&pen_qry->_key._suffix));

_zp_session_lock_mutex(zn);
Expand Down Expand Up @@ -181,7 +208,7 @@ z_result_t _z_liveliness_pending_query_reply(_z_session_t *zn, uint32_t interest
_Z_DEBUG("Reply liveliness query for %d - %.*s", expanded_ke._id, (int)_z_string_len(&expanded_ke._suffix),
_z_string_data(&expanded_ke._suffix));

if (_z_keyexpr_suffix_intersects(&pq->_key, &expanded_ke) == false) {
if (!_z_keyexpr_suffix_intersects(&pq->_key, &expanded_ke)) {
ret = _Z_ERR_QUERY_NOT_MATCH;
}

Expand Down
5 changes: 5 additions & 0 deletions src/session/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ bool _z_resource_eq(const _z_resource_t *other, const _z_resource_t *this_) { re

void _z_resource_clear(_z_resource_t *res) { _z_keyexpr_clear(&res->_key); }

size_t _z_resource_size(_z_resource_t *p) {
_ZP_UNUSED(p);
return sizeof(_z_resource_t);
}

void _z_resource_copy(_z_resource_t *dst, const _z_resource_t *src) {
_z_keyexpr_copy(&dst->_key, &src->_key);
dst->_id = src->_id;
Expand Down
33 changes: 27 additions & 6 deletions tests/z_api_liveliness_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void on_receive(z_loaned_sample_t* s, void* context) {
}
}

void test_liveliness_sub(void) {
void test_liveliness_sub(bool multicast, bool history) {
const char* expr = "zenoh-pico/liveliness/test/*";

z_owned_session_t s1, s2;
Expand All @@ -82,6 +82,14 @@ void test_liveliness_sub(void) {
z_view_keyexpr_from_str(&k1, token1_expr);
z_view_keyexpr_from_str(&k2, token2_expr);

if (multicast) {
zp_config_insert(z_loan_mut(c1), Z_CONFIG_MODE_KEY, "client");
zp_config_insert(z_loan_mut(c1), Z_CONFIG_LISTEN_KEY, "udp/224.0.0.224:7447#iface=lo");

zp_config_insert(z_loan_mut(c2), Z_CONFIG_MODE_KEY, "peer");
zp_config_insert(z_loan_mut(c2), Z_CONFIG_LISTEN_KEY, "udp/224.0.0.224:7447#iface=lo");
}

assert_ok(z_open(&s1, z_config_move(&c1), NULL));
assert_ok(z_open(&s2, z_config_move(&c2), NULL));

Expand All @@ -90,18 +98,30 @@ void test_liveliness_sub(void) {
assert_ok(zp_start_lease_task(z_loan_mut(s1), NULL));
assert_ok(zp_start_lease_task(z_loan_mut(s2), NULL));

z_owned_liveliness_token_t t1, t2;
// In history mode we can declare token before subsribing
if (history) {
assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t1, z_view_keyexpr_loan(&k1), NULL));
assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t2, z_view_keyexpr_loan(&k2), NULL));
}

z_sleep_s(1);
z_owned_closure_sample_t closure;
context_t context = {false, false, false, false};
z_closure_sample(&closure, on_receive, NULL, (void*)(&context));

z_owned_subscriber_t sub;
z_liveliness_subscriber_options_t sub_opt;
z_liveliness_subscriber_options_default(&sub_opt);
sub_opt.history = history;
assert_ok(z_liveliness_declare_subscriber(z_session_loan(&s2), &sub, z_view_keyexpr_loan(&k),
z_closure_sample_move(&closure), NULL));
z_closure_sample_move(&closure), &sub_opt));

z_sleep_s(1);
z_owned_liveliness_token_t t1, t2;
assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t1, z_view_keyexpr_loan(&k1), NULL));
assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t2, z_view_keyexpr_loan(&k2), NULL));
if (!history) {
assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t1, z_view_keyexpr_loan(&k1), NULL));
assert_ok(z_liveliness_declare_token(z_session_loan(&s1), &t2, z_view_keyexpr_loan(&k2), NULL));
}

z_sleep_s(1);

Expand Down Expand Up @@ -191,7 +211,8 @@ void test_liveliness_get(void) {
int main(int argc, char** argv) {
(void)argc;
(void)argv;
test_liveliness_sub();
test_liveliness_sub(false, false);
test_liveliness_sub(false, true);
test_liveliness_get();
}

Expand Down
48 changes: 29 additions & 19 deletions tests/z_collections_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,25 +318,35 @@ void int_map_iterator_test(void) {
_z_str_intmap_insert(&map, 30, "C");
_z_str_intmap_insert(&map, 40, "D");

_z_str_intmap_iterator_t iter = _z_str_intmap_iterator_make(&map);

assert(_z_str_intmap_iterator_next(&iter));
assert(_z_str_intmap_iterator_key(&iter) == 20);
assert(strcmp(_z_str_intmap_iterator_value(&iter), "B") == 0);

assert(_z_str_intmap_iterator_next(&iter));
assert(_z_str_intmap_iterator_key(&iter) == 40);
assert(strcmp(_z_str_intmap_iterator_value(&iter), "D") == 0);

assert(_z_str_intmap_iterator_next(&iter));
assert(_z_str_intmap_iterator_key(&iter) == 10);
assert(strcmp(_z_str_intmap_iterator_value(&iter), "A") == 0);

assert(_z_str_intmap_iterator_next(&iter));
assert(_z_str_intmap_iterator_key(&iter) == 30);
assert(strcmp(_z_str_intmap_iterator_value(&iter), "C") == 0);

assert(!_z_str_intmap_iterator_next(&iter));
#define TEST_MAP(map) \
{ \
_z_str_intmap_iterator_t iter = _z_str_intmap_iterator_make(&map); \
assert(_z_str_intmap_iterator_next(&iter)); \
assert(_z_str_intmap_iterator_key(&iter) == 20); \
assert(strcmp(_z_str_intmap_iterator_value(&iter), "B") == 0); \
\
assert(_z_str_intmap_iterator_next(&iter)); \
assert(_z_str_intmap_iterator_key(&iter) == 40); \
assert(strcmp(_z_str_intmap_iterator_value(&iter), "D") == 0); \
\
assert(_z_str_intmap_iterator_next(&iter)); \
assert(_z_str_intmap_iterator_key(&iter) == 10); \
assert(strcmp(_z_str_intmap_iterator_value(&iter), "A") == 0); \
\
assert(_z_str_intmap_iterator_next(&iter)); \
assert(_z_str_intmap_iterator_key(&iter) == 30); \
assert(strcmp(_z_str_intmap_iterator_value(&iter), "C") == 0); \
\
assert(!_z_str_intmap_iterator_next(&iter)); \
}

TEST_MAP(map);

_z_str_intmap_t map2 = _z_str_intmap_clone(&map);

TEST_MAP(map2);

#undef TEST_MAP
}

int main(void) {
Expand Down

0 comments on commit 4580eb0

Please sign in to comment.