Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New interest format #405

Merged
merged 15 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature")
set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature")
set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport feature")
set(Z_FEATURE_ATTACHMENT 1 CACHE STRING "Toggle attachment feature")
set(Z_FEATURE_INTEREST 0 CACHE STRING "Toggle interest feature") # Toggle to 1 when protocol changes are merged
set(Z_FEATURE_INTEREST 1 CACHE STRING "Toggle interest feature")
add_definition(Z_FEATURE_MULTI_THREAD=${Z_FEATURE_MULTI_THREAD})
add_definition(Z_FEATURE_PUBLICATION=${Z_FEATURE_PUBLICATION})
add_definition(Z_FEATURE_SUBSCRIPTION=${Z_FEATURE_SUBSCRIPTION})
Expand Down
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Z_FEATURE_SUBSCRIPTION?=1
Z_FEATURE_QUERY?=1
Z_FEATURE_QUERYABLE?=1
Z_FEATURE_ATTACHMENT?=1
Z_FEATURE_INTEREST?=0
Z_FEATURE_INTEREST?=1
Z_FEATURE_RAWETH_TRANSPORT?=0

# zenoh-pico/ directory
Expand Down
11 changes: 5 additions & 6 deletions examples/unix/c11/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,17 @@ int main(int argc, char **argv) {
z_close(z_session_move(&s));
return -1;
}

// Wait for joins in peer mode
if (strcmp(mode, "peer") == 0) {
printf("Waiting for joins...\n");
sleep(3);
}
printf("Declaring publisher for '%s'...\n", keyexpr);
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
if (!z_check(pub)) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
// Wait for joins in peer mode
if (strcmp(mode, "peer") == 0) {
printf("Waiting for joins...\n");
sleep(3);
}
printf("Press CTRL-C to quit...\n");
char buf[256];
for (int idx = 0; idx < n; ++idx) {
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/filtering.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ typedef struct {
/**
* Return type when declaring a queryable.
*/
typedef struct _z_interest_t {
typedef struct _z_write_filter_t {
uint32_t _interest_id;
_z_writer_filter_ctx_t *ctx;
} _z_write_filter_t;
Expand Down
6 changes: 3 additions & 3 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters,
#endif

#if Z_FEATURE_INTEREST == 1
uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags,
void *arg);
int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id);
uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags,
void *arg);
int8_t _z_remove_interest(_z_session_t *zn, uint32_t interest_id);
#endif

#endif /* INCLUDE_ZENOH_PICO_NET_PRIMITIVES_H */
11 changes: 2 additions & 9 deletions include/zenoh-pico/protocol/codec/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
#define _Z_UNDECL_QUERYABLE_MID 5
#define _Z_DECL_TOKEN_MID 6
#define _Z_UNDECL_TOKEN_MID 7
#define _Z_DECL_INTEREST_MID 8
#define _Z_FINAL_INTEREST_MID 9
#define _Z_UNDECL_INTEREST_MID 10
#define _Z_DECL_FINAL_MID 0x1a

int8_t _z_decl_kexpr_encode(_z_wbuf_t *wbf, const _z_decl_kexpr_t *decl);
int8_t _z_decl_kexpr_decode(_z_decl_kexpr_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_undecl_kexpr_encode(_z_wbuf_t *wbf, const _z_undecl_kexpr_t *decl);
Expand All @@ -47,12 +46,6 @@ int8_t _z_decl_token_encode(_z_wbuf_t *wbf, const _z_decl_token_t *decl);
int8_t _z_decl_token_decode(_z_decl_token_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_undecl_token_encode(_z_wbuf_t *wbf, const _z_undecl_token_t *decl);
int8_t _z_undecl_token_decode(_z_undecl_token_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_decl_interest_encode(_z_wbuf_t *wbf, const _z_decl_interest_t *decl);
int8_t _z_decl_interest_decode(_z_decl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_final_interest_encode(_z_wbuf_t *wbf, const _z_final_interest_t *decl);
int8_t _z_final_interest_decode(_z_final_interest_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_undecl_interest_encode(_z_wbuf_t *wbf, const _z_undecl_interest_t *decl);
int8_t _z_undecl_interest_decode(_z_undecl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header);

int8_t _z_declaration_encode(_z_wbuf_t *wbf, const _z_declaration_t *decl);
int8_t _z_declaration_decode(_z_declaration_t *decl, _z_zbuf_t *zbf);
Expand Down
24 changes: 24 additions & 0 deletions include/zenoh-pico/protocol/codec/interest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_INTEREST_H
#define INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_INTEREST_H

#include "zenoh-pico/protocol/definitions/interest.h"
#include "zenoh-pico/protocol/iobuf.h"

int8_t _z_interest_encode(_z_wbuf_t *wbf, const _z_interest_t *interest, _Bool is_final);
int8_t _z_interest_decode(_z_interest_t *decl, _z_zbuf_t *zbf, _Bool is_final, _Bool has_ext);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_DECLARATIONS_H */
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/codec/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ int8_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_final_t
int8_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl);
int8_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_n_interest_encode(_z_wbuf_t *wbf, const _z_n_msg_interest_t *interest);
int8_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, uint8_t header);

int8_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg);
int8_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf);
Expand Down
38 changes: 6 additions & 32 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,10 @@ typedef struct {
} _z_undecl_token_t;
_z_undecl_token_t _z_undecl_token_null(void);

#define _Z_INTEREST_FLAG_KEYEXPRS (1)
#define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1)
#define _Z_INTEREST_FLAG_QUERYABLES (1 << 2)
#define _Z_INTEREST_FLAG_TOKENS (1 << 3)
#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4)
#define _Z_INTEREST_FLAG_CURRENT (1 << 5)
#define _Z_INTEREST_FLAG_FUTURE (1 << 6)
#define _Z_INTEREST_FLAG_AGGREGATE (1 << 7)

typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
uint8_t interest_flags;
} _z_decl_interest_t;
_z_decl_interest_t _z_decl_interest_null(void);
typedef struct {
uint32_t _id;
} _z_final_interest_t;
_z_final_interest_t _z_final_interest_null(void);
typedef struct {
uint32_t _id;
_z_keyexpr_t _ext_keyexpr;
} _z_undecl_interest_t;
_z_undecl_interest_t _z_undecl_interest_null(void);
_Bool _placeholder; // In case we add extensions
} _z_decl_final_t;
_z_decl_final_t _z_decl_final_null(void);

