Skip to content

Commit

Permalink
Liveliness subscriber implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Oct 25, 2024
1 parent 8c1106c commit 18b3893
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 67 deletions.
139 changes: 139 additions & 0 deletions examples/unix/c11/z_sub_liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#include <ctype.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_LIVELINESS == 1

static int msg_nb = 0;

void data_handler(z_loaned_sample_t *sample, void *ctx) {
(void)(ctx);
z_view_string_t key_string;
z_keyexpr_as_view_string(z_sample_keyexpr(sample), &key_string);
switch (z_sample_kind(sample)) {
case Z_SAMPLE_KIND_PUT:
printf(">> [LivelinessSubscriber] New alive token ('%.*s')\n", (int)z_string_len(z_loan(key_string)),
z_string_data(z_loan(key_string)));
break;
case Z_SAMPLE_KIND_DELETE:
printf(">> [LivelinessSubscriber] Dropped token ('%.*s')\n", (int)z_string_len(z_loan(key_string)),
z_string_data(z_loan(key_string)));
break;
}
}

int main(int argc, char **argv) {
const char *keyexpr = "group1/**";
const char *mode = "client";
char *clocator = NULL;
char *llocator = NULL;
int n = 0;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
clocator = optarg;
break;
case 'm':
mode = optarg;
break;
case 'l':
llocator = optarg;
break;
case 'n':
n = atoi(optarg);
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_loan_mut(config), Z_CONFIG_MODE_KEY, mode);
if (clocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_CONNECT_KEY, clocator);
}
if (llocator != NULL) {
zp_config_insert(z_loan_mut(config), Z_CONFIG_LISTEN_KEY, llocator);
}

printf("Opening session...\n");
z_owned_session_t s;
if (z_open(&s, z_move(config), NULL) < 0) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan_mut(s), NULL) < 0 || zp_start_lease_task(z_loan_mut(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_session_drop(z_session_move(&s));
return -1;
}

printf("Declaring liveliness subscriber on '%s'...\n", keyexpr);
z_owned_closure_sample_t callback;
z_closure(&callback, data_handler, NULL, NULL);
z_owned_subscriber_t sub;

z_view_keyexpr_t ke;
z_view_keyexpr_from_str(&ke, keyexpr);
if (z_liveliness_declare_subscriber(z_loan(s), &sub, z_loan(ke), z_move(callback), NULL) < 0) {
printf("Unable to declare liveliness subscriber.\n");
exit(-1);
}

printf("Press CTRL-C to quit...\n");
while (1) {
z_sleep_s(1);
}
printf("Press CTRL-C to quit...\n");
while (1) {
if ((n != 0) && (msg_nb >= n)) {
break;
}
sleep(1);
}
// Clean up
z_drop(z_move(sub));
z_drop(z_move(s));
return 0;
}
#else
int main(void) {
printf(
"ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION and Z_FEATURE_LIVELINESS but this example "
"requires it.\n");
return -2;
}
#endif
28 changes: 14 additions & 14 deletions include/zenoh-pico/api/liveliness.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ typedef struct z_liveliness_get_options_t {
z_result_t z_liveliness_subscriber_options_default(z_liveliness_subscriber_options_t *options);

/**
* Declares a subscriber on liveliness tokens that intersect `key_expr`.
* Declares a subscriber on liveliness tokens that intersect `keyexpr`.
*
* @param token: An uninitialized memory location where subscriber will be constructed.
* @param session: The Zenoh session.
* @param key_expr: The key expression to subscribe to.
* @param zs: The Zenoh session.
* @param keyexpr: The key expression to subscribe to.
* @param callback: The callback function that will be called each time a liveliness token status is changed.
* @param _options: The options to be passed to the liveliness subscriber declaration.
*
* @return 0 in case of success, negative error values otherwise.
*/
z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *session, z_owned_subscriber_t *token,
const z_loaned_keyexpr_t *key_expr, z_moved_closure_sample_t *callback,
z_liveliness_subscriber_options_t *_options);
z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber_t *sub,
const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback,
z_liveliness_subscriber_options_t *options);
/**
* Constructs default value for `z_liveliness_declaration_options_t`.
*/
Expand All @@ -80,12 +80,12 @@ z_result_t z_liveliness_declaration_options_default(z_liveliness_declaration_opt
* is achieved, and a DELETE sample if it's lost.
*
* @param token: An uninitialized memory location where liveliness token will be constructed.
* @param session: A Zenos session to declare the liveliness token.
* @param key_expr: A keyexpr to declare a liveliess token for.
* @param zs: A Zenos session to declare the liveliness token.
* @param keyexpr: A keyexpr to declare a liveliess token for.
* @param _options: Liveliness token declaration properties.
*/
z_result_t z_liveliness_declare_token(const z_loaned_session_t *session, z_owned_liveliness_token_t *token,
const z_loaned_keyexpr_t *key_expr,
z_result_t z_liveliness_declare_token(const z_loaned_session_t *zs, z_owned_liveliness_token_t *token,
const z_loaned_keyexpr_t *keyexpr,
const z_liveliness_declaration_options_t *_options);

/**
Expand All @@ -94,14 +94,14 @@ z_result_t z_liveliness_declare_token(const z_loaned_session_t *session, z_owned
z_result_t z_liveliness_get_options_default(z_liveliness_get_options_t *options);

/**
* Queries liveliness tokens currently on the network with a key expression intersecting with `key_expr`.
* Queries liveliness tokens currently on the network with a key expression intersecting with `keyexpr`.
*
* @param session: The Zenoh session.
* @param key_expr: The key expression to query liveliness tokens for.
* @param zs: The Zenoh session.
* @param keyexpr: The key expression to query liveliness tokens for.
* @param callback: The callback function that will be called for each received reply.
* @param options: Additional options for the liveliness get operation.
*/
z_result_t z_liveliness_get(const z_loaned_session_t *session, const z_loaned_keyexpr_t *key_expr,
z_result_t z_liveliness_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
z_moved_closure_reply_t *callback, z_liveliness_get_options_t *options);

/**
Expand Down
31 changes: 31 additions & 0 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,37 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete
bool is_express);
#endif

#if Z_FEATURE_LIVELINESS == 1
#if Z_FEATURE_SUBSCRIPTION == 1
/**
* Declare a :c:type:`_z_subscriber_t` for the given liveliness key.
*
* Parameters:
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to subscribe. The callee gets the ownership of any allocated value.
* callback: The callback function that will be called each time a matching liveliness token changed.
* arg: A pointer that will be passed to the **callback** on each call.
*
* Returns:
* The created :c:type:`_z_subscriber_t` (in null state if the declaration failed).
*/
_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper,
void *arg);

/**
* Undeclare a liveliness :c:type:`_z_subscriber_t`.
*
* Parameters:
* sub: The :c:type:`_z_subscriber_t` to undeclare. The callee releases the
* subscriber upon successful return.
* Returns:
* 0 if success, or a negative value identifying the error.
*/
z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub);
#endif // Z_FEATURE_SUBSCRIPTION == 1
#endif // Z_FEATURE_LIVELINESS == 1

