Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unicast interest feature #349

Merged
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ccf2a25
refactor: rename make final interest function
jean-roland Jan 31, 2024
d15bc99
feat: add Interest config token
jean-roland Jan 31, 2024
461a8c1
fix: PA copy paste error
jean-roland Feb 2, 2024
33a8e3f
feat: add session interest type
jean-roland Feb 6, 2024
80707df
feat: add intrest type
jean-roland Feb 6, 2024
7e95df8
feat: add session interest module
jean-roland Feb 6, 2024
b29af1e
feat: add interest primitives
jean-roland Feb 6, 2024
bf7bdfc
feat: add session mutex function
jean-roland Feb 8, 2024
9682d98
feat: add interest message processing
jean-roland Feb 8, 2024
bf99b97
fix: modularize process declare interest function
jean-roland Feb 8, 2024
432a341
feat: implement trigger_interest function
jean-roland Feb 9, 2024
99fafd8
fix: clone entitiy lists before sending declarations
jean-roland Feb 9, 2024
979d486
fix: remove unused function
jean-roland Feb 13, 2024
0cd5ec4
fix: rename file
jean-roland Feb 13, 2024
d40a4be
fix: set interest_msg fields as public
jean-roland Feb 14, 2024
c056220
feat: process undeclares as well
jean-roland Feb 14, 2024
864a979
fix: remove unecessary enum value
jean-roland Feb 14, 2024
ab18413
fix: remove dropper function
jean-roland Feb 14, 2024
51dee17
feat: add write side filtering entity
jean-roland Feb 14, 2024
60b4fcb
feat: implement filtering
jean-roland Feb 14, 2024
3b73e07
fix: remove interest by default (for now)
jean-roland Feb 15, 2024
9c0be52
fix: rework publisher put actions order
jean-roland Feb 15, 2024
6f66d03
fix: add missing const
jean-roland Feb 15, 2024
46528e9
fix: add missing dummy functions
jean-roland Feb 15, 2024
6c6abf9
fix: exclude dummy functions from config token
jean-roland Feb 15, 2024
2add7ed
fix: remove unused defines
jean-roland Feb 15, 2024
e4385dc
fix: give keyexpr ownership to declare
jean-roland Feb 16, 2024
14d6310
feat: add refcount copy function
jean-roland Feb 16, 2024
4d04ac5
fix: duplicate key for declare
jean-roland Feb 16, 2024
0e3ea7c
feat: add resource copy function
jean-roland Feb 19, 2024
02bb3a7
fix: use keyexpr alias instead of duplicate
jean-roland Feb 19, 2024
dc64050
fix: flip early return condition
jean-roland Feb 19, 2024
ecdd57b
feat: send final interest
jean-roland Feb 19, 2024
18649c3
refactor: rename ambiguous functions
jean-roland Feb 19, 2024
68eca1f
fix: fuse identical functions
jean-roland Feb 20, 2024
ba254d5
fix: trigger local subs after push message
jean-roland Feb 27, 2024
de5c901
fix: add missing switch default label
jean-roland Feb 27, 2024
a6a9c14
build: add Z_FEATURE_INTEREST to build system
jean-roland Feb 27, 2024
679d1ac
fix: remove unused function
jean-roland Feb 27, 2024
0415cbf
refactor: regroup static functions
jean-roland Feb 27, 2024
2dd09f0
fix: add flag filtering on interest look-up
jean-roland Feb 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/zenoh-pico/collections/refcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@
} \
return c; \
} \
static inline void name##_rc_copy(name##_rc_t *dst, const name##_rc_t *p) { \
dst->in = p->in; \
_ZP_RC_OP_INCR_CNT \
} \
static inline _Bool name##_rc_eq(const name##_rc_t *left, const name##_rc_t *right) { \
return (left->in == right->in); \
} \
Expand Down
7 changes: 7 additions & 0 deletions include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@
#define Z_FEATURE_ATTACHMENT 1
#endif

/**
* Enable interests.
*/
#ifndef Z_FEATURE_INTEREST
#define Z_FEATURE_INTEREST 0
#endif