typedef struct {
enum {
Expand All @@ -105,9 +85,7 @@ typedef struct {
_Z_UNDECL_QUERYABLE,
_Z_DECL_TOKEN,
_Z_UNDECL_TOKEN,
_Z_DECL_INTEREST,
_Z_FINAL_INTEREST,
_Z_UNDECL_INTEREST,
_Z_DECL_FINAL,
} _tag;
union {
_z_decl_kexpr_t _decl_kexpr;
Expand All @@ -118,9 +96,7 @@ typedef struct {
_z_undecl_queryable_t _undecl_queryable;
_z_decl_token_t _decl_token;
_z_undecl_token_t _undecl_token;
_z_decl_interest_t _decl_interest;
_z_final_interest_t _final_interest;
_z_undecl_interest_t _undecl_interest;
_z_decl_final_t _decl_final;
} _body;
} _z_declaration_t;
void _z_declaration_clear(_z_declaration_t* decl);
Expand All @@ -138,8 +114,6 @@ _z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keye
_z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
_z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);

_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags);
_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);
_z_declaration_t _z_make_final_interest(uint32_t id);
_z_declaration_t _z_make_decl_final(void);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_DECLARATIONS_H */
46 changes: 46 additions & 0 deletions include/zenoh-pico/protocol/definitions/interest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_INTEREST_H
#define INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_INTEREST_H

#include <stdint.h>

#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/keyexpr.h"

#define _Z_INTEREST_FLAG_KEYEXPRS (1)
#define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1)
#define _Z_INTEREST_FLAG_QUERYABLES (1 << 2)
#define _Z_INTEREST_FLAG_TOKENS (1 << 3)
#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4)
#define _Z_INTEREST_FLAG_CURRENT (1 << 5)
#define _Z_INTEREST_FLAG_FUTURE (1 << 6)
#define _Z_INTEREST_FLAG_AGGREGATE (1 << 7)

#define _Z_INTEREST_NOT_FINAL_MASK (_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE)

typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
uint8_t flags;
} _z_interest_t;
_z_interest_t _z_interest_null(void);

void _z_interest_clear(_z_interest_t* decl);

_z_interest_t _z_make_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t flags);
_z_interest_t _z_make_interest_final(uint32_t id);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_INTEREST_H */
58 changes: 56 additions & 2 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/declarations.h"
#include "zenoh-pico/protocol/definitions/interest.h"
#include "zenoh-pico/protocol/definitions/message.h"
#include "zenoh-pico/protocol/ext.h"
#include "zenoh-pico/protocol/keyexpr.h"
Expand All @@ -31,6 +32,7 @@
#define _Z_MID_N_REQUEST 0x1c
#define _Z_MID_N_RESPONSE 0x1b
#define _Z_MID_N_RESPONSE_FINAL 0x1a
#define _Z_MID_N_INTEREST 0x19

/*=============================*/
/* Network flags */
Expand All @@ -43,6 +45,13 @@
// - Z: Extension If Z==1 then Zenoh extensions are present
#define _Z_FLAG_N_DECLARE_I 0x20 // 1 << 5

