From ebdb01e4bc099ee566e5d0ca9cc1ef251f61adce Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Tue, 5 Mar 2024 17:45:25 +0100 Subject: [PATCH] Add unicast interest feature (#349) * refactor: rename make final interest function * feat: add Interest config token * fix: PA copy paste error * feat: add session interest type * feat: add intrest type * feat: add session interest module * feat: add interest primitives * feat: add session mutex function * feat: add interest message processing * fix: modularize process declare interest function * feat: implement trigger_interest function * fix: clone entitiy lists before sending declarations * fix: remove unused function * fix: rename file * fix: set interest_msg fields as public * feat: process undeclares as well * fix: remove unecessary enum value * fix: remove dropper function * feat: add write side filtering entity * feat: implement filtering * fix: remove interest by default (for now) * fix: rework publisher put actions order * fix: add missing const * fix: add missing dummy functions * fix: exclude dummy functions from config token * fix: remove unused defines * fix: give keyexpr ownership to declare * feat: add refcount copy function * fix: duplicate key for declare * feat: add resource copy function * fix: use keyexpr alias instead of duplicate * fix: flip early return condition * feat: send final interest * refactor: rename ambiguous functions * fix: fuse identical functions * fix: trigger local subs after push message * fix: add missing switch default label * build: add Z_FEATURE_INTEREST to build system * fix: remove unused function * refactor: regroup static functions * fix: add flag filtering on interest look-up --- CMakeLists.txt | 3 + GNUmakefile | 3 +- include/zenoh-pico/collections/refcount.h | 4 + include/zenoh-pico/config.h | 7 + include/zenoh-pico/net/filtering.h | 48 +++ include/zenoh-pico/net/primitives.h | 6 + include/zenoh-pico/net/publish.h | 6 +- include/zenoh-pico/net/session.h | 5 + .../protocol/definitions/declarations.h | 4 +- include/zenoh-pico/session/interest.h | 34 ++ include/zenoh-pico/session/session.h | 42 ++- include/zenoh-pico/session/utils.h | 3 + include/zenoh-pico/system/link/raweth.h | 1 - src/api/api.c | 42 ++- src/net/filtering.c | 120 ++++++ src/net/primitives.c | 49 +++ src/protocol/definitions/declarations.c | 17 +- src/session/interest.c | 356 ++++++++++++++++++ src/session/resource.c | 6 + src/session/rx.c | 33 +- src/session/utils.c | 8 + zenohpico.pc | 2 +- 22 files changed, 755 insertions(+), 44 deletions(-) create mode 100644 include/zenoh-pico/net/filtering.h create mode 100644 include/zenoh-pico/session/interest.h create mode 100644 src/net/filtering.c create mode 100644 src/session/interest.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 2c32d9bb1..3df82dd66 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -133,6 +133,7 @@ set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature") set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature") set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport feature") set(Z_FEATURE_ATTACHMENT 1 CACHE STRING "Toggle attachment feature") +set(Z_FEATURE_INTEREST 0 CACHE STRING "Toggle interest feature") # Toggle to 1 when protocol changes are merged add_definition(Z_FEATURE_MULTI_THREAD=${Z_FEATURE_MULTI_THREAD}) add_definition(Z_FEATURE_PUBLICATION=${Z_FEATURE_PUBLICATION}) add_definition(Z_FEATURE_SUBSCRIPTION=${Z_FEATURE_SUBSCRIPTION}) @@ -140,6 +141,7 @@ add_definition(Z_FEATURE_QUERY=${Z_FEATURE_QUERY}) add_definition(Z_FEATURE_QUERYABLE=${Z_FEATURE_QUERYABLE}) add_definition(Z_FEATURE_RAWETH_TRANSPORT=${Z_FEATURE_RAWETH_TRANSPORT}) add_definition(Z_FEATURE_ATTACHMENT=${Z_FEATURE_ATTACHMENT}) +add_definition(Z_FEATURE_INTEREST=${Z_FEATURE_INTEREST}) add_compile_definitions("Z_BUILD_DEBUG=$") message(STATUS "Building with feature confing:\n\ * MULTI-THREAD: ${Z_FEATURE_MULTI_THREAD}\n\ @@ -148,6 +150,7 @@ message(STATUS "Building with feature confing:\n\ * QUERY: ${Z_FEATURE_QUERY}\n\ * QUERYABLE: ${Z_FEATURE_QUERYABLE}\n\ * ATTACHMENT: ${Z_FEATURE_ATTACHMENT}\n\ +* INTEREST: ${Z_FEATURE_INTEREST}\n\ * RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}") # Print summary of CMAKE configurations diff --git a/GNUmakefile b/GNUmakefile index 89b7c9334..3160a168b 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -57,6 +57,7 @@ Z_FEATURE_SUBSCRIPTION?=1 Z_FEATURE_QUERY?=1 Z_FEATURE_QUERYABLE?=1 Z_FEATURE_ATTACHMENT?=1 +Z_FEATURE_INTEREST?=0 Z_FEATURE_RAWETH_TRANSPORT?=0 # zenoh-pico/ directory @@ -73,7 +74,7 @@ CROSSIMG_PREFIX=zenoh-pico_ # - ARM: old versions of dockcross/dockcross were creating some issues since they used an old GCC (4.8.3) which lacks (even using -std=gnu11) CMAKE_OPT=-DZENOH_DEBUG=$(ZENOH_DEBUG) -DBUILD_EXAMPLES=$(BUILD_EXAMPLES) -DCMAKE_BUILD_TYPE=$(BUILD_TYPE) -DBUILD_TESTING=$(BUILD_TESTING) -DBUILD_MULTICAST=$(BUILD_MULTICAST)\ - -DZ_FEATURE_MULTI_THREAD=$(Z_FEATURE_MULTI_THREAD) \ + -DZ_FEATURE_MULTI_THREAD=$(Z_FEATURE_MULTI_THREAD) -DZ_FEATURE_INTEREST=$(Z_FEATURE_INTEREST) \ -DZ_FEATURE_PUBLICATION=$(Z_FEATURE_PUBLICATION) -DZ_FEATURE_SUBSCRIPTION=$(Z_FEATURE_SUBSCRIPTION) -DZ_FEATURE_QUERY=$(Z_FEATURE_QUERY) -DZ_FEATURE_QUERYABLE=$(Z_FEATURE_QUERYABLE)\ -DZ_FEATURE_RAWETH_TRANSPORT=$(Z_FEATURE_RAWETH_TRANSPORT) -DZ_FEATURE_ATTACHMENT=$(Z_FEATURE_ATTACHMENT) -DBUILD_INTEGRATION=$(BUILD_INTEGRATION) -DBUILD_TOOLS=$(BUILD_TOOLS) -DBUILD_SHARED_LIBS=$(BUILD_SHARED_LIBS) -H. diff --git a/include/zenoh-pico/collections/refcount.h b/include/zenoh-pico/collections/refcount.h index c348d16b7..183d11e46 100644 --- a/include/zenoh-pico/collections/refcount.h +++ b/include/zenoh-pico/collections/refcount.h @@ -124,6 +124,10 @@ } \ return c; \ } \ + static inline void name##_rc_copy(name##_rc_t *dst, const name##_rc_t *p) { \ + dst->in = p->in; \ + _ZP_RC_OP_INCR_CNT \ + } \ static inline _Bool name##_rc_eq(const name##_rc_t *left, const name##_rc_t *right) { \ return (left->in == right->in); \ } \ diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index 31d8ccf4b..585a48426 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -239,6 +239,13 @@ #define Z_FEATURE_ATTACHMENT 1 #endif +/** + * Enable interests. + */ +#ifndef Z_FEATURE_INTEREST +#define Z_FEATURE_INTEREST 0 +#endif + /*------------------ Compile-time configuration properties ------------------*/ /** * Default length for Zenoh ID. Maximum size is 16 bytes. diff --git a/include/zenoh-pico/net/filtering.h b/include/zenoh-pico/net/filtering.h new file mode 100644 index 000000000..36c17adc5 --- /dev/null +++ b/include/zenoh-pico/net/filtering.h @@ -0,0 +1,48 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#ifndef ZENOH_PICO_FILTERING_NETAPI_H +#define ZENOH_PICO_FILTERING_NETAPI_H + +#include + +#include "zenoh-pico/api/constants.h" +#include "zenoh-pico/net/session.h" +#include "zenoh-pico/protocol/core.h" + +typedef enum { + WRITE_FILTER_INIT = 0, + WRITE_FILTER_ACTIVE = 1, + WRITE_FILTER_OFF = 2, +} _z_write_filter_state_t; + +typedef struct { + uint32_t decl_id; + uint8_t state; +} _z_writer_filter_ctx_t; + +/** + * Return type when declaring a queryable. + */ +typedef struct _z_interest_t { + uint32_t _interest_id; + _z_writer_filter_ctx_t *ctx; +} _z_write_filter_t; + +typedef struct _z_publisher_t _z_publisher_t; + +int8_t _z_write_filter_create(_z_publisher_t *pub); +int8_t _z_write_filter_destroy(const _z_publisher_t *pub); +_Bool _z_write_filter_active(const _z_publisher_t *pub); + +#endif /* ZENOH_PICO_FILTERING_NETAPI_H */ diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 20bb27099..1ebfbdcf2 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -241,4 +241,10 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, ); #endif +#if Z_FEATURE_INTEREST == 1 +uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, + void *arg); +int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id); +#endif + #endif /* INCLUDE_ZENOH_PICO_NET_PRIMITIVES_H */ diff --git a/include/zenoh-pico/net/publish.h b/include/zenoh-pico/net/publish.h index 75ebad8d1..adaff682a 100644 --- a/include/zenoh-pico/net/publish.h +++ b/include/zenoh-pico/net/publish.h @@ -15,18 +15,22 @@ #ifndef INCLUDE_ZENOH_PICO_NET_PUBLISH_H #define INCLUDE_ZENOH_PICO_NET_PUBLISH_H +#include "zenoh-pico/net/filtering.h" #include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" /** * Return type when declaring a publisher. */ -typedef struct { +typedef struct _z_publisher_t { _z_keyexpr_t _key; _z_zint_t _id; _z_session_rc_t _zn; z_congestion_control_t _congestion_control; z_priority_t _priority; +#if Z_FEATURE_INTEREST == 1 + _z_write_filter_t _filter; +#endif } _z_publisher_t; #if Z_FEATURE_PUBLICATION == 1 diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index 241bfde77..8d68188b4 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -62,6 +62,11 @@ typedef struct _z_session_t { #if Z_FEATURE_QUERY == 1 _z_pending_query_list_t *_pending_queries; #endif + + // Session interests +#if Z_FEATURE_INTEREST == 1 + _z_session_interest_rc_list_t *_local_interests; +#endif } _z_session_t; extern void _z_session_clear(_z_session_t *zn); // Forward type declaration to avoid cyclical include diff --git a/include/zenoh-pico/protocol/definitions/declarations.h b/include/zenoh-pico/protocol/definitions/declarations.h index 8a3c8e597..6c3144e7b 100644 --- a/include/zenoh-pico/protocol/definitions/declarations.h +++ b/include/zenoh-pico/protocol/definitions/declarations.h @@ -137,8 +137,8 @@ _z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keye _z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id); _z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key); -_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id); +_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags); _z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key); -_z_declaration_t _z_make_final_decl(uint32_t id); +_z_declaration_t _z_make_final_interest(uint32_t id); #endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_DECLARATIONS_H */ diff --git a/include/zenoh-pico/session/interest.h b/include/zenoh-pico/session/interest.h new file mode 100644 index 000000000..116ac9385 --- /dev/null +++ b/include/zenoh-pico/session/interest.h @@ -0,0 +1,34 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#ifndef ZENOH_PICO_SESSION_INTEREST_H +#define ZENOH_PICO_SESSION_INTEREST_H + +#include + +#include "zenoh-pico/net/session.h" + +#if Z_FEATURE_INTEREST == 1 +_z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id); +_z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_interest_t *intr); +void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr); +void _z_flush_interest(_z_session_t *zn); +#endif // Z_FEATURE_INTEREST == 1 + +int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl); +int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id); +int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id); +int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags); + +#endif /* ZENOH_PICO_SESSION_INTEREST_H */ diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index a13b8a238..754f97ce0 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -77,9 +77,10 @@ typedef struct { _Bool _z_resource_eq(const _z_resource_t *one, const _z_resource_t *two); void _z_resource_clear(_z_resource_t *res); +void _z_resource_copy(_z_resource_t *dst, const _z_resource_t *src); void _z_resource_free(_z_resource_t **res); -_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_resource_copy) _Z_LIST_DEFINE(_z_resource, _z_resource_t) /** @@ -102,7 +103,7 @@ void _z_subscription_clear(_z_subscription_t *sub); _Z_REFCOUNT_DEFINE(_z_subscription, _z_subscription) _Z_ELEM_DEFINE(_z_subscriber, _z_subscription_t, _z_noop_size, _z_subscription_clear, _z_noop_copy) -_Z_ELEM_DEFINE(_z_subscription_rc, _z_subscription_rc_t, _z_noop_size, _z_subscription_rc_drop, _z_noop_copy) +_Z_ELEM_DEFINE(_z_subscription_rc, _z_subscription_rc_t, _z_noop_size, _z_subscription_rc_drop, _z_subscription_rc_copy) _Z_LIST_DEFINE(_z_subscription_rc, _z_subscription_rc_t) typedef struct { @@ -187,4 +188,41 @@ typedef void (*_z_hello_handler_t)(_z_hello_t *hello, struct __z_hello_handler_w int8_t _z_session_generate_zid(_z_id_t *bs, uint8_t size); +typedef enum { + _Z_INTEREST_MSG_TYPE_FINAL = 0, + _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER, + _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE, + _Z_INTEREST_MSG_TYPE_DECL_TOKEN, + _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER, + _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE, + _Z_INTEREST_MSG_TYPE_UNDECL_TOKEN, +} _z_interest_msg_type_t; + +typedef struct _z_interest_msg_t { + uint8_t type; + uint32_t id; +} _z_interest_msg_t; + +/** + * The callback signature of the functions handling interest messages. + */ +typedef void (*_z_interest_handler_t)(const _z_interest_msg_t *msg, void *arg); + +typedef struct { + _z_keyexpr_t _key; + uint32_t _id; + _z_interest_handler_t _callback; + void *_arg; + uint8_t _flags; +} _z_session_interest_t; + +_Bool _z_session_interest_eq(const _z_session_interest_t *one, const _z_session_interest_t *two); +void _z_session_interest_clear(_z_session_interest_t *res); + +_Z_REFCOUNT_DEFINE(_z_session_interest, _z_session_interest) +_Z_ELEM_DEFINE(_z_session_interest, _z_session_interest_t, _z_noop_size, _z_session_interest_clear, _z_noop_copy) +_Z_ELEM_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t, _z_noop_size, _z_session_interest_rc_drop, + _z_noop_copy) +_Z_LIST_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t) + #endif /* INCLUDE_ZENOH_PICO_SESSION_SESSION_H */ diff --git a/include/zenoh-pico/session/utils.h b/include/zenoh-pico/session/utils.h index 39491ba4d..4875bd82f 100644 --- a/include/zenoh-pico/session/utils.h +++ b/include/zenoh-pico/session/utils.h @@ -34,4 +34,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, ui int8_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability, z_congestion_control_t cong_ctrl); +void _zp_session_lock_mutex(_z_session_t *zn); +void _zp_session_unlock_mutex(_z_session_t *zn); + #endif /* INCLUDE_ZENOH_PICO_SESSION_UTILS_H */ diff --git a/include/zenoh-pico/system/link/raweth.h b/include/zenoh-pico/system/link/raweth.h index 6ce8ad67e..2996b1931 100644 --- a/include/zenoh-pico/system/link/raweth.h +++ b/include/zenoh-pico/system/link/raweth.h @@ -58,7 +58,6 @@ typedef struct { _Bool _has_vlan; } _z_raweth_socket_t; -int8_t _z_get_smac_raweth(_z_raweth_socket_t *resock); int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface); size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len); size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr); diff --git a/src/api/api.c b/src/api/api.c index 4bdcc3421..43afb1b39 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -23,6 +23,7 @@ #include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/config.h" #include "zenoh-pico/net/config.h" +#include "zenoh-pico/net/filtering.h" #include "zenoh-pico/net/logger.h" #include "zenoh-pico/net/memory.h" #include "zenoh-pico/net/primitives.h" @@ -694,14 +695,28 @@ z_owned_publisher_t z_declare_publisher(z_session_t zs, z_keyexpr_t keyexpr, con key = _z_rid_with_suffix(id, NULL); } } - + // Set options z_publisher_options_t opt = z_publisher_options_default(); if (options != NULL) { opt.congestion_control = options->congestion_control; opt.priority = options->priority; } - - return (z_owned_publisher_t){._value = _z_declare_publisher(&zs._val, key, opt.congestion_control, opt.priority)}; + // Set publisher + _z_publisher_t *pub = _z_declare_publisher(&zs._val, key, opt.congestion_control, opt.priority); + if (pub == NULL) { + if (key._id != Z_RESOURCE_ID_NONE) { + _z_undeclare_resource(&zs._val.in->val, key._id); + } + return (z_owned_publisher_t){._value = NULL}; + } + // Create write filter + if (_z_write_filter_create(pub) != _Z_RES_OK) { + if (key._id != Z_RESOURCE_ID_NONE) { + _z_undeclare_resource(&zs._val.in->val, key._id); + } + return (z_owned_publisher_t){._value = NULL}; + } + return (z_owned_publisher_t){._value = pub}; } int8_t z_undeclare_publisher(z_owned_publisher_t *pub) { @@ -728,8 +743,8 @@ z_publisher_delete_options_t z_publisher_delete_options_default(void) { int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t len, const z_publisher_put_options_t *options) { - int8_t ret = _Z_RES_OK; - + int8_t ret = 0; + // Build options z_publisher_put_options_t opt = z_publisher_put_options_default(); if (options != NULL) { opt.encoding = options->encoding; @@ -737,15 +752,17 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l opt.attachment = options->attachment; #endif } - - ret = _z_write(&pub._val->_zn.in->val, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT, - pub._val->_congestion_control, pub._val->_priority + // Check if write filter is active before writing + if (!_z_write_filter_active(pub._val)) { + // Write value + ret = _z_write(&pub._val->_zn.in->val, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT, + pub._val->_congestion_control, pub._val->_priority #if Z_FEATURE_ATTACHMENT == 1 - , - opt.attachment + , + opt.attachment #endif - ); - + ); + } // Trigger local subscriptions _z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len #if Z_FEATURE_ATTACHMENT == 1 @@ -753,7 +770,6 @@ int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t l opt.attachment #endif ); - return ret; } diff --git a/src/net/filtering.c b/src/net/filtering.c new file mode 100644 index 000000000..ae0a8d898 --- /dev/null +++ b/src/net/filtering.c @@ -0,0 +1,120 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/net/filtering.h" + +#include +#include +#include + +#include "zenoh-pico/api/types.h" +#include "zenoh-pico/config.h" +#include "zenoh-pico/net/primitives.h" +#include "zenoh-pico/net/query.h" +#include "zenoh-pico/protocol/codec/core.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" +#include "zenoh-pico/protocol/keyexpr.h" +#include "zenoh-pico/session/queryable.h" +#include "zenoh-pico/session/resource.h" +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_INTEREST == 1 +static void _z_write_filter_callback(const _z_interest_msg_t *msg, void *arg) { + _z_writer_filter_ctx_t *ctx = (_z_writer_filter_ctx_t *)arg; + + switch (ctx->state) { + // Update init state + case WRITE_FILTER_INIT: + switch (msg->type) { + case _Z_INTEREST_MSG_TYPE_FINAL: + ctx->state = WRITE_FILTER_ACTIVE; // No subscribers + break; + + case _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER: + ctx->state = WRITE_FILTER_OFF; + ctx->decl_id = msg->id; + break; + + default: // Nothing to do + break; + } + break; + // Remove filter if we receive a subscribe + case WRITE_FILTER_ACTIVE: + if (msg->type == _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER) { + ctx->state = WRITE_FILTER_OFF; + ctx->decl_id = msg->id; + } + break; + // Activate filter if subscribe is removed + case WRITE_FILTER_OFF: + if ((msg->type == _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER) && (ctx->decl_id == msg->id)) { + ctx->state = WRITE_FILTER_ACTIVE; + ctx->decl_id = 0; + } + break; + // Nothing to do + default: + break; + } +} + +int8_t _z_write_filter_create(_z_publisher_t *pub) { + uint8_t flags = _Z_INTEREST_FLAG_KEYEXPRS | _Z_INTEREST_FLAG_SUBSCRIBERS | _Z_INTEREST_FLAG_CURRENT | + _Z_INTEREST_FLAG_FUTURE | _Z_INTEREST_FLAG_AGGREGATE; + _z_writer_filter_ctx_t *ctx = (_z_writer_filter_ctx_t *)zp_malloc(sizeof(_z_writer_filter_ctx_t)); + + if (ctx == NULL) { + return _Z_ERR_SYSTEM_OUT_OF_MEMORY; + } + ctx->state = WRITE_FILTER_INIT; + ctx->decl_id = 0; + + pub->_filter.ctx = ctx; + pub->_filter._interest_id = _z_declare_interest(&pub->_zn.in->val, _z_keyexpr_alias(pub->_key), + _z_write_filter_callback, flags, (void *)ctx); + if (pub->_filter._interest_id == 0) { + zp_free(ctx); + return _Z_ERR_GENERIC; + } + return _Z_RES_OK; +} + +int8_t _z_write_filter_destroy(const _z_publisher_t *pub) { + _Z_RETURN_IF_ERR(_z_undeclare_interest(&pub->_zn.in->val, pub->_filter._interest_id)); + zp_free(pub->_filter.ctx); + return _Z_RES_OK; +} + +_Bool _z_write_filter_active(const _z_publisher_t *pub) { return (pub->_filter.ctx->state == WRITE_FILTER_ACTIVE); } + +#else +int8_t _z_write_filter_create(_z_publisher_t *pub) { + _ZP_UNUSED(pub); + return _Z_RES_OK; +} + +int8_t _z_write_filter_destroy(const _z_publisher_t *pub) { + _ZP_UNUSED(pub); + return _Z_RES_OK; +} + +_Bool _z_write_filter_active(const _z_publisher_t *pub) { + _ZP_UNUSED(pub); + return false; +} + +#endif diff --git a/src/net/primitives.c b/src/net/primitives.c index 4977acc32..87d138d47 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -20,12 +20,14 @@ #include "zenoh-pico/api/constants.h" #include "zenoh-pico/collections/bytes.h" #include "zenoh-pico/config.h" +#include "zenoh-pico/net/filtering.h" #include "zenoh-pico/net/logger.h" #include "zenoh-pico/net/memory.h" #include "zenoh-pico/protocol/core.h" #include "zenoh-pico/protocol/definitions/declarations.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/keyexpr.h" +#include "zenoh-pico/session/interest.h" #include "zenoh-pico/session/query.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/resource.h" @@ -119,6 +121,7 @@ int8_t _z_undeclare_publisher(_z_publisher_t *pub) { return _Z_ERR_ENTITY_UNKNOWN; } // Clear publisher + _z_write_filter_destroy(pub); _z_undeclare_resource(&pub->_zn.in->val, pub->_key._id); _z_session_rc_drop(&pub->_zn); return _Z_RES_OK; @@ -423,3 +426,49 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, return ret; } #endif + +#if Z_FEATURE_INTEREST == 1 +/*------------------ Interest Declaration ------------------*/ +uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags, + void *arg) { + _z_session_interest_t intr; + intr._id = _z_get_entity_id(zn); + intr._key = _z_get_expanded_key_from_key(zn, &keyexpr); + intr._flags = flags; + intr._callback = callback; + intr._arg = arg; + + // Create interest entry, stored at session-level, do not drop it by the end of this function. + _z_session_interest_rc_t *sintr = _z_register_interest(zn, &intr); + if (sintr == NULL) { + return 0; + } + // Build the declare message to send on the wire + _z_declaration_t declaration = _z_make_decl_interest(&keyexpr, intr._id, intr._flags); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _z_unregister_interest(zn, sintr); + return 0; + } + _z_n_msg_clear(&n_msg); + return intr._id; +} + +int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id) { + // Find interest entry + _z_session_interest_rc_t *sintr = _z_get_interest_by_id(zn, interest_id); + if (sintr == NULL) { + return _Z_ERR_ENTITY_UNKNOWN; + } + // Build the declare message to send on the wire + _z_declaration_t declaration = _z_make_undecl_interest(sintr->in->val._id, &sintr->in->val._key); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + // Only if message is successfully send, session interest can be removed + _z_unregister_interest(zn, sintr); + return _Z_RES_OK; +} +#endif \ No newline at end of file diff --git a/src/protocol/definitions/declarations.c b/src/protocol/definitions/declarations.c index 712eff09b..9f1973077 100644 --- a/src/protocol/definitions/declarations.c +++ b/src/protocol/definitions/declarations.c @@ -102,18 +102,19 @@ _z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_ return (_z_declaration_t){._tag = _Z_UNDECL_TOKEN, ._body = {._undecl_token = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}}; } -_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) { - return (_z_declaration_t){._tag = _Z_UNDECL_TOKEN, - ._body = {._undecl_token = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}}; -} -_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id) { - return (_z_declaration_t){._tag = _Z_DECL_TOKEN, - ._body = {._decl_token = { +_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags) { + return (_z_declaration_t){._tag = _Z_DECL_INTEREST, + ._body = {._decl_interest = { ._id = id, ._keyexpr = _z_keyexpr_steal(key), + .interest_flags = interest_flags, }}}; } -_z_declaration_t _z_make_final_decl(uint32_t id) { +_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t *key) { + return (_z_declaration_t){._tag = _Z_UNDECL_INTEREST, + ._body = {._undecl_interest = {._id = id, ._ext_keyexpr = _z_keyexpr_duplicate(*key)}}}; +} +_z_declaration_t _z_make_final_interest(uint32_t id) { return (_z_declaration_t){._tag = _Z_FINAL_INTEREST, ._body = {._final_interest = {._id = id}}}; } _z_decl_kexpr_t _z_decl_kexpr_null(void) { return (_z_decl_kexpr_t){0}; } diff --git a/src/session/interest.c b/src/session/interest.c new file mode 100644 index 000000000..219077b0f --- /dev/null +++ b/src/session/interest.c @@ -0,0 +1,356 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include +#include +#include + +#include "zenoh-pico/api/types.h" +#include "zenoh-pico/config.h" +#include "zenoh-pico/net/query.h" +#include "zenoh-pico/protocol/codec/core.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/network.h" +#include "zenoh-pico/protocol/keyexpr.h" +#include "zenoh-pico/session/queryable.h" +#include "zenoh-pico/session/resource.h" +#include "zenoh-pico/session/utils.h" +#include "zenoh-pico/utils/logging.h" + +#if Z_FEATURE_INTEREST == 1 +_Bool _z_session_interest_eq(const _z_session_interest_t *one, const _z_session_interest_t *two) { + return one->_id == two->_id; +} + +void _z_session_interest_clear(_z_session_interest_t *intr) { _z_keyexpr_clear(&intr->_key); } + +/*------------------ interest ------------------*/ +static _z_session_interest_rc_t *__z_get_interest_by_id(_z_session_interest_rc_list_t *intrs, const _z_zint_t id) { + _z_session_interest_rc_t *ret = NULL; + _z_session_interest_rc_list_t *xs = intrs; + while (xs != NULL) { + _z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs); + if (id == intr->in->val._id) { + ret = intr; + break; + } + xs = _z_session_interest_rc_list_tail(xs); + } + return ret; +} + +static _z_session_interest_rc_list_t *__z_get_interest_by_key_and_flags(_z_session_interest_rc_list_t *intrs, + uint8_t flags, const _z_keyexpr_t key) { + _z_session_interest_rc_list_t *ret = NULL; + _z_session_interest_rc_list_t *xs = intrs; + while (xs != NULL) { + _z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs); + if ((intr->in->val._flags & flags) == 0) { + continue; + } + if (_z_keyexpr_intersects(intr->in->val._key._suffix, strlen(intr->in->val._key._suffix), key._suffix, + strlen(key._suffix)) == true) { + ret = _z_session_interest_rc_list_push(ret, _z_session_interest_rc_clone_as_ptr(intr)); + } + xs = _z_session_interest_rc_list_tail(xs); + } + return ret; +} + +/** + * This function is unsafe because it operates in potentially concurrent data. + * Make sure that the following mutexes are locked before calling this function: + * - zn->_mutex_inner + */ +static _z_session_interest_rc_t *__unsafe_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id) { + _z_session_interest_rc_list_t *intrs = zn->_local_interests; + return __z_get_interest_by_id(intrs, id); +} + +/** + * This function is unsafe because it operates in potentially concurrent data. + * Make sure that the following mutexes are locked before calling this function: + * - zn->_mutex_inner + */ +static _z_session_interest_rc_list_t *__unsafe_z_get_interest_by_key_and_flags(_z_session_t *zn, uint8_t flags, + const _z_keyexpr_t key) { + _z_session_interest_rc_list_t *intrs = zn->_local_interests; + return __z_get_interest_by_key_and_flags(intrs, flags, key); +} + +static int8_t _z_send_resource_interest(_z_session_t *zn) { + _zp_session_lock_mutex(zn); + _z_resource_list_t *res_list = _z_resource_list_clone(zn->_local_resources); + _zp_session_unlock_mutex(zn); + _z_resource_list_t *xs = res_list; + while (xs != NULL) { + _z_resource_t *res = _z_resource_list_head(xs); + // Build the declare message to send on the wire + _z_keyexpr_t key = _z_keyexpr_alias(res->_key); + _z_declaration_t declaration = _z_make_decl_keyexpr(res->_id, &key); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + xs = _z_resource_list_tail(xs); + } + _z_resource_list_free(&res_list); + return _Z_RES_OK; +} + +#if Z_FEATURE_SUBSCRIPTION == 1 +static int8_t _z_send_subscriber_interest(_z_session_t *zn) { + _zp_session_lock_mutex(zn); + _z_subscription_rc_list_t *sub_list = _z_subscription_rc_list_clone(zn->_local_subscriptions); + _zp_session_unlock_mutex(zn); + _z_subscription_rc_list_t *xs = sub_list; + while (xs != NULL) { + _z_subscription_rc_t *sub = _z_subscription_rc_list_head(xs); + // Build the declare message to send on the wire + _z_keyexpr_t key = _z_keyexpr_alias(sub->in->val._key); + _z_declaration_t declaration = + _z_make_decl_subscriber(&key, sub->in->val._id, sub->in->val._info.reliability == Z_RELIABILITY_RELIABLE, + sub->in->val._info.mode == Z_SUBMODE_PULL); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + xs = _z_subscription_rc_list_tail(xs); + } + _z_subscription_rc_list_free(&sub_list); + return _Z_RES_OK; +} +#else +static int8_t _z_send_subscriber_interest(_z_session_t *zn) { + _ZP_UNUSED(zn); + return _Z_RES_OK; +} +#endif + +#if Z_FEATURE_QUERYABLE == 1 +static int8_t _z_send_queryable_interest(_z_session_t *zn) { + _zp_session_lock_mutex(zn); + _z_session_queryable_rc_list_t *qle_list = _z_session_queryable_rc_list_clone(zn->_local_queryable); + _zp_session_unlock_mutex(zn); + _z_session_queryable_rc_list_t *xs = qle_list; + while (xs != NULL) { + _z_session_queryable_rc_t *qle = _z_session_queryable_rc_list_head(xs); + // Build the declare message to send on the wire + _z_keyexpr_t key = _z_keyexpr_alias(qle->in->val._key); + _z_declaration_t declaration = + _z_make_decl_queryable(&key, qle->in->val._id, qle->in->val._complete, _Z_QUERYABLE_DISTANCE_DEFAULT); + _z_network_message_t n_msg = _z_n_msg_make_declare(declaration); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + xs = _z_subscription_rc_list_tail(xs); + } + _z_session_queryable_rc_list_free(&qle_list); + return _Z_RES_OK; +} +#else +static int8_t _z_send_queryable_interest(_z_session_t *zn) { + _ZP_UNUSED(zn); + return _Z_RES_OK; +} +#endif + +static int8_t _z_interest_send_final_interest(_z_session_t *zn, uint32_t id) { + _z_declaration_t decl = _z_make_final_interest(id); + _z_network_message_t n_msg = _z_n_msg_make_declare(decl); + if (_z_send_n_msg(zn, &n_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + return _Z_ERR_TRANSPORT_TX_FAILED; + } + _z_n_msg_clear(&n_msg); + return _Z_RES_OK; +} + +_z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id) { + _zp_session_lock_mutex(zn); + _z_session_interest_rc_t *intr = __unsafe_z_get_interest_by_id(zn, id); + _zp_session_unlock_mutex(zn); + return intr; +} + +_z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_interest_t *intr) { + _Z_DEBUG(">>> Allocating interest for (%ju:%s)", (uintmax_t)intr->_key._id, intr->_key._suffix); + _z_session_interest_rc_t *ret = NULL; + + _zp_session_lock_mutex(zn); + ret = (_z_session_interest_rc_t *)zp_malloc(sizeof(_z_session_interest_rc_t)); + if (ret != NULL) { + *ret = _z_session_interest_rc_new_from_val(*intr); + zn->_local_interests = _z_session_interest_rc_list_push(zn->_local_interests, ret); + } + _zp_session_unlock_mutex(zn); + return ret; +} + +int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) { + const _z_keyexpr_t *decl_key = NULL; + _z_interest_msg_t msg; + uint8_t flags = 0; + switch (decl->_tag) { + case _Z_DECL_SUBSCRIBER: + msg.type = _Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER; + msg.id = decl->_body._decl_subscriber._id; + decl_key = &decl->_body._decl_subscriber._keyexpr; + flags = _Z_INTEREST_FLAG_SUBSCRIBERS; + break; + case _Z_DECL_QUERYABLE: + msg.type = _Z_INTEREST_MSG_TYPE_DECL_QUERYABLE; + msg.id = decl->_body._decl_queryable._id; + decl_key = &decl->_body._decl_queryable._keyexpr; + flags = _Z_INTEREST_FLAG_QUERYABLES; + break; + case _Z_UNDECL_SUBSCRIBER: + msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER; + msg.id = decl->_body._undecl_subscriber._id; + decl_key = &decl->_body._undecl_subscriber._ext_keyexpr; + flags = _Z_INTEREST_FLAG_SUBSCRIBERS; + break; + case _Z_UNDECL_QUERYABLE: + msg.type = _Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE; + msg.id = decl->_body._undecl_queryable._id; + decl_key = &decl->_body._undecl_queryable._ext_keyexpr; + flags = _Z_INTEREST_FLAG_QUERYABLES; + break; + default: + return _Z_ERR_MESSAGE_ZENOH_DECLARATION_UNKNOWN; + } + _zp_session_lock_mutex(zn); + _z_keyexpr_t key = __unsafe_z_get_expanded_key_from_key(zn, decl_key); + if (key._suffix == NULL) { + _z_keyexpr_clear(&key); + _zp_session_unlock_mutex(zn); + return _Z_ERR_KEYEXPR_UNKNOWN; + } + _z_session_interest_rc_list_t *intrs = __unsafe_z_get_interest_by_key_and_flags(zn, flags, key); + _zp_session_unlock_mutex(zn); + + // Parse session_interest list + _z_session_interest_rc_list_t *xs = intrs; + while (xs != NULL) { + _z_session_interest_rc_t *intr = _z_session_interest_rc_list_head(xs); + if (intr->in->val._callback != NULL) { + intr->in->val._callback(&msg, intr->in->val._arg); + } + xs = _z_session_interest_rc_list_tail(xs); + } + // Clean up + _z_keyexpr_clear(&key); + _z_session_interest_rc_list_free(&intrs); + return _Z_RES_OK; +} + +void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr) { + _zp_session_lock_mutex(zn); + zn->_local_interests = + _z_session_interest_rc_list_drop_filter(zn->_local_interests, _z_session_interest_rc_eq, intr); + _zp_session_unlock_mutex(zn); +} + +void _z_flush_interest(_z_session_t *zn) { + _zp_session_lock_mutex(zn); + _z_session_interest_rc_list_free(&zn->_local_interests); + _zp_session_unlock_mutex(zn); +} + +int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id) { + _z_interest_msg_t msg = {.type = _Z_INTEREST_MSG_TYPE_FINAL, .id = id}; + // Retrieve interest + _zp_session_lock_mutex(zn); + _z_session_interest_rc_t *intr = __unsafe_z_get_interest_by_id(zn, id); + _zp_session_unlock_mutex(zn); + // Trigger callback + if (intr->in->val._callback != NULL) { + intr->in->val._callback(&msg, intr->in->val._arg); + } + return _Z_RES_OK; +} + +int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) { + _ZP_UNUSED(zn); + _ZP_UNUSED(id); + // Update future masks + return _Z_RES_OK; +} + +int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) { + _ZP_UNUSED(key); + _ZP_UNUSED(id); + // Check transport type + if (zn->_tp._type == _Z_TRANSPORT_UNICAST_TYPE) { + return _Z_RES_OK; // Nothing to do on unicast + } + // Current flags process + if ((flags & _Z_INTEREST_FLAG_CURRENT) != 0) { + // Send all declare + if ((flags & _Z_INTEREST_FLAG_KEYEXPRS) != 0) { + _Z_DEBUG("Sending declare resources"); + _Z_RETURN_IF_ERR(_z_send_resource_interest(zn)); + } + if ((flags & _Z_INTEREST_FLAG_SUBSCRIBERS) != 0) { + _Z_DEBUG("Sending declare subscribers"); + _Z_RETURN_IF_ERR(_z_send_subscriber_interest(zn)); + } + if ((flags & _Z_INTEREST_FLAG_QUERYABLES) != 0) { + _Z_DEBUG("Sending declare queryables"); + _Z_RETURN_IF_ERR(_z_send_queryable_interest(zn)); + } + if ((flags & _Z_INTEREST_FLAG_TOKENS) != 0) { + // Zenoh pico doesn't support liveliness token for now + } + // Send final declare + _Z_RETURN_IF_ERR(_z_interest_send_final_interest(zn, id)); + } + return _Z_RES_OK; +} + +#else +int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl) { + _ZP_UNUSED(zn); + _ZP_UNUSED(decl); + return _Z_RES_OK; +} + +int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl) { + _ZP_UNUSED(zn); + _ZP_UNUSED(decl); + return _Z_RES_OK; +} + +int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id) { + _ZP_UNUSED(zn); + _ZP_UNUSED(id); + return _Z_RES_OK; +} + +int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id) { + _ZP_UNUSED(zn); + _ZP_UNUSED(id); + return _Z_RES_OK; +} + +int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags) { + _ZP_UNUSED(zn); + _ZP_UNUSED(key); + _ZP_UNUSED(id); + _ZP_UNUSED(flags); + return _Z_RES_OK; +} +#endif diff --git a/src/session/resource.c b/src/session/resource.c index 8841037cf..f2e1d796e 100644 --- a/src/session/resource.c +++ b/src/session/resource.c @@ -29,6 +29,12 @@ _Bool _z_resource_eq(const _z_resource_t *other, const _z_resource_t *this_) { r void _z_resource_clear(_z_resource_t *res) { _z_keyexpr_clear(&res->_key); } +void _z_resource_copy(_z_resource_t *dst, const _z_resource_t *src) { + _z_keyexpr_copy(&dst->_key, &src->_key); + dst->_id = src->_id; + dst->_refcount = src->_refcount; +} + void _z_resource_free(_z_resource_t **res) { _z_resource_t *ptr = *res; diff --git a/src/session/rx.c b/src/session/rx.c index 9b27a9fbd..087f21628 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -24,6 +24,7 @@ #include "zenoh-pico/protocol/definitions/message.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/keyexpr.h" +#include "zenoh-pico/session/interest.h" #include "zenoh-pico/session/push.h" #include "zenoh-pico/session/queryable.h" #include "zenoh-pico/session/reply.h" @@ -51,17 +52,28 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint case _Z_UNDECL_KEXPR: { _z_unregister_resource(zn, decl._decl._body._undecl_kexpr._id, local_peer_id); } break; - case _Z_DECL_SUBSCRIBER: { - // TODO: add support or explicitly discard + case _Z_DECL_INTEREST: { + _z_interest_process_declare_interest(zn, decl._decl._body._decl_interest._keyexpr, + decl._decl._body._decl_interest._id, + decl._decl._body._decl_interest.interest_flags); } break; - case _Z_UNDECL_SUBSCRIBER: { - // TODO: add support or explicitly discard + case _Z_UNDECL_INTEREST: { + _z_interest_process_undeclare_interest(zn, decl._decl._body._undecl_interest._id); + } break; + case _Z_FINAL_INTEREST: { + _z_interest_process_final_interest(zn, decl._decl._body._final_interest._id); + } break; + case _Z_DECL_SUBSCRIBER: { + _z_interest_process_declares(zn, &decl._decl); } break; case _Z_DECL_QUERYABLE: { - // TODO: add support or explicitly discard + _z_interest_process_declares(zn, &decl._decl); + } break; + case _Z_UNDECL_SUBSCRIBER: { + _z_interest_process_declares(zn, &decl._decl); } break; case _Z_UNDECL_QUERYABLE: { - // TODO: add support or explicitly discard + _z_interest_process_declares(zn, &decl._decl); } break; case _Z_DECL_TOKEN: { // TODO: add support or explicitly discard @@ -69,15 +81,6 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint case _Z_UNDECL_TOKEN: { // TODO: add support or explicitly discard } break; - case _Z_DECL_INTEREST: { - // TODO: add support or explicitly discard - } break; - case _Z_FINAL_INTEREST: { - // TODO: add support or explicitly discard - } break; - case _Z_UNDECL_INTEREST: { - // TODO: add support or explicitly discard - } break; } } break; case _Z_N_PUSH: { diff --git a/src/session/utils.c b/src/session/utils.c index 8897411c7..cfeee4b1e 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -130,3 +130,11 @@ int8_t _z_session_close(_z_session_t *zn, uint8_t reason) { return ret; } + +#if Z_FEATURE_MULTI_THREAD == 1 +void _zp_session_lock_mutex(_z_session_t *zn) { (void)zp_mutex_lock(&zn->_mutex_inner); } +void _zp_session_unlock_mutex(_z_session_t *zn) { (void)zp_mutex_unlock(&zn->_mutex_inner); } +#else +void _zp_session_lock_mutex(_z_session_t *zn) { _ZP_UNUSED(zn); } +void _zp_session_unlock_mutex(_z_session_t *zn) { _ZP_UNUSED(zn); } +#endif \ No newline at end of file diff --git a/zenohpico.pc b/zenohpico.pc index 915f9cc6e..9f520a887 100644 --- a/zenohpico.pc +++ b/zenohpico.pc @@ -3,6 +3,6 @@ prefix=/usr/local Name: zenohpico Description: URL: -Version: 0.11.20240123dev +Version: 0.11.20240208dev Cflags: -I${prefix}/include Libs: -L${prefix}/lib -lzenohpico