/*------------------ Compile-time configuration properties ------------------*/
/**
* Default length for Zenoh ID. Maximum size is 16 bytes.
Expand Down
48 changes: 48 additions & 0 deletions include/zenoh-pico/net/filtering.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#ifndef ZENOH_PICO_FILTERING_NETAPI_H
#define ZENOH_PICO_FILTERING_NETAPI_H

#include <stdint.h>

Check warning

Code scanning / Cppcheck (reported by Codacy)

Include file: <stdint.h> not found. Please note: Cppcheck does not need standard library headers to get proper results. Warning

Include file: <stdint.h> not found. Please note: Cppcheck does not need standard library headers to get proper results.

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"

typedef enum {
WRITE_FILTER_INIT = 0,
WRITE_FILTER_ACTIVE = 1,
WRITE_FILTER_OFF = 2,
} _z_write_filter_state_t;

typedef struct {
uint32_t decl_id;
uint8_t state;
} _z_writer_filter_ctx_t;

/**
* Return type when declaring a queryable.
*/
typedef struct _z_interest_t {
uint32_t _interest_id;
_z_writer_filter_ctx_t *ctx;
} _z_write_filter_t;

typedef struct _z_publisher_t _z_publisher_t;

int8_t _z_write_filter_create(_z_publisher_t *pub);
int8_t _z_write_filter_destroy(const _z_publisher_t *pub);
_Bool _z_write_filter_active(const _z_publisher_t *pub);

#endif /* ZENOH_PICO_FILTERING_NETAPI_H */
6 changes: 6 additions & 0 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,10 @@
);
#endif

#if Z_FEATURE_INTEREST == 1

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 2009 with no text in the supplied rule-texts-file Warning

misra violation 2009 with no text in the supplied rule-texts-file
uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags,
void *arg);
int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id);
#endif

#endif /* INCLUDE_ZENOH_PICO_NET_PRIMITIVES_H */
6 changes: 5 additions & 1 deletion include/zenoh-pico/net/publish.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,22 @@
#ifndef INCLUDE_ZENOH_PICO_NET_PUBLISH_H
#define INCLUDE_ZENOH_PICO_NET_PUBLISH_H

#include "zenoh-pico/net/filtering.h"
#include "zenoh-pico/net/session.h"
#include "zenoh-pico/protocol/core.h"

/**
* Return type when declaring a publisher.
*/
typedef struct {
typedef struct _z_publisher_t {
_z_keyexpr_t _key;
_z_zint_t _id;
_z_session_rc_t _zn;
z_congestion_control_t _congestion_control;
z_priority_t _priority;
#if Z_FEATURE_INTEREST == 1

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 2009 with no text in the supplied rule-texts-file Warning

misra violation 2009 with no text in the supplied rule-texts-file
_z_write_filter_t _filter;
#endif
} _z_publisher_t;

#if Z_FEATURE_PUBLICATION == 1
Expand Down
5 changes: 5 additions & 0 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ typedef struct _z_session_t {
#if Z_FEATURE_QUERY == 1
_z_pending_query_list_t *_pending_queries;
#endif

// Session interests
#if Z_FEATURE_INTEREST == 1
_z_session_interest_rc_list_t *_local_interests;
#endif
} _z_session_t;

extern void _z_session_clear(_z_session_t *zn); // Forward type declaration to avoid cyclical include
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ _z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keye
_z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
_z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);

_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags);
_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);
_z_declaration_t _z_make_final_decl(uint32_t id);
_z_declaration_t _z_make_final_interest(uint32_t id);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_DECLARATIONS_H */
35 changes: 35 additions & 0 deletions include/zenoh-pico/session/interest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef ZENOH_PICO_SESSION_INTEREST_H
#define ZENOH_PICO_SESSION_INTEREST_H

#include <stdbool.h>

Check warning

Code scanning / Cppcheck (reported by Codacy)

Include file: <stdbool.h> not found. Please note: Cppcheck does not need standard library headers to get proper results. Warning

Include file: <stdbool.h> not found. Please note: Cppcheck does not need standard library headers to get proper results.

#include "zenoh-pico/net/session.h"

#if Z_FEATURE_INTEREST == 1

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 2009 with no text in the supplied rule-texts-file Warning