// INTEREST message flags:
// - C: Current If C==1 then interest concerns current declarations
// - F: Future If F==1 then interest concerns future declarations
// - Z: Extension If Z==1 then Zenoh extensions are present
#define _Z_FLAG_N_INTEREST_CURRENT 0x20 // 1 << 5
#define _Z_FLAG_N_INTEREST_FUTURE 0x40 // 1 << 6

// PUSH message flags:
// N Named if N==1 then the key expr has name/suffix
// M Mapping if M==1 then keyexpr mapping is the one declared by the sender, otherwise by the receiver
Expand Down Expand Up @@ -210,23 +219,67 @@ typedef struct {
} _z_n_msg_response_t;
void _z_n_msg_response_clear(_z_n_msg_response_t *msg);

/*------------------ Declare Message ------------------*/

typedef struct {
_z_declaration_t _decl;
_z_timestamp_t _ext_timestamp;
_z_n_qos_t _ext_qos;
uint32_t _interest_id;
_Bool has_interest_id;
} _z_n_msg_declare_t;
static inline void _z_n_msg_declare_clear(_z_n_msg_declare_t *msg) { _z_declaration_clear(&msg->_decl); }

/*------------------ Interest Message ------------------*/

/// Flags:
/// - C: Current If C==1 then interest concerns current declarations
/// - F: Future If F==1 then interest concerns future declarations
/// - Z: Extension If Z==1 then Zenoh extensions are present
/// If C==0 and F==0, then interest is final
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|F|C|INTEREST |
/// +-+-+-+---------+
/// ~ id:z32 ~
/// +---------------+
/// |A|M|N|R|T|Q|S|K| (*) if interest is not final
/// +---------------+
/// ~ key_scope:z16 ~ if interest is not final && R==1
/// +---------------+
/// ~ key_suffix ~ if interest is not final && R==1 && N==1 -- <u8;z16>
/// +---------------+
/// ~ [int_exts] ~ if Z==1
/// +---------------+
///
/// (*) - if K==1 then the interest refers to key expressions
/// - if S==1 then the interest refers to subscribers
/// - if Q==1 then the interest refers to queryables
/// - if T==1 then the interest refers to tokens
/// - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions.
/// - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0.
/// - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver.
/// If R==0 then M should be set to 0.
/// - if A==1 then the replies SHOULD be aggregated
/// ```

typedef struct {
_z_interest_t _interest;
} _z_n_msg_interest_t;
static inline void _z_n_msg_interest_clear(_z_n_msg_interest_t *msg) { _z_interest_clear(&msg->_interest); }

/*------------------ Zenoh Message ------------------*/
typedef union {
_z_n_msg_declare_t _declare;
_z_n_msg_push_t _push;
_z_n_msg_request_t _request;
_z_n_msg_response_t _response;
_z_n_msg_response_final_t _response_final;
_z_n_msg_interest_t _interest;
} _z_network_body_t;
typedef struct {
enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL } _tag;
enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL, _Z_N_INTEREST } _tag;
_z_network_body_t _body;
} _z_network_message_t;
typedef _z_network_message_t _z_zenoh_message_t;
Expand All @@ -248,7 +301,8 @@ _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_byt
);
_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid);
_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration);
_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, _Bool has_interest_id, uint32_t interest_id);
_z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_interest(_z_interest_t interest);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_NETWORK_H */
6 changes: 3 additions & 3 deletions include/zenoh-pico/session/interest.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr);
void _z_flush_interest(_z_session_t *zn);
int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags);
int8_t _z_interest_process_declare_final(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_interest_final(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags);

#endif /* ZENOH_PICO_SESSION_INTEREST_H */
6 changes: 3 additions & 3 deletions src/net/filtering.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ int8_t _z_write_filter_create(_z_publisher_t *pub) {
ctx->decl_id = 0;

pub->_filter.ctx = ctx;
pub->_filter._interest_id = _z_declare_interest(&pub->_zn.in->val, _z_keyexpr_alias(pub->_key),
_z_write_filter_callback, flags, (void *)ctx);
pub->_filter._interest_id =
_z_add_interest(&pub->_zn.in->val, _z_keyexpr_alias(pub->_key), _z_write_filter_callback, flags, (void *)ctx);
if (pub->_filter._interest_id == 0) {
z_free(ctx);
return _Z_ERR_GENERIC;
Expand All @@ -94,7 +94,7 @@ int8_t _z_write_filter_create(_z_publisher_t *pub) {
}

int8_t _z_write_filter_destroy(const _z_publisher_t *pub) {
_Z_RETURN_IF_ERR(_z_undeclare_interest(&pub->_zn.in->val, pub->_filter._interest_id));
_Z_RETURN_IF_ERR(_z_remove_interest(&pub->_zn.in->val, pub->_filter._interest_id));
z_free(pub->_filter.ctx);
return _Z_RES_OK;
}
Expand Down
Loading
Loading