Skip to content

Commit

Permalink
Add unicast interest feature (#349)
Browse files Browse the repository at this point in the history
* refactor: rename make final interest function

* feat: add Interest config token

* fix: PA copy paste error

* feat: add session interest type

* feat: add intrest type

* feat: add session interest module

* feat: add interest primitives

* feat: add session mutex function

* feat: add interest message processing

* fix: modularize process declare interest function

* feat: implement trigger_interest function

* fix: clone entitiy lists before sending declarations

* fix: remove unused function

* fix: rename file

* fix: set interest_msg fields as public

* feat: process undeclares as well

* fix: remove unecessary enum value

* fix: remove dropper function

* feat: add write side filtering entity

* feat: implement filtering

* fix: remove interest by default (for now)

* fix: rework publisher put actions order

* fix: add missing const

* fix: add missing dummy functions

* fix: exclude dummy functions from config token

* fix: remove unused defines

* fix: give keyexpr ownership to declare

* feat: add refcount copy function

* fix: duplicate key for declare

* feat: add resource copy function

* fix: use keyexpr alias instead of duplicate

* fix: flip early return condition

* feat: send final interest

* refactor: rename ambiguous functions

* fix: fuse identical functions

* fix: trigger local subs after push message

* fix: add missing switch default label

* build: add Z_FEATURE_INTEREST to build system

* fix: remove unused function

* refactor: regroup static functions

* fix: add flag filtering on interest look-up
  • Loading branch information
jean-roland authored Mar 5, 2024
1 parent 0f7779a commit ebdb01e
Show file tree
Hide file tree
Showing 22 changed files with 755 additions and 44 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,15 @@ set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature")
set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature")
set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport feature")
set(Z_FEATURE_ATTACHMENT 1 CACHE STRING "Toggle attachment feature")
set(Z_FEATURE_INTEREST 0 CACHE STRING "Toggle interest feature") # Toggle to 1 when protocol changes are merged
add_definition(Z_FEATURE_MULTI_THREAD=${Z_FEATURE_MULTI_THREAD})
add_definition(Z_FEATURE_PUBLICATION=${Z_FEATURE_PUBLICATION})
add_definition(Z_FEATURE_SUBSCRIPTION=${Z_FEATURE_SUBSCRIPTION})
add_definition(Z_FEATURE_QUERY=${Z_FEATURE_QUERY})
add_definition(Z_FEATURE_QUERYABLE=${Z_FEATURE_QUERYABLE})
add_definition(Z_FEATURE_RAWETH_TRANSPORT=${Z_FEATURE_RAWETH_TRANSPORT})
add_definition(Z_FEATURE_ATTACHMENT=${Z_FEATURE_ATTACHMENT})
add_definition(Z_FEATURE_INTEREST=${Z_FEATURE_INTEREST})
add_compile_definitions("Z_BUILD_DEBUG=$<CONFIG:Debug>")
message(STATUS "Building with feature confing:\n\
* MULTI-THREAD: ${Z_FEATURE_MULTI_THREAD}\n\
Expand All @@ -148,6 +150,7 @@ message(STATUS "Building with feature confing:\n\
* QUERY: ${Z_FEATURE_QUERY}\n\
* QUERYABLE: ${Z_FEATURE_QUERYABLE}\n\
* ATTACHMENT: ${Z_FEATURE_ATTACHMENT}\n\
* INTEREST: ${Z_FEATURE_INTEREST}\n\
* RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}")

# Print summary of CMAKE configurations
Expand Down
3 changes: 2 additions & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ Z_FEATURE_SUBSCRIPTION?=1
Z_FEATURE_QUERY?=1
Z_FEATURE_QUERYABLE?=1
Z_FEATURE_ATTACHMENT?=1
Z_FEATURE_INTEREST?=0
Z_FEATURE_RAWETH_TRANSPORT?=0

# zenoh-pico/ directory
Expand All @@ -73,7 +74,7 @@ CROSSIMG_PREFIX=zenoh-pico_
# - ARM: old versions of dockcross/dockcross were creating some issues since they used an old GCC (4.8.3) which lacks <stdatomic.h> (even using -std=gnu11)

CMAKE_OPT=-DZENOH_DEBUG=$(ZENOH_DEBUG) -DBUILD_EXAMPLES=$(BUILD_EXAMPLES) -DCMAKE_BUILD_TYPE=$(BUILD_TYPE) -DBUILD_TESTING=$(BUILD_TESTING) -DBUILD_MULTICAST=$(BUILD_MULTICAST)\
-DZ_FEATURE_MULTI_THREAD=$(Z_FEATURE_MULTI_THREAD) \
-DZ_FEATURE_MULTI_THREAD=$(Z_FEATURE_MULTI_THREAD) -DZ_FEATURE_INTEREST=$(Z_FEATURE_INTEREST) \
-DZ_FEATURE_PUBLICATION=$(Z_FEATURE_PUBLICATION) -DZ_FEATURE_SUBSCRIPTION=$(Z_FEATURE_SUBSCRIPTION) -DZ_FEATURE_QUERY=$(Z_FEATURE_QUERY) -DZ_FEATURE_QUERYABLE=$(Z_FEATURE_QUERYABLE)\
-DZ_FEATURE_RAWETH_TRANSPORT=$(Z_FEATURE_RAWETH_TRANSPORT) -DZ_FEATURE_ATTACHMENT=$(Z_FEATURE_ATTACHMENT) -DBUILD_INTEGRATION=$(BUILD_INTEGRATION) -DBUILD_TOOLS=$(BUILD_TOOLS) -DBUILD_SHARED_LIBS=$(BUILD_SHARED_LIBS) -H.

Expand Down
4 changes: 4 additions & 0 deletions include/zenoh-pico/collections/refcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@
} \
return c; \
} \
static inline void name##_rc_copy(name##_rc_t *dst, const name##_rc_t *p) { \
dst->in = p->in; \
_ZP_RC_OP_INCR_CNT \
} \
static inline _Bool name##_rc_eq(const name##_rc_t *left, const name##_rc_t *right) { \
return (left->in == right->in); \
} \
Expand Down
7 changes: 7 additions & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@
#define Z_FEATURE_ATTACHMENT 1
#endif