misra violation 2009 with no text in the supplied rule-texts-file
_z_session_interest_rc_t *_z_get_interest_by_id(_z_session_t *zn, const _z_zint_t id);
_z_session_interest_rc_list_t *_z_get_interest_by_key(_z_session_t *zn, const _z_keyexpr_t key);
_z_session_interest_rc_t *_z_register_interest(_z_session_t *zn, _z_session_interest_t *intr);
void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr);
void _z_flush_interest(_z_session_t *zn);
#endif // Z_FEATURE_INTEREST == 1

int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags);

#endif /* ZENOH_PICO_SESSION_INTEREST_H */
42 changes: 40 additions & 2 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ typedef struct {

_Bool _z_resource_eq(const _z_resource_t *one, const _z_resource_t *two);
void _z_resource_clear(_z_resource_t *res);
void _z_resource_copy(_z_resource_t *dst, const _z_resource_t *src);
void _z_resource_free(_z_resource_t **res);

_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_noop_copy)
_Z_ELEM_DEFINE(_z_resource, _z_resource_t, _z_noop_size, _z_resource_clear, _z_resource_copy)
_Z_LIST_DEFINE(_z_resource, _z_resource_t)

/**
Expand All @@ -102,7 +103,7 @@ void _z_subscription_clear(_z_subscription_t *sub);

_Z_REFCOUNT_DEFINE(_z_subscription, _z_subscription)
_Z_ELEM_DEFINE(_z_subscriber, _z_subscription_t, _z_noop_size, _z_subscription_clear, _z_noop_copy)
_Z_ELEM_DEFINE(_z_subscription_rc, _z_subscription_rc_t, _z_noop_size, _z_subscription_rc_drop, _z_noop_copy)
_Z_ELEM_DEFINE(_z_subscription_rc, _z_subscription_rc_t, _z_noop_size, _z_subscription_rc_drop, _z_subscription_rc_copy)
_Z_LIST_DEFINE(_z_subscription_rc, _z_subscription_rc_t)

typedef struct {
Expand Down Expand Up @@ -187,4 +188,41 @@ typedef void (*_z_hello_handler_t)(_z_hello_t *hello, struct __z_hello_handler_w

int8_t _z_session_generate_zid(_z_id_t *bs, uint8_t size);

typedef enum {
_Z_INTEREST_MSG_TYPE_FINAL = 0,
_Z_INTEREST_MSG_TYPE_DECL_SUBSCRIBER,
_Z_INTEREST_MSG_TYPE_DECL_QUERYABLE,
_Z_INTEREST_MSG_TYPE_DECL_TOKEN,
_Z_INTEREST_MSG_TYPE_UNDECL_SUBSCRIBER,
_Z_INTEREST_MSG_TYPE_UNDECL_QUERYABLE,
_Z_INTEREST_MSG_TYPE_UNDECL_TOKEN,
} _z_interest_msg_type_t;

typedef struct _z_interest_msg_t {
uint8_t type;
uint32_t id;
} _z_interest_msg_t;

/**
* The callback signature of the functions handling interest messages.
*/
typedef void (*_z_interest_handler_t)(const _z_interest_msg_t *msg, void *arg);

typedef struct {
_z_keyexpr_t _key;
uint32_t _id;
_z_interest_handler_t _callback;
void *_arg;
uint8_t _flags;
} _z_session_interest_t;

_Bool _z_session_interest_eq(const _z_session_interest_t *one, const _z_session_interest_t *two);
void _z_session_interest_clear(_z_session_interest_t *res);

_Z_REFCOUNT_DEFINE(_z_session_interest, _z_session_interest)
_Z_ELEM_DEFINE(_z_session_interest, _z_session_interest_t, _z_noop_size, _z_session_interest_clear, _z_noop_copy)
_Z_ELEM_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t, _z_noop_size, _z_session_interest_rc_drop,
_z_noop_copy)
_Z_LIST_DEFINE(_z_session_interest_rc, _z_session_interest_rc_t)

#endif /* INCLUDE_ZENOH_PICO_SESSION_SESSION_H */
3 changes: 3 additions & 0 deletions include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *z_msg, ui
int8_t _z_send_n_msg(_z_session_t *zn, _z_network_message_t *n_msg, z_reliability_t reliability,
z_congestion_control_t cong_ctrl);

