Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add weak references #510

Merged
merged 20 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ if(UNIX OR MSVC)
add_executable(z_api_bytes_test ${PROJECT_SOURCE_DIR}/tests/z_api_bytes_test.c)
add_executable(z_api_encoding_test ${PROJECT_SOURCE_DIR}/tests/z_api_encoding_test.c)
add_executable(z_api_config_test ${PROJECT_SOURCE_DIR}/tests/z_api_config_test.c)
add_executable(z_refcount_test ${PROJECT_SOURCE_DIR}/tests/z_refcount_test.c)

target_link_libraries(z_data_struct_test ${Libname})
target_link_libraries(z_channels_test ${Libname})
Expand All @@ -397,6 +398,7 @@ if(UNIX OR MSVC)
target_link_libraries(z_api_bytes_test ${Libname})
target_link_libraries(z_api_encoding_test ${Libname})
target_link_libraries(z_api_config_test ${Libname})
target_link_libraries(z_refcount_test ${Libname})

configure_file(${PROJECT_SOURCE_DIR}/tests/modularity.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/modularity.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/raweth.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/raweth.py COPYONLY)
Expand All @@ -418,6 +420,7 @@ if(UNIX OR MSVC)
add_test(z_api_bytes_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_bytes_test)
add_test(z_api_encoding_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_encoding_test)
add_test(z_api_config_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_api_config_test)
add_test(z_refcount_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_refcount_test)
endif()

if(BUILD_MULTICAST)
Expand Down
6 changes: 3 additions & 3 deletions examples/unix/c99/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
#include "zenoh-pico/api/primitives.h"
#include "zenoh-pico/system/platform.h"

#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1
#if Z_FEATURE_SUBSCRIPTION == 1 && Z_FEATURE_PUBLICATION == 1 && Z_FEATURE_MULTI_THREAD == 1

#define DEFAULT_PKT_SIZE 8
#define DEFAULT_PING_NB 100
Expand Down Expand Up @@ -190,8 +190,8 @@ struct args_t parse_args(int argc, char** argv) {
#else
int main(void) {
printf(
"ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION or Z_FEATURE_PUBLICATION but this example "
"requires them.\n");
"ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION or Z_FEATURE_PUBLICATION or "
"Z_FEATURE_MULTI_THREAD but this example requires them.\n");
return -2;
}
#endif
8 changes: 4 additions & 4 deletions include/zenoh-pico/api/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,14 @@
* Defines a generic function for cloning any of the ``z_owned_X_t`` types.
*
* Parameters:
* x: The instance to clone.
* x: The clone storage.
* y: The instance to clone.
*
* Returns:
* Returns the cloned instance of `x`.
*/
#define z_clone(x) _Generic((x), \
#define z_clone(x, y) _Generic((x), \
z_owned_keyexpr_t : z_keyexpr_clone, \
z_owned_config_t : z_config_clone, \
z_owned_session_t : z_session_clone, \
z_owned_subscriber_t : z_subscriber_clone, \
z_owned_publisher_t : z_publisher_clone, \
Expand All @@ -259,7 +259,7 @@
z_owned_hello_t : z_hello_clone, \
z_owned_string_t : z_string_clone, \
z_owned_string_array_t : z_string_array_clone \
)(&x)
)(&x, y)

/**
* Defines a generic function for making null object of any of the ``z_owned_X_t`` types.
Expand Down
330 changes: 249 additions & 81 deletions include/zenoh-pico/collections/refcount.h

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,10 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle);
* kind: The type of operation.
* attachment: An optional attachment to the reply.
*/
int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t attachment);
int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, const _z_keyexpr_t keyexpr,
const _z_value_t payload, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
z_priority_t priority, _Bool is_express, const _z_timestamp_t *timestamp,
const _z_bytes_t attachment);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ typedef struct _z_query_t {
_z_value_t _value;
_z_keyexpr_t _key;
uint32_t _request_id;
_z_session_t *_zn; // FIXME: Potential UB source, Issue #476
_z_session_weak_t _zn; // Can't be an rc because of cross referencing
_z_bytes_t attachment;
char *_parameters;
_Bool _anyke;
Expand All @@ -50,7 +50,7 @@ typedef struct {
} _z_queryable_t;

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_rc_t *zn,
uint32_t request_id, const _z_bytes_t attachment);
void _z_queryable_clear(_z_queryable_t *qbl);
void _z_queryable_free(_z_queryable_t **qbl);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ typedef struct _z_session_t {
#endif
} _z_session_t;