/**
* Enable interests.
*/
#ifndef Z_FEATURE_INTEREST
#define Z_FEATURE_INTEREST 0
#endif

/*------------------ Compile-time configuration properties ------------------*/
/**
* Default length for Zenoh ID. Maximum size is 16 bytes.
Expand Down
48 changes: 48 additions & 0 deletions include/zenoh-pico/net/filtering.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#ifndef ZENOH_PICO_FILTERING_NETAPI_H
#define ZENOH_PICO_FILTERING_NETAPI_H

#include <stdint.h>

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"

typedef enum {
WRITE_FILTER_INIT = 0,
WRITE_FILTER_ACTIVE = 1,
WRITE_FILTER_OFF = 2,
} _z_write_filter_state_t;

typedef struct {
uint32_t decl_id;
uint8_t state;
} _z_writer_filter_ctx_t;

/**
* Return type when declaring a queryable.
*/
typedef struct _z_interest_t {
uint32_t _interest_id;
_z_writer_filter_ctx_t *ctx;
} _z_write_filter_t;

typedef struct _z_publisher_t _z_publisher_t;

int8_t _z_write_filter_create(_z_publisher_t *pub);
int8_t _z_write_filter_destroy(const _z_publisher_t *pub);
_Bool _z_write_filter_active(const _z_publisher_t *pub);

#endif /* ZENOH_PICO_FILTERING_NETAPI_H */
6 changes: 6 additions & 0 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,10 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters,
);
#endif

#if Z_FEATURE_INTEREST == 1
uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags,
void *arg);
int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id);
#endif

#endif /* INCLUDE_ZENOH_PICO_NET_PRIMITIVES_H */
6 changes: 5 additions & 1 deletion include/zenoh-pico/net/publish.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
#ifndef INCLUDE_ZENOH_PICO_NET_PUBLISH_H
#define INCLUDE_ZENOH_PICO_NET_PUBLISH_H

