Skip to content

Commit

Permalink
Code reorganization
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Nov 5, 2024
1 parent 3a60111 commit df2d1a0
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 191 deletions.
8 changes: 8 additions & 0 deletions include/zenoh-pico/api/liveliness.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/protocol/core.h"

#ifdef __cplusplus
extern "C" {
#endif

typedef struct {
uint32_t _id;
_z_keyexpr_t _key;
Expand Down Expand Up @@ -143,4 +147,8 @@ z_result_t z_liveliness_get(const z_loaned_session_t *zs, const z_loaned_keyexpr

#endif // Z_FEATURE_LIVELINESS == 1

#ifdef __cplusplus
}
#endif

#endif // INCLUDE_ZENOH_PICO_API_LIVELINESS_H
83 changes: 83 additions & 0 deletions include/zenoh-pico/net/liveliness.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#ifndef INCLUDE_ZENOH_PICO_NET_LIVELINESS_H
#define INCLUDE_ZENOH_PICO_NET_LIVELINESS_H

#include "zenoh-pico/api/liveliness.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/net/subscribe.h"
#include "zenoh-pico/protocol/core.h"

#ifdef __cplusplus
extern "C" {
#endif

#if Z_FEATURE_LIVELINESS == 1

z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_token_t *ret_token,
_z_keyexpr_t keyexpr);
z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token);

#if Z_FEATURE_SUBSCRIPTION == 1
/**
* Declare a :c:type:`_z_subscriber_t` for the given liveliness key.
*
* Parameters:
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to subscribe. The callee gets the ownership of any allocated value.
* callback: The callback function that will be called each time a matching liveliness token changed.
* arg: A pointer that will be passed to the **callback** on each call.
*
* Returns:
* The created :c:type:`_z_subscriber_t` (in null state if the declaration failed).
*/
_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper,
void *arg);

/**
* Undeclare a liveliness :c:type:`_z_subscriber_t`.
*
* Parameters:
* sub: The :c:type:`_z_subscriber_t` to undeclare. The callee releases the
* subscriber upon successful return.
* Returns:
* 0 if success, or a negative value identifying the error.
*/
z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub);
#endif // Z_FEATURE_SUBSCRIPTION == 1

#if Z_FEATURE_QUERY == 1
/**
* Query liveliness token state.
*
* Parameters:
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to liveliness token.
* callback: The callback function that will be called on reception of replies for this query.
* dropper: The callback function that will be called on upon completion of the callback.
* arg: A pointer that will be passed to the **callback** on each call.
* timeout_ms: The timeout value of this query.
*/
z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closure_reply_callback_t callback,
_z_drop_handler_t dropper, void *arg, uint64_t timeout_ms);
#endif // Z_FEATURE_QUERY == 1

#endif // Z_FEATURE_LIVELINESS == 1

#ifdef __cplusplus
}
#endif

#endif /* INCLUDE_ZENOH_PICO_NET_LIVELINESS_H */
55 changes: 0 additions & 55 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
#include <stdint.h>

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/api/liveliness.h"
#include "zenoh-pico/collections/string.h"
#include "zenoh-pico/net/encoding.h"
#include "zenoh-pico/net/publish.h"
#include "zenoh-pico/net/query.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/net/subscribe.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/utils/config.h"

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -256,59 +254,6 @@ z_result_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *paramete
bool is_express);
#endif

#if Z_FEATURE_LIVELINESS == 1

#if Z_FEATURE_SUBSCRIPTION == 1
/**
* Declare a :c:type:`_z_subscriber_t` for the given liveliness key.
*
* Parameters:
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to subscribe. The callee gets the ownership of any allocated value.
* callback: The callback function that will be called each time a matching liveliness token changed.
* arg: A pointer that will be passed to the **callback** on each call.
*
* Returns:
* The created :c:type:`_z_subscriber_t` (in null state if the declaration failed).
*/
_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper,
void *arg);