#if Z_FEATURE_INTEREST == 1
uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags,
void *arg);
Expand Down
23 changes: 17 additions & 6 deletions include/zenoh-pico/session/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,30 @@ extern "C" {
#endif

/*------------------ Subscription ------------------*/
void _z_trigger_local_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_n_qos_t qos, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment, z_reliability_t reliability);
z_result_t _z_trigger_subscriptions_put(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_timestamp_t *timestamp, const _z_n_qos_t qos,
const _z_bytes_t attachment, z_reliability_t reliability);

z_result_t _z_trigger_subscriptions_del(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_timestamp_t *timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability);

z_result_t _z_trigger_liveliness_subscriptions_declare(_z_session_t *zn, const _z_keyexpr_t keyexpr,
const _z_timestamp_t *timestamp);

z_result_t _z_trigger_liveliness_subscriptions_undeclare(_z_session_t *zn, const _z_keyexpr_t keyexpr,
const _z_timestamp_t *timestamp);

#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_t *_z_get_subscription_by_id(_z_session_t *zn, _z_subscriber_kind_t kind, const _z_zint_t id);
_z_subscription_rc_list_t *_z_get_subscriptions_by_key(_z_session_t *zn, _z_subscriber_kind_t kind,
const _z_keyexpr_t *keyexpr);

_z_subscription_rc_t *_z_register_subscription(_z_session_t *zn, _z_subscriber_kind_t kind, _z_subscription_t *sub);
z_result_t _z_trigger_subscriptions(_z_session_t *zn, const _z_keyexpr_t keyexpr, const _z_bytes_t payload,
_z_encoding_t *encoding, const _z_zint_t kind, const _z_timestamp_t *timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment, z_reliability_t reliability);
z_result_t _z_trigger_subscriptions_impl(_z_session_t *zn, _z_subscriber_kind_t subscriber_kind,
const _z_keyexpr_t keyexpr, const _z_bytes_t payload, _z_encoding_t *encoding,
const _z_zint_t sample_kind, const _z_timestamp_t *timestamp,
const _z_n_qos_t qos, const _z_bytes_t attachment,
z_reliability_t reliability);
void _z_unregister_subscription(_z_session_t *zn, _z_subscriber_kind_t kind, _z_subscription_rc_t *sub);
void _z_flush_subscriptions(_z_session_t *zn);
#endif
Expand Down
16 changes: 8 additions & 8 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -828,12 +828,12 @@ z_result_t z_put(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr
opt.priority, opt.is_express, opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this),
reliability);