extern void _z_session_clear(_z_session_t *zn); // Forward type declaration to avoid cyclical include
extern void _z_session_clear(_z_session_t *zn); // Forward declaration to avoid cyclical include

_Z_REFCOUNT_DEFINE(_z_session, _z_session)

Expand All @@ -84,7 +84,7 @@ _Z_REFCOUNT_DEFINE(_z_session, _z_session)
* ``0`` in case of success, or a ``negative value`` in case of failure.
*
*/
int8_t _z_open(_z_session_t *zn, _z_config_t *config);
int8_t _z_open(_z_session_rc_t *zn, _z_config_t *config);

/**
* Close a zenoh-net session.
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ typedef _z_qos_t _z_n_qos_t;
static inline _z_qos_t _z_n_qos_create(_Bool express, z_congestion_control_t congestion_control,
z_priority_t priority) {
_z_n_qos_t ret;
_Bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1;
_Bool nodrop = (congestion_control != Z_CONGESTION_CONTROL_DROP);
ret._val = (uint8_t)((express << 4) | (nodrop << 3) | priority);
return ret;
}
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ _z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, cons
_z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t key);

_z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q);
int8_t _z_trigger_queryables(_z_session_t *zn, _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid,
int8_t _z_trigger_queryables(_z_session_rc_t *zn, _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid,
const _z_bytes_t attachment);
void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *q);
void _z_flush_session_queryable(_z_session_t *zn);
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
_z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t id, const char *locator, const uint32_t timeout,
const _Bool exit_on_first);

int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid);
int8_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid);
void _z_session_clear(_z_session_t *zn);
int8_t _z_session_close(_z_session_t *zn, uint8_t reason);

int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, uint16_t local_peer_id);
int8_t _z_handle_network_message(_z_session_rc_t *zsrc, _z_zenoh_message_t *z_msg, uint16_t local_peer_id);
int8_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);

Expand Down
6 changes: 3 additions & 3 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ _z_transport_peer_entry_list_t *_z_transport_peer_entry_list_insert(_z_transport
_z_transport_peer_entry_t *entry);

// Forward type declaration to avoid cyclical include
typedef struct _z_session_t _z_session_t;
typedef struct _z_session_rc_t _z_session_rc_ref_t;

// Forward declaration to be used in _zp_f_send_tmsg*
typedef struct _z_transport_multicast_t _z_transport_multicast_t;
Expand All @@ -65,7 +65,7 @@ typedef int8_t (*_zp_f_send_tmsg)(_z_transport_multicast_t *self, const _z_trans

typedef struct {
// Session associated to the transport
_z_session_t *_session;
_z_session_rc_ref_t *_session;

#if Z_FEATURE_MULTI_THREAD == 1
// TX and RX mutexes
Expand Down Expand Up @@ -108,7 +108,7 @@ typedef struct {

typedef struct _z_transport_multicast_t {
// Session associated to the transport
_z_session_t *_session;
_z_session_rc_ref_t *_session;

#if Z_FEATURE_MULTI_THREAD == 1
// TX and RX mutexes
Expand Down
28 changes: 21 additions & 7 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -875,15 +875,16 @@ int8_t z_open(z_owned_session_t *zs, z_owned_config_t *config) {
z_config_drop(config);
return _Z_ERR_SYSTEM_OUT_OF_MEMORY;
}
zs->_rc = zsrc;
// Open session
int8_t ret = _z_open(&zsrc.in->val, &config->_val);
int8_t ret = _z_open(&zs->_rc, &config->_val);
if (ret != _Z_RES_OK) {
_z_session_rc_drop(&zsrc);
_z_session_rc_drop(&zs->_rc);
z_session_null(zs);
z_config_drop(config);
return ret;
}
// Store rc in session
zs->_rc = zsrc;
// Clean up
z_config_drop(config);
return _Z_RES_OK;
}
Expand Down Expand Up @@ -1297,6 +1298,12 @@ void z_query_reply_options_default(z_query_reply_options_t *options) {

int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr, z_owned_bytes_t *payload,
const z_query_reply_options_t *options) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&query->in->val._zn);
if (sess_rc.in == NULL) {
return _Z_ERR_CONNECTION_CLOSED;
}
// Set options
_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
z_query_reply_options_t opts;
if (options == NULL) {
Expand All @@ -1309,12 +1316,13 @@ int8_t z_query_reply(const z_loaned_query_t *query, const z_loaned_keyexpr_t *ke
.encoding = _z_encoding_from_owned(opts.encoding)};

int8_t ret =
_z_send_reply(&query->in->val, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT, opts.congestion_control,
_z_send_reply(&query->in->val, &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_PUT, opts.congestion_control,
opts.priority, opts.is_express, opts.timestamp, _z_bytes_from_owned_bytes(opts.attachment));
if (payload != NULL) {
z_bytes_drop(payload);
}
// Clean-up
_z_session_rc_drop(&sess_rc);
z_encoding_drop(opts.encoding);
z_bytes_drop(opts.attachment);
return ret;
Expand All @@ -1330,6 +1338,11 @@ void z_query_reply_del_options_default(z_query_reply_del_options_t *options) {

int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t *keyexpr,
const z_query_reply_del_options_t *options) {
// Try upgrading session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&query->in->val._zn);
if (sess_rc.in == NULL) {
return _Z_ERR_CONNECTION_CLOSED;
}
_z_keyexpr_t keyexpr_aliased = _z_keyexpr_alias_from_user_defined(*keyexpr, true);
z_query_reply_del_options_t opts;
if (options == NULL) {
Expand All @@ -1341,9 +1354,10 @@ int8_t z_query_reply_del(const z_loaned_query_t *query, const z_loaned_keyexpr_t
_z_value_t value = {.payload = _z_bytes_null(), .encoding = _z_encoding_null()};

int8_t ret =
_z_send_reply(&query->in->val, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE, opts.congestion_control,
_z_send_reply(&query->in->val, &sess_rc, keyexpr_aliased, value, Z_SAMPLE_KIND_DELETE, opts.congestion_control,
opts.priority, opts.is_express, opts.timestamp, _z_bytes_from_owned_bytes(opts.attachment));

// Clean-up
_z_session_rc_drop(&sess_rc);
z_bytes_drop(opts.attachment);
return ret;
}
Expand Down
15 changes: 8 additions & 7 deletions src/net/primitives.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,17 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle) {
return _Z_RES_OK;
}

int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_value_t payload,
const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl, z_priority_t priority,
_Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t att) {
int8_t _z_send_reply(const _z_query_t *query, const _z_session_rc_t *zsrc, _z_keyexpr_t keyexpr,
const _z_value_t payload, const z_sample_kind_t kind, const z_congestion_control_t cong_ctrl,
z_priority_t priority, _Bool is_express, const _z_timestamp_t *timestamp, const _z_bytes_t att) {
int8_t ret = _Z_RES_OK;
_z_session_t *zn = &zsrc->in->val;

_z_keyexpr_t q_ke;
_z_keyexpr_t r_ke;
if (query->_anyke == false) {
q_ke = _z_get_expanded_key_from_key(query->_zn, &query->_key);
r_ke = _z_get_expanded_key_from_key(query->_zn, &keyexpr);
q_ke = _z_get_expanded_key_from_key(zn, &query->_key);
r_ke = _z_get_expanded_key_from_key(zn, &keyexpr);
if (_z_keyexpr_intersects(q_ke._suffix, strlen(q_ke._suffix), r_ke._suffix, strlen(r_ke._suffix)) == false) {
ret = _Z_ERR_KEYEXPR_NOT_MATCH;
}
Expand All @@ -329,7 +330,7 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val

if (ret == _Z_RES_OK) {
// Build the reply context decorator. This is NOT the final reply.
_z_id_t zid = ((_z_session_t *)query->_zn)->_local_zid;
_z_id_t zid = zn->_local_zid;
_z_keyexpr_t ke = _z_keyexpr_alias(keyexpr);
_z_zenoh_message_t z_msg;
switch (kind) {
Expand Down Expand Up @@ -396,7 +397,7 @@ int8_t _z_send_reply(const _z_query_t *query, _z_keyexpr_t keyexpr, const _z_val
default:
return _Z_ERR_GENERIC;
}
if (_z_send_n_msg(query->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
ret = _Z_ERR_TRANSPORT_TX_FAILED;
}

Expand Down
22 changes: 14 additions & 8 deletions src/net/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,28 @@ _z_query_t _z_query_null(void) {
._parameters = NULL,
._request_id = 0,
._value = _z_value_null(),
._zn = NULL,
._zn = {.in = NULL},
};
}

void _z_query_clear(_z_query_t *q) {
// Send REPLY_FINAL message
_z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id);
if (_z_send_n_msg(q->_zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_Z_ERROR("Query send REPLY_FINAL transport failure !");
// Try to upgrade session weak to rc
_z_session_rc_t sess_rc = _z_session_weak_upgrade(&q->_zn);
if (sess_rc.in != NULL) {
// Send REPLY_FINAL message
_z_zenoh_message_t z_msg = _z_n_msg_make_response_final(q->_request_id);
if (_z_send_n_msg(&q->_zn.in->val, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) {
_Z_ERROR("Query send REPLY_FINAL transport failure !");
}
_z_msg_clear(&z_msg);
_z_session_rc_drop(&sess_rc);
}
// Clean up memory
_z_msg_clear(&z_msg);
z_free(q->_parameters);
_z_keyexpr_clear(&q->_key);
_z_value_clear(&q->_value);
_z_bytes_drop(&q->attachment);
_z_session_weak_drop(&q->_zn);
}

void _z_query_copy(_z_query_t *dst, const _z_query_t *src) {
Expand All @@ -60,11 +66,11 @@ void _z_query_free(_z_query_t **query) {
}

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_t *zn,
_z_query_t _z_query_create(_z_value_t *value, _z_keyexpr_t *key, const _z_slice_t *parameters, _z_session_rc_t *zsrc,
uint32_t request_id, const _z_bytes_t attachment) {
_z_query_t q = _z_query_null();
q._request_id = request_id;
q._zn = zn;
q._zn = _z_session_rc_clone_as_weak(zsrc);
q._parameters = (char *)z_malloc(parameters->len + 1);
memcpy(q._parameters, parameters->start, parameters->len);
q._parameters[parameters->len] = 0;
Expand Down
6 changes: 3 additions & 3 deletions src/net/session.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/uuid.h"

int8_t __z_open_inner(_z_session_t *zn, char *locator, z_whatami_t mode) {
int8_t __z_open_inner(_z_session_rc_t *zn, char *locator, z_whatami_t mode) {
int8_t ret = _Z_RES_OK;

_z_id_t local_zid = _z_id_empty();
Expand All @@ -46,7 +46,7 @@ int8_t __z_open_inner(_z_session_t *zn, char *locator, z_whatami_t mode) {
local_zid = _z_id_empty();
return ret;
}
ret = _z_new_transport(&zn->_tp, &local_zid, locator, mode);
ret = _z_new_transport(&zn->in->val._tp, &local_zid, locator, mode);
if (ret != _Z_RES_OK) {
local_zid = _z_id_empty();
return ret;
Expand All @@ -55,7 +55,7 @@ int8_t __z_open_inner(_z_session_t *zn, char *locator, z_whatami_t mode) {
return ret;
}

int8_t _z_open(_z_session_t *zn, _z_config_t *config) {
int8_t _z_open(_z_session_rc_t *zn, _z_config_t *config) {
int8_t ret = _Z_RES_OK;

_z_id_t zid = _z_id_empty();
Expand Down
3 changes: 1 addition & 2 deletions src/protocol/codec/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) {
_Z_RETURN_IF_ERR(_z_wbuf_write_bytes(wbf, msg->_ext_responder._zid.id, 0, zidlen));
_Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_ext_responder._eid));
}

switch (msg->_tag) {
case _Z_RESPONSE_BODY_REPLY: {
_Z_RETURN_IF_ERR(_z_reply_encode(wbf, &msg->_body._reply));
Expand All @@ -302,9 +301,9 @@ int8_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) {
break;
}
}

return ret;
}

int8_t _z_response_decode_extension(_z_msg_ext_t *extension, void *ctx) {
int8_t ret = _Z_RES_OK;
_z_n_msg_response_t *msg = (_z_n_msg_response_t *)ctx;
Expand Down
Loading
Loading