/**
* Undeclare a liveliness :c:type:`_z_subscriber_t`.
*
* Parameters:
* sub: The :c:type:`_z_subscriber_t` to undeclare. The callee releases the
* subscriber upon successful return.
* Returns:
* 0 if success, or a negative value identifying the error.
*/
z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub);
#endif // Z_FEATURE_SUBSCRIPTION == 1

#if Z_FEATURE_QUERY == 1
/**
* Query liveliness token state.
*
* Parameters:
* zn: The zenoh-net session. The caller keeps its ownership.
* keyexpr: The resource key to liveliness token.
* callback: The callback function that will be called on reception of replies for this query.
* dropper: The callback function that will be called on upon completion of the callback.
* arg: A pointer that will be passed to the **callback** on each call.
* timeout_ms: The timeout value of this query.
*/
z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closure_reply_callback_t callback,
_z_drop_handler_t dropper, void *arg, uint64_t timeout_ms);
#endif // Z_FEATURE_QUERY == 1

z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_token_t *ret_token,
_z_keyexpr_t keyexpr);
z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token);

#endif // Z_FEATURE_LIVELINESS == 1

#if Z_FEATURE_INTEREST == 1
uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags,
void *arg);
Expand Down
8 changes: 8 additions & 0 deletions include/zenoh-pico/session/liveliness.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

#include "zenoh-pico/session/session.h"

#ifdef __cplusplus
extern "C" {
#endif

#if Z_FEATURE_LIVELINESS == 1
typedef struct _z_session_t _z_session_t;

Expand Down Expand Up @@ -59,4 +63,8 @@ void _z_liveliness_init(_z_session_t *zn);
void _z_liveliness_clear(_z_session_t *zn);
#endif

#ifdef __cplusplus
}
#endif

#endif /* ZENOH_PICO_SESSION_LIVELINESS_H */
1 change: 1 addition & 0 deletions src/api/liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "zenoh-pico/api/liveliness.h"

#include "zenoh-pico/api/primitives.h"
#include "zenoh-pico/net/liveliness.h"
#include "zenoh-pico/net/primitives.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/keyexpr.h"
Expand Down
164 changes: 164 additions & 0 deletions src/net/liveliness.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include "zenoh-pico/api/liveliness.h"

#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/protocol/keyexpr.h"
#include "zenoh-pico/session/resource.h"
#include "zenoh-pico/session/session.h"
#include "zenoh-pico/session/subscription.h"
#include "zenoh-pico/session/utils.h"
#include "zenoh-pico/utils/result.h"

#if Z_FEATURE_LIVELINESS == 1

/**************** Liveliness Token ****************/

z_result_t _z_declare_liveliness_token(const _z_session_rc_t *zn, _z_liveliness_token_t *ret_token,
_z_keyexpr_t keyexpr) {
z_result_t ret;

uint32_t id = _z_get_entity_id(_Z_RC_IN_VAL(zn));
_z_keyexpr_t key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr);

_z_declaration_t declaration = _z_make_decl_token(&key, id);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
ret = _z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
_z_n_msg_clear(&n_msg);

_z_liveliness_register_token(_Z_RC_IN_VAL(zn), id, key);

ret_token->_id = id;
_z_keyexpr_move(&ret_token->_key, &key);
ret_token->_zn = _z_session_rc_clone_as_weak(zn);
return ret;
}

z_result_t _z_undeclare_liveliness_token(_z_liveliness_token_t *token) {
if (token == NULL || _Z_RC_IS_NULL(&token->_zn)) {
return _Z_ERR_ENTITY_UNKNOWN;
}

z_result_t ret;

_z_liveliness_unregister_token(_Z_RC_IN_VAL(&token->_zn), token->_id);

_z_declaration_t declaration = _z_make_undecl_token(token->_id, &token->_key);
_z_network_message_t n_msg = _z_n_msg_make_declare(declaration, false, 0);
ret = _z_send_n_msg(_Z_RC_IN_VAL(&token->_zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK);
_z_n_msg_clear(&n_msg);

return ret;
}

/**************** Liveliness Subscriber ****************/

#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscriber_t _z_declare_liveliness_subscriber(const _z_session_rc_t *zn, _z_keyexpr_t keyexpr,
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper,
void *arg) {
_z_subscription_t s;
s._id = _z_get_entity_id(_Z_RC_IN_VAL(zn));
s._key_id = keyexpr._id;
s._key = _z_get_expanded_key_from_key(_Z_RC_IN_VAL(zn), &keyexpr);
s._callback = callback;
s._dropper = dropper;
s._arg = arg;

_z_subscriber_t ret = _z_subscriber_null();
// Register subscription, stored at session-level, do not drop it by the end of this function.
_z_subscription_rc_t *sp_s =
_z_register_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, &s);
if (sp_s == NULL) {
_z_subscriber_clear(&ret);
return ret;
}
// Build the declare message to send on the wire
_z_interest_t interest =
_z_make_interest(&keyexpr, s._id,
_Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS | _Z_INTEREST_FLAG_RESTRICTED |
_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE);

_z_network_message_t n_msg = _z_n_msg_make_interest(interest);
if (_z_send_n_msg(_Z_RC_IN_VAL(zn), &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_unregister_subscription(_Z_RC_IN_VAL(zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sp_s);
_z_subscriber_clear(&ret);
return ret;
}
_z_n_msg_clear(&n_msg);

ret._entity_id = s._id;
ret._zn = _z_session_rc_clone_as_weak(zn);
return ret;
}

z_result_t _z_undeclare_liveliness_subscriber(_z_subscriber_t *sub) {
if (sub == NULL || _Z_RC_IS_NULL(&sub->_zn)) {
return _Z_ERR_ENTITY_UNKNOWN;
}

_z_subscription_rc_t *s =
_z_get_subscription_by_id(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, sub->_entity_id);
if (s == NULL) {
return _Z_ERR_ENTITY_UNKNOWN;
}

_z_unregister_subscription(_Z_RC_IN_VAL(&sub->_zn), _Z_SUBSCRIBER_KIND_LIVELINESS_SUBSCRIBER, s);
return _Z_RES_OK;
}
#endif // Z_FEATURE_SUBSCRIPTION == 1

/**************** Liveliness Query ****************/

#if Z_FEATURE_QUERY == 1
z_result_t _z_liveliness_query(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_closure_reply_callback_t callback,
_z_drop_handler_t dropper, void *arg, uint64_t timeout_ms) {
z_result_t ret = _Z_RES_OK;

// Create the pending liveliness query object
_z_liveliness_pending_query_t *pq =
(_z_liveliness_pending_query_t *)z_malloc(sizeof(_z_liveliness_pending_query_t));
if (pq != NULL) {
uint32_t id = _z_liveliness_get_query_id(zn);
pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr);
pq->_callback = callback;
pq->_dropper = dropper;
pq->_arg = arg;

ret = _z_liveliness_register_pending_query(zn, id, pq);
if (ret == _Z_RES_OK) {
_ZP_UNUSED(timeout_ms); // Current interest in pico don't support timeout

_z_interest_t interest = _z_make_interest(&keyexpr, id,
_Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_TOKENS |
_Z_INTEREST_FLAG_RESTRICTED | _Z_INTEREST_FLAG_CURRENT);

_z_network_message_t n_msg = _z_n_msg_make_interest(interest);
if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_z_liveliness_unregister_pending_query(zn, id);
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}

_z_n_msg_clear(&n_msg);

} else {
_z_liveliness_pending_query_clear(pq);
}
}

return ret;
}
#endif // Z_FEATURE_QUERY == 1

#endif // Z_FEATURE_LIVELINESS == 1
Loading

0 comments on commit df2d1a0

Please sign in to comment.