void _zp_session_lock_mutex(_z_session_t *zn);
void _zp_session_unlock_mutex(_z_session_t *zn);

#endif /* INCLUDE_ZENOH_PICO_SESSION_UTILS_H */
1 change: 0 additions & 1 deletion include/zenoh-pico/system/link/raweth.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ typedef struct {
_Bool _has_vlan;
} _z_raweth_socket_t;

int8_t _z_get_smac_raweth(_z_raweth_socket_t *resock);
int8_t _z_open_raweth(_z_sys_net_socket_t *sock, const char *interface);
size_t _z_send_raweth(const _z_sys_net_socket_t *sock, const void *buff, size_t buff_len);
size_t _z_receive_raweth(const _z_sys_net_socket_t *sock, void *buff, size_t buff_len, _z_bytes_t *addr);
Expand Down
47 changes: 31 additions & 16 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/net/config.h"
#include "zenoh-pico/net/filtering.h"
#include "zenoh-pico/net/logger.h"
#include "zenoh-pico/net/memory.h"
#include "zenoh-pico/net/primitives.h"
Expand Down Expand Up @@ -694,14 +695,28 @@
key = _z_rid_with_suffix(id, NULL);
}
}

// Set options
z_publisher_options_t opt = z_publisher_options_default();
if (options != NULL) {
opt.congestion_control = options->congestion_control;
opt.priority = options->priority;
}

return (z_owned_publisher_t){._value = _z_declare_publisher(&zs._val, key, opt.congestion_control, opt.priority)};
// Set publisher
_z_publisher_t *pub = _z_declare_publisher(&zs._val, key, opt.congestion_control, opt.priority);
if (pub == NULL) {
if (key._id != Z_RESOURCE_ID_NONE) {
_z_undeclare_resource(&zs._val.in->val, key._id);
}
return (z_owned_publisher_t){._value = NULL};

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 15.5 rule Note

MISRA 15.5 rule
}
// Create write filter
if (_z_write_filter_create(pub) != _Z_RES_OK) {
if (key._id != Z_RESOURCE_ID_NONE) {
_z_undeclare_resource(&zs._val.in->val, key._id);
}
return (z_owned_publisher_t){._value = NULL};

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 15.5 rule Note

MISRA 15.5 rule
}
return (z_owned_publisher_t){._value = pub};
}

int8_t z_undeclare_publisher(z_owned_publisher_t *pub) {
Expand All @@ -728,33 +743,33 @@

int8_t z_publisher_put(const z_publisher_t pub, const uint8_t *payload, size_t len,
const z_publisher_put_options_t *options) {
int8_t ret = _Z_RES_OK;

// Build options
z_publisher_put_options_t opt = z_publisher_put_options_default();
if (options != NULL) {
opt.encoding = options->encoding;
#if Z_FEATURE_ATTACHMENT == 1
opt.attachment = options->attachment;
#endif
}

ret = _z_write(&pub._val->_zn.in->val, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT,
pub._val->_congestion_control, pub._val->_priority
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
#endif
);

// Trigger local subscriptions
_z_trigger_local_subscriptions(&pub._val->_zn.in->val, pub._val->_key, payload, len
#if Z_FEATURE_ATTACHMENT == 1
,
opt.attachment
#endif
);

return ret;
// Check if write filter is active before writing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Push messages are now sent to the network after local subscriptions trigger. We tend to have the previous order on purpose.

if (_z_write_filter_active(pub._val)) {

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 14.4 rule Note

MISRA 14.4 rule
return _Z_RES_OK;

Check notice

Code scanning / Cppcheck (reported by Codacy)

MISRA 15.5 rule Note

MISRA 15.5 rule
}
// Write value
return _z_write(&pub._val->_zn.in->val, pub._val->_key, payload, len, opt.encoding, Z_SAMPLE_KIND_PUT,
pub._val->_congestion_control, pub._val->_priority
#if Z_FEATURE_ATTACHMENT == 1

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 2009 with no text in the supplied rule-texts-file Warning

misra violation 2009 with no text in the supplied rule-texts-file
,
opt.attachment
#endif
);
}

int8_t z_publisher_delete(const z_publisher_t pub, const z_publisher_delete_options_t *options) {
Expand Down
Loading
Loading