Skip to content

Commit

Permalink
Implement connection restoring
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Nov 20, 2024
1 parent e4012d6 commit f87bc4a
Show file tree
Hide file tree
Showing 24 changed files with 440 additions and 247 deletions.
4 changes: 2 additions & 2 deletions include/zenoh-pico/link/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right);

void _z_locator_init(_z_locator_t *locator);
_z_string_t _z_locator_to_string(const _z_locator_t *loc);
z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *s);
z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *s);

size_t _z_locator_size(_z_locator_t *lc);
void _z_locator_clear(_z_locator_t *lc);
Expand All @@ -72,7 +72,7 @@ typedef struct {
} _z_endpoint_t;

_z_string_t _z_endpoint_to_string(const _z_endpoint_t *e);
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *s);
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *s);
void _z_endpoint_clear(_z_endpoint_t *ep);
void _z_endpoint_free(_z_endpoint_t **ep);

Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/link/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ typedef struct _z_link_t {

void _z_link_clear(_z_link_t *zl);
void _z_link_free(_z_link_t **zl);
z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator);
z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator);
z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator);
z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator);

z_result_t _z_link_send_wbuf(const _z_link_t *zl, const _z_wbuf_t *wbf);
size_t _z_link_recv_zbuf(const _z_link_t *zl, _z_zbuf_t *zbf, _z_slice_t *addr);
Expand Down
39 changes: 37 additions & 2 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "zenoh-pico/collections/list.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/session/liveliness.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/session.h"
Expand Down Expand Up @@ -55,6 +56,11 @@ typedef struct _z_session_t {
_z_resource_list_t *_local_resources;
_z_resource_list_t *_remote_resources;

// Information for session restoring
// Empty _config means session is not restorable
_z_config_t _config;
_z_network_message_list_t *_decalaration_cache;

// Session subscriptions
#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_list_t *_subscriptions;
Expand Down Expand Up @@ -99,14 +105,43 @@ _Z_REFCOUNT_DEFINE(_z_session, _z_session)
* Open a zenoh-net session
*
* Parameters:
* zn: A pointer of A :c:type:`_z_session_rc_t` used as a return value.
* config: A set of properties. The caller keeps its ownership.
* zn: A pointer of A :c:type:`_z_session_t` used as a return value.
* zid: A pointer to Zenoh ID.
*
* Returns:
* ``0`` in case of success, or a ``negative value`` in case of failure.
*/
z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config, const _z_id_t *zid);

/**
* Reopen a disconnected zenoh-net session
*
* Parameters:
* zn: Existing zenoh-net session.
*
* Returns:
* ``0`` in case of success, or a ``negative value`` in case of failure.
*/
z_result_t _z_reopen(_z_session_rc_t *zn);

/**
* Store declaration network message to cache for resend it after session restore
*
* Parameters:
* zs: A zenoh-net session.
* z_msg: Network message with declaration
*/
void _z_cache_declaration(_z_session_t *zs, const _z_network_message_t *n_msg);

/**
* Remove corresponding declaration from the cache
*
* Parameters:
* zs: A zenoh-net session.
* z_msg: Network message with undeclaration
*/
z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config);
void _z_prune_declaration(_z_session_t *zs, const _z_network_message_t *n_msg);

/**
* Close a zenoh-net session.
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ inline static void _z_msg_clear(_z_zenoh_message_t *msg) { _z_n_msg_clear(msg);
inline static void _z_msg_free(_z_zenoh_message_t **msg) { _z_n_msg_free(msg); }
_Z_ELEM_DEFINE(_z_network_message, _z_network_message_t, _z_noop_size, _z_n_msg_clear, _z_noop_copy, _z_noop_move)
_Z_SVEC_DEFINE(_z_network_message, _z_network_message_t)
_Z_LIST_DEFINE(_z_network_message, _z_network_message_t)

void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping);
_z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_slice_t) parameters, _z_zint_t qid,
Expand All @@ -308,6 +309,7 @@ _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, bool ha
_z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_interest(_z_interest_t interest);
z_result_t _z_n_msg_copy(_z_network_message_t *dst, const _z_network_message_t *src);
_z_network_message_t *_z_n_msg_clone(const _z_network_message_t *src);

#ifdef __cplusplus
}
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extern "C" {
_z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t id, _z_string_t *locator, const uint32_t timeout,
const bool exit_on_first);

z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid);
z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid);
void _z_session_clear(_z_session_t *zn);
z_result_t _z_session_close(_z_session_t *zn, uint8_t reason);

Expand Down
30 changes: 30 additions & 0 deletions include/zenoh-pico/transport/common/transport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef ZENOH_PICO_COMMON_TRANSPORT_H
#define ZENOH_PICO_COMMON_TRANSPORT_H

#include "zenoh-pico/transport/transport.h"

#ifdef __cplusplus
extern "C" {
#endif

void _z_common_transport_clear(_z_transport_common_t *ztc, bool detach_tasks);

#ifdef __cplusplus
}
#endif

#endif /* ZENOH_PICO_COMMON_TRANSPORT_H*/
3 changes: 2 additions & 1 deletion include/zenoh-pico/transport/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ enum _z_peer_op_e {
_Z_PEER_OP_LISTEN = 1,
};