#include "zenoh-pico/net/filtering.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"

/**
* Return type when declaring a publisher.
*/
typedef struct {
typedef struct _z_publisher_t {
_z_keyexpr_t _key;
_z_zint_t _id;
_z_session_rc_t _zn;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
#if Z_FEATURE_INTEREST == 1
_z_write_filter_t _filter;
#endif
} _z_publisher_t;

#if Z_FEATURE_PUBLICATION == 1
Expand Down
5 changes: 5 additions & 0 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ typedef struct _z_session_t {
#if Z_FEATURE_QUERY == 1
_z_pending_query_list_t *_pending_queries;
#endif

// Session interests
#if Z_FEATURE_INTEREST == 1
_z_session_interest_rc_list_t *_local_interests;
#endif
} _z_session_t;

extern void _z_session_clear(_z_session_t *zn); // Forward type declaration to avoid cyclical include
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ _z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keye
_z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
_z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);

_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags);
_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);
_z_declaration_t _z_make_final_decl(uint32_t id);
_z_declaration_t _z_make_final_interest(uint32_t id);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_DECLARATIONS_H */
34 changes: 34 additions & 0 deletions include/zenoh-pico/session/interest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef ZENOH_PICO_SESSION_INTEREST_H
#define ZENOH_PICO_SESSION_INTEREST_H

#include <stdbool.h>

#include "zenoh-pico/net/session.h"

#if Z_FEATURE_INTEREST == 1
_z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id);
_z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_interest_t *intr);
void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr);
void _z_flush_interest(_z_session_t *zn);
#endif // Z_FEATURE_INTEREST == 1

int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags);

#endif /* ZENOH_PICO_SESSION_INTEREST_H */
42 changes: 40 additions & 2 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ typedef struct {

_Bool _z_resource_eq(const _z_resource_t *one, const _z_resource_t *two);
void _z_resource_clear(_z_resource_t *res);
void _z_resource_copy(_z_resource_t *dst, const _z_resource_t *src);
void _z_resource_free(_z_resource_t **res);

_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_noop_copy)
_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_resource_copy)
_Z_LIST_DEFINE(_z_resource, _z_resource_t)

/**
Expand All @@ -102,7 +103,7 @@ void _z_subscription_clear(_z_subscription_t *sub);

_Z_REFCOUNT_DEFINE(_z_subscription, _z_subscription)
_Z_ELEM_DEFINE(_z_subscriber, _z_subscription_t, _z_noop_size, _z_subscription_clear, _z_noop_copy)
_Z_ELEM_DEFINE(_z_subscription_rc, _z_subscription_rc_t, _z_noop_size, _z_subscription_rc_drop, _z_noop_copy)
_Z_ELEM_DEFINE(_z_subscription_rc, _z_subscription_rc_t, _z_noop_size, _z_subscription_rc_drop, _z_subscription_rc_copy)
_Z_LIST_DEFINE(_z_subscription_rc, _z_subscription_rc_t)

typedef struct {
Expand Down Expand Up @@ -187,4 +188,41 @@ typedef void (*_z_hello_handler_t)(_z_hello_t *hello, struct __z_hello_handler_w

int8_t _z_session_generate_zid(_z_id_t *bs, uint8_t size);

typedef enum {
_Z_INTEREST_MSG_TYPE_FINAL = 0,
_Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER,
_Z_INTEREST_MSG_TYPE_DECL_QUERYABLE,
_Z_INTEREST_MSG_TYPE_DECL_TOKEN,
_Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER,
_Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE,
_Z_INTEREST_MSG_TYPE_UNDECL_TOKEN,
} _z_interest_msg_type_t;

typedef struct _z_interest_msg_t {
uint8_t type;
uint32_t id;
} _z_interest_msg_t;

/**
* The callback signature of the functions handling interest messages.
*/
typedef void (*_z_interest_handler_t)(const _z_interest_msg_t *msg, void *arg);

typedef struct {
_z_keyexpr_t _key;
uint32_t _id;
_z_interest_handler_t _callback;
void *_arg;
uint8_t _flags;
} _z_session_interest_t;

_Bool _z_session_interest_eq(const _z_session_interest_t *one, const _z_session_interest_t *two);
void _z_session_interest_clear(_z_session_interest_t *res);

_Z_REFCOUNT_DEFINE(_z_session_interest, _z_session_interest)
_Z_ELEM_DEFINE(_z_session_interest, _z_session_interest_t, _z_noop_size, _z_session_interest_clear, _z_noop_copy)
_Z_ELEM_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t, _z_noop_size, _z_session_interest_rc_drop,
_z_noop_copy)
_Z_LIST_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t)

#endif /* INCLUDE_ZENOH_PICO_SESSION_SESSION_H */
3 changes: 3 additions & 0 deletions include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, ui
int8_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);

void _zp_session_lock_mutex(_z_session_t *zn);
void _zp_session_unlock_mutex(_z_session_t *zn);

#endif /* INCLUDE_ZENOH_PICO_SESSION_UTILS_H */
1 change: 0 additions & 1 deletion include/zenoh-pico/system/link/raweth.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ typedef struct {
_Bool _has_vlan;
} _z_raweth_socket_t;

int8_t _z_get_smac_raweth(_z_raweth_socket_t *resock);
int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface);
size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len);
size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr);
Expand Down
42 changes: 29 additions & 13 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/net/config.h"
#include "zenoh-pico/net/filtering.h"
#include "zenoh-pico/net/logger.h"
#include "zenoh-pico/net/memory.h"
#include "zenoh-pico/net/primitives.h"
Expand Down Expand Up @@ -694,14 +695,28 @@ z_owned_publisher_t z_declare_publisher(z_session_t zs, z_keyexpr_t keyexpr, con
key = _z_rid_with_suffix(id, NULL);
}
}

// Set options
z_publisher_options_t opt = z_publisher_options_default();
if (options != NULL) {
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
}

return (z_owned_publisher_t){._value = _z_declare_publisher(&zs._val, key, opt.congestion_control, opt.priority)};
// Set publisher
_z_publisher_t *pub = _z_declare_publisher(&zs._val, key, opt.congestion_control, opt.priority);
if (pub == NULL) {
if (key._id != Z_RESOURCE_ID_NONE) {
_z_undeclare_resource(&zs._val.in->val, key._id);
}
return (z_owned_publisher_t){._value = NULL};
}
// Create write filter
if (_z_write_filter_create(pub) != _Z_RES_OK) {
if (key._id != Z_RESOURCE_ID_NONE) {
_z_undeclare_resource(&zs._val.in->val, key._id);
}
return (z_owned_publisher_t){._value = NULL};
}
return (z_owned_publisher_t){._value = pub};
}

int8_t z_undeclare_publisher(z_owned_publisher_t *pub) {
Expand All @@ -728,32 +743,33 @@ z_publisher_delete_options_t z_publisher_delete_options_default(void) {

int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t len,
const z_publisher_put_options_t *options) {
int8_t ret = _Z_RES_OK;

int8_t ret = 0;
// Build options
z_publisher_put_options_t opt = z_publisher_put_options_default();
if (options != NULL) {
opt.encoding = options->encoding;
#if Z_FEATURE_ATTACHMENT == 1
opt.attachment = options->attachment;
#endif
}

ret = _z_write(&pub._val->_zn.in->val, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT,
pub._val->_congestion_control, pub._val->_priority
// Check if write filter is active before writing
if (!_z_write_filter_active(pub._val)) {
// Write value
ret = _z_write(&pub._val->_zn.in->val, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT,
pub._val->_congestion_control, pub._val->_priority
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
,
opt.attachment
#endif
);

);
}
// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
#endif
);

return ret;
}

Expand Down
Loading

0 comments on commit ebdb01e

Please sign in to comment.