// Trigger local subscriptions
_z_trigger_local_subscriptions(
// Trigger subscriptions
_z_trigger_subscriptions_put(
_Z_RC_IN_VAL(zs), keyexpr_aliased, _z_bytes_from_owned_bytes(&payload->_this),
opt.encoding == NULL ? NULL : &opt.encoding->_this._val,
opt.encoding == NULL ? NULL : &opt.encoding->_this._val, opt.timestamp,
_z_n_qos_make(opt.is_express, opt.congestion_control == Z_CONGESTION_CONTROL_BLOCK, opt.priority),
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);
_z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);
// Clean-up
z_encoding_drop(opt.encoding);
z_bytes_drop(opt.attachment);
Expand Down Expand Up @@ -961,11 +961,11 @@ z_result_t z_publisher_put(const z_loaned_publisher_t *pub, z_moved_bytes_t *pay
Z_SAMPLE_KIND_PUT, pub->_congestion_control, pub->_priority, pub->_is_express, opt.timestamp,
_z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(
_Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding,
// Trigger subscriptions
_z_trigger_subscriptions_put(
_Z_RC_IN_VAL(&sess_rc), pub_keyexpr, _z_bytes_from_owned_bytes(&payload->_this), &encoding, opt.timestamp,
_z_n_qos_make(pub->_is_express, pub->_congestion_control == Z_CONGESTION_CONTROL_BLOCK, pub->_priority),
opt.timestamp, _z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);
_z_bytes_from_owned_bytes(&opt.attachment->_this), reliability);

_z_session_rc_drop(&sess_rc);
} else {
Expand Down
41 changes: 27 additions & 14 deletions src/api/liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "zenoh-pico/api/liveliness.h"

#include "zenoh-pico/api/primitives.h"
#include "zenoh-pico/net/primitives.h"
#include "zenoh-pico/utils/result.h"

_Bool _z_liveliness_token_check(const _z_liveliness_token_t *token) {
Expand Down Expand Up @@ -54,29 +56,40 @@ z_result_t z_liveliness_subscriber_options_default(z_liveliness_subscriber_optio
return _Z_RES_OK;
}

z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *session, z_owned_subscriber_t *token,
const z_loaned_keyexpr_t *key_expr, z_moved_closure_sample_t *callback,
z_liveliness_subscriber_options_t *_options) {
// TODO(sashacmc): Implement
(void)token;
(void)session;
(void)key_expr;
(void)callback;
(void)_options;
return _Z_RES_OK;
z_result_t z_liveliness_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber_t *sub,
const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback,
z_liveliness_subscriber_options_t *options) {
_ZP_UNUSED(options);
void *ctx = callback->_this._val.context;
callback->_this._val.context = NULL;

_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
_z_keyexpr_t key = _z_keyexpr_alias(keyexpr_aliased);

_z_subscriber_t int_sub =
_z_declare_liveliness_subscriber(zs, key, callback->_this._val.call, callback->_this._val.drop, ctx);

z_internal_closure_sample_null(&callback->_this);
sub->_val = int_sub;

if (!_z_subscriber_check(&sub->_val)) {
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
} else {
return _Z_RES_OK;
}
}

z_result_t z_liveliness_declaration_options_default(z_liveliness_declaration_options_t *options) {
options->__dummy = 0;
return _Z_RES_OK;
}

z_result_t z_liveliness_declare_token(const z_loaned_session_t *session, z_owned_liveliness_token_t *token,
z_result_t z_liveliness_declare_token(const z_loaned_session_t *zs, z_owned_liveliness_token_t *token,
const z_loaned_keyexpr_t *key_expr,
const z_liveliness_declaration_options_t *_options) {
// TODO(sashacmc): Implement
(void)token;
(void)session;
(void)zs;
(void)key_expr;
(void)_options;
return _Z_RES_OK;
Expand All @@ -87,10 +100,10 @@ z_result_t z_liveliness_get_options_default(z_liveliness_get_options_t *options)
return _Z_RES_OK;
}

z_result_t z_liveliness_get(const z_loaned_session_t *session, const z_loaned_keyexpr_t *key_expr,
z_result_t z_liveliness_get(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *key_expr,
z_moved_closure_reply_t *callback, z_liveliness_get_options_t *options) {
// TODO(sashacmc): Implement
(void)session;
(void)zs;
(void)key_expr;
(void)callback;
(void)options;
Expand Down
Loading

0 comments on commit 18b3893

Please sign in to comment.