z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op);
z_result_t _z_new_transport(_z_transport_t *zt, const _z_id_t *bs, const _z_string_t *locator, z_whatami_t mode,
int peer_op);
void _z_free_transport(_z_transport_t **zt);

#ifdef __cplusplus
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/multicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ z_result_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *pa
const _z_id_t *local_zid);
z_result_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, bool link_only);
z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason);
void _z_multicast_transport_clear(_z_transport_t *zt);
void _z_multicast_transport_clear(_z_transport_multicast_t *ztm, bool detach_tasks);

#if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); }
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ typedef struct {
uint8_t _seq_num_res;
} _z_transport_multicast_establish_param_t;

_z_transport_common_t *_z_transport_get_common(_z_transport_t *zt);
z_result_t _z_transport_close(_z_transport_t *zt, uint8_t reason);
void _z_transport_clear(_z_transport_t *zt);
void _z_transport_free(_z_transport_t **zt);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/unicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, c
const _z_id_t *local_zid, int peer_op);
z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only);
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
void _z_unicast_transport_clear(_z_transport_t *zt);
void _z_unicast_transport_clear(_z_transport_unicast_t *ztu, bool detach_tasks);

#ifdef __cplusplus
}
Expand Down
62 changes: 50 additions & 12 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "zenoh-pico/transport/common/tx.h"
#include "zenoh-pico/transport/multicast.h"
#include "zenoh-pico/transport/unicast.h"
#include "zenoh-pico/utils/config.h"
#include "zenoh-pico/utils/endianness.h"
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/pointers.h"
Expand Down Expand Up @@ -624,34 +625,71 @@ z_result_t z_scout(z_moved_config_t *config, z_moved_closure_hello_t *callback,

void z_open_options_default(z_open_options_t *options) { options->__dummy = 0; }

z_result_t z_open(z_owned_session_t *zs, z_moved_config_t *config, const z_open_options_t *options) {
_ZP_UNUSED(options);
static _z_id_t _z_session_get_zid(const _z_config_t *config) {
_z_id_t zid = _z_id_empty();
char *opt_as_str = _z_config_get(config, Z_CONFIG_SESSION_ZID_KEY);
if (opt_as_str != NULL) {
_z_uuid_to_bytes(zid.id, opt_as_str);
} else {
_z_session_generate_zid(&zid, Z_ZID_LENGTH);
}
return zid;
}

static z_result_t _z_session_rc_init(z_owned_session_t *zs, _z_id_t *zid) {
z_internal_session_null(zs);
_z_session_t *s = z_malloc(sizeof(_z_session_t));
if (s == NULL) {
z_config_drop(config);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
memset(s, 0, sizeof(_z_session_t));
// Create rc

z_result_t ret = _z_session_init(s, zid);
if (ret != _Z_RES_OK) {
z_free(s);
return ret;
}

_z_session_rc_t zsrc = _z_session_rc_new(s);
if (zsrc._cnt == NULL) {
_z_session_clear(s);
z_free(s);
z_config_drop(config);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
zs->_rc = zsrc;
// Open session
z_result_t ret = _z_open(&zs->_rc, &config->_this._val);

return _Z_RES_OK;
}

z_result_t z_open(z_owned_session_t *zs, z_moved_config_t *config, const z_open_options_t *options) {
_ZP_UNUSED(options);

_z_config_t *cfg = &config->_this._val;
if (config == NULL) {
_Z_ERROR("A valid config is missing.");
return _Z_ERR_GENERIC;
}

_z_id_t zid = _z_session_get_zid(cfg);

z_result_t ret = _z_session_rc_init(zs, &zid);
if (ret != _Z_RES_OK) {
_z_session_rc_decr(&zs->_rc);
z_internal_session_null(zs);
z_config_drop(config);
z_free(s);
return ret;
}

ret = _z_open(&zs->_rc, cfg, &zid);
if (ret != _Z_RES_OK) {
z_session_drop(z_session_move(zs));
z_config_drop(config);
return ret;
}
// Clean up
z_config_drop(config);
if (/* session is restorable*/ true) {
_Z_OWNED_RC_IN_VAL(zs)->_config = config->_this._val;
z_internal_config_null(&config->_this);
} else {
z_config_drop(config);
}
return _Z_RES_OK;
}

Expand Down
12 changes: 6 additions & 6 deletions src/link/endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right) {
return res;
}

static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, _z_string_t *str) {
static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, const _z_string_t *str) {
*protocol = _z_string_null();

const char *p_start = _z_string_data(str);
Expand All @@ -97,7 +97,7 @@ static z_result_t _z_locator_protocol_from_string(_z_string_t *protocol, _z_stri
return _z_string_copy_substring(protocol, str, 0, p_len);
}

static z_result_t _z_locator_address_from_string(_z_string_t *address, _z_string_t *str) {
static z_result_t _z_locator_address_from_string(_z_string_t *address, const _z_string_t *str) {
*address = _z_string_null();

// Find protocol separator
Expand Down Expand Up @@ -130,7 +130,7 @@ static z_result_t _z_locator_address_from_string(_z_string_t *address, _z_string
return _z_string_copy_substring(address, str, start_offset, addr_len);
}

z_result_t _z_locator_metadata_from_string(_z_str_intmap_t *strint, _z_string_t *str) {
z_result_t _z_locator_metadata_from_string(_z_str_intmap_t *strint, const _z_string_t *str) {
*strint = _z_str_intmap_make();

// Find metadata separator
Expand Down Expand Up @@ -169,7 +169,7 @@ void _z_locator_metadata_onto_str(char *dst, size_t dst_len, const _z_str_intmap
_z_str_intmap_onto_str(dst, dst_len, s, 0, NULL);
}

z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *str) {
z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *str) {
if (str == NULL || !_z_string_check(str)) {
return _Z_ERR_CONFIG_LOCATOR_INVALID;
}
Expand Down Expand Up @@ -284,7 +284,7 @@ void _z_endpoint_free(_z_endpoint_t **ep) {
}
}

z_result_t _z_endpoint_config_from_string(_z_str_intmap_t *strint, _z_string_t *str, _z_string_t *proto) {
z_result_t _z_endpoint_config_from_string(_z_str_intmap_t *strint, const _z_string_t *str, _z_string_t *proto) {
char *p_start = (char *)memchr(_z_string_data(str), ENDPOINT_CONFIG_SEPARATOR, _z_string_len(str));
if (p_start != NULL) {
p_start = _z_ptr_char_offset(p_start, 1);
Expand Down Expand Up @@ -411,7 +411,7 @@ char *_z_endpoint_config_to_string(const _z_str_intmap_t *s, const _z_string_t *
return NULL;
}

z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *str) {
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *str) {
_z_endpoint_init(ep);
_Z_CLEAN_RETURN_IF_ERR(_z_locator_from_string(&ep->_locator, str), _z_endpoint_clear(ep));
_Z_CLEAN_RETURN_IF_ERR(_z_endpoint_config_from_string(&ep->_config, str, &ep->_locator._protocol),
Expand Down
4 changes: 2 additions & 2 deletions src/link/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "zenoh-pico/link/manager.h"
#include "zenoh-pico/utils/logging.h"

z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator) {
z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator) {
z_result_t ret = _Z_RES_OK;

_z_endpoint_t ep;
Expand Down Expand Up @@ -71,7 +71,7 @@ z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator) {
return ret;
}

z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator) {
z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator) {
z_result_t ret = _Z_RES_OK;

_z_endpoint_t ep;
Expand Down
Loading

0 comments on commit f87bc4a

Please sign in to comment.