Skip to content

Commit

Permalink
New interest format (#405)
Browse files Browse the repository at this point in the history
* feat: implement interest protocol changes

* test: add interests to test msgcodec

* fix: bad struct name

* fix: bad interest generation and decode

* feat: add declare interest id

* feat: rework filter/interest with wire change

* fix: is final mask naming

* fix: remove stray char

* fix: wait for joins before starting publisher

* fix: encode declare_final header

* test: update raweth test

* fix: explicit decl_final_t members

* build: reactivate interests by default

* test: filter packets on raweth test

* doc: bad comment values
  • Loading branch information
jean-roland authored May 14, 2024
1 parent 2a767bc commit 0555baf
Show file tree
Hide file tree
Showing 24 changed files with 561 additions and 293 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ set(Z_FEATURE_QUERY 1 CACHE STRING "Toggle query feature")
set(Z_FEATURE_QUERYABLE 1 CACHE STRING "Toggle queryable feature")
set(Z_FEATURE_RAWETH_TRANSPORT 0 CACHE STRING "Toggle raw ethernet transport feature")
set(Z_FEATURE_ATTACHMENT 1 CACHE STRING "Toggle attachment feature")
set(Z_FEATURE_INTEREST 0 CACHE STRING "Toggle interest feature") # Toggle to 1 when protocol changes are merged
set(Z_FEATURE_INTEREST 1 CACHE STRING "Toggle interest feature")
add_definition(Z_FEATURE_MULTI_THREAD=${Z_FEATURE_MULTI_THREAD})
add_definition(Z_FEATURE_PUBLICATION=${Z_FEATURE_PUBLICATION})
add_definition(Z_FEATURE_SUBSCRIPTION=${Z_FEATURE_SUBSCRIPTION})
Expand Down
2 changes: 1 addition & 1 deletion GNUmakefile
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Z_FEATURE_SUBSCRIPTION?=1
Z_FEATURE_QUERY?=1
Z_FEATURE_QUERYABLE?=1
Z_FEATURE_ATTACHMENT?=1
Z_FEATURE_INTEREST?=0
Z_FEATURE_INTEREST?=1
Z_FEATURE_RAWETH_TRANSPORT?=0

# zenoh-pico/ directory
Expand Down
11 changes: 5 additions & 6 deletions examples/unix/c11/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,17 @@ int main(int argc, char **argv) {
z_close(z_session_move(&s));
return -1;
}

// Wait for joins in peer mode
if (strcmp(mode, "peer") == 0) {
printf("Waiting for joins...\n");
sleep(3);
}
printf("Declaring publisher for '%s'...\n", keyexpr);
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
if (!z_check(pub)) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
// Wait for joins in peer mode
if (strcmp(mode, "peer") == 0) {
printf("Waiting for joins...\n");
sleep(3);
}
printf("Press CTRL-C to quit...\n");
char buf[256];
for (int idx = 0; idx < n; ++idx) {
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/net/filtering.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ typedef struct {
/**
* Return type when declaring a queryable.
*/
typedef struct _z_interest_t {
typedef struct _z_write_filter_t {
uint32_t _interest_id;
_z_writer_filter_ctx_t *ctx;
} _z_write_filter_t;
Expand Down
6 changes: 3 additions & 3 deletions include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters,
#endif

#if Z_FEATURE_INTEREST == 1
uint32_t _z_declare_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags,
void *arg);
int8_t _z_undeclare_interest(_z_session_t *zn, uint32_t interest_id);
uint32_t _z_add_interest(_z_session_t *zn, _z_keyexpr_t keyexpr, _z_interest_handler_t callback, uint8_t flags,
void *arg);
int8_t _z_remove_interest(_z_session_t *zn, uint32_t interest_id);
#endif

#endif /* INCLUDE_ZENOH_PICO_NET_PRIMITIVES_H */
11 changes: 2 additions & 9 deletions include/zenoh-pico/protocol/codec/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
#define _Z_UNDECL_QUERYABLE_MID 5
#define _Z_DECL_TOKEN_MID 6
#define _Z_UNDECL_TOKEN_MID 7
#define _Z_DECL_INTEREST_MID 8
#define _Z_FINAL_INTEREST_MID 9
#define _Z_UNDECL_INTEREST_MID 10
#define _Z_DECL_FINAL_MID 0x1a

int8_t _z_decl_kexpr_encode(_z_wbuf_t *wbf, const _z_decl_kexpr_t *decl);
int8_t _z_decl_kexpr_decode(_z_decl_kexpr_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_undecl_kexpr_encode(_z_wbuf_t *wbf, const _z_undecl_kexpr_t *decl);
Expand All @@ -47,12 +46,6 @@ int8_t _z_decl_token_encode(_z_wbuf_t *wbf, const _z_decl_token_t *decl);
int8_t _z_decl_token_decode(_z_decl_token_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_undecl_token_encode(_z_wbuf_t *wbf, const _z_undecl_token_t *decl);
int8_t _z_undecl_token_decode(_z_undecl_token_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_decl_interest_encode(_z_wbuf_t *wbf, const _z_decl_interest_t *decl);
int8_t _z_decl_interest_decode(_z_decl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_final_interest_encode(_z_wbuf_t *wbf, const _z_final_interest_t *decl);
int8_t _z_final_interest_decode(_z_final_interest_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_undecl_interest_encode(_z_wbuf_t *wbf, const _z_undecl_interest_t *decl);
int8_t _z_undecl_interest_decode(_z_undecl_interest_t *decl, _z_zbuf_t *zbf, uint8_t header);

int8_t _z_declaration_encode(_z_wbuf_t *wbf, const _z_declaration_t *decl);
int8_t _z_declaration_decode(_z_declaration_t *decl, _z_zbuf_t *zbf);
Expand Down
24 changes: 24 additions & 0 deletions include/zenoh-pico/protocol/codec/interest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_INTEREST_H
#define INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_INTEREST_H

#include "zenoh-pico/protocol/definitions/interest.h"
#include "zenoh-pico/protocol/iobuf.h"

int8_t _z_interest_encode(_z_wbuf_t *wbf, const _z_interest_t *interest, _Bool is_final);
int8_t _z_interest_decode(_z_interest_t *decl, _z_zbuf_t *zbf, _Bool is_final, _Bool has_ext);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_CODEC_DECLARATIONS_H */
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/codec/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ int8_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_final_t
int8_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl);
int8_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header);
int8_t _z_n_interest_encode(_z_wbuf_t *wbf, const _z_n_msg_interest_t *interest);
int8_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, uint8_t header);

int8_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg);
int8_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf);
Expand Down
38 changes: 6 additions & 32 deletions include/zenoh-pico/protocol/definitions/declarations.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,10 @@ typedef struct {
} _z_undecl_token_t;
_z_undecl_token_t _z_undecl_token_null(void);

#define _Z_INTEREST_FLAG_KEYEXPRS (1)
#define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1)
#define _Z_INTEREST_FLAG_QUERYABLES (1 << 2)
#define _Z_INTEREST_FLAG_TOKENS (1 << 3)
#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4)
#define _Z_INTEREST_FLAG_CURRENT (1 << 5)
#define _Z_INTEREST_FLAG_FUTURE (1 << 6)
#define _Z_INTEREST_FLAG_AGGREGATE (1 << 7)

typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
uint8_t interest_flags;
} _z_decl_interest_t;
_z_decl_interest_t _z_decl_interest_null(void);
typedef struct {
uint32_t _id;
} _z_final_interest_t;
_z_final_interest_t _z_final_interest_null(void);
typedef struct {
uint32_t _id;
_z_keyexpr_t _ext_keyexpr;
} _z_undecl_interest_t;
_z_undecl_interest_t _z_undecl_interest_null(void);
_Bool _placeholder; // In case we add extensions
} _z_decl_final_t;
_z_decl_final_t _z_decl_final_null(void);

typedef struct {
enum {
Expand All @@ -105,9 +85,7 @@ typedef struct {
_Z_UNDECL_QUERYABLE,
_Z_DECL_TOKEN,
_Z_UNDECL_TOKEN,
_Z_DECL_INTEREST,
_Z_FINAL_INTEREST,
_Z_UNDECL_INTEREST,
_Z_DECL_FINAL,
} _tag;
union {
_z_decl_kexpr_t _decl_kexpr;
Expand All @@ -118,9 +96,7 @@ typedef struct {
_z_undecl_queryable_t _undecl_queryable;
_z_decl_token_t _decl_token;
_z_undecl_token_t _undecl_token;
_z_decl_interest_t _decl_interest;
_z_final_interest_t _final_interest;
_z_undecl_interest_t _undecl_interest;
_z_decl_final_t _decl_final;
} _body;
} _z_declaration_t;
void _z_declaration_clear(_z_declaration_t* decl);
Expand All @@ -138,8 +114,6 @@ _z_declaration_t _z_make_undecl_queryable(uint32_t id, _Z_OPTIONAL const _z_keye
_z_declaration_t _z_make_decl_token(_Z_MOVE(_z_keyexpr_t) key, uint32_t id);
_z_declaration_t _z_make_undecl_token(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);

_z_declaration_t _z_make_decl_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t interest_flags);
_z_declaration_t _z_make_undecl_interest(uint32_t id, _Z_OPTIONAL const _z_keyexpr_t* key);
_z_declaration_t _z_make_final_interest(uint32_t id);
_z_declaration_t _z_make_decl_final(void);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_DECLARATIONS_H */
46 changes: 46 additions & 0 deletions include/zenoh-pico/protocol/definitions/interest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_INTEREST_H
#define INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_INTEREST_H

#include <stdint.h>

#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/keyexpr.h"

#define _Z_INTEREST_FLAG_KEYEXPRS (1)
#define _Z_INTEREST_FLAG_SUBSCRIBERS (1 << 1)
#define _Z_INTEREST_FLAG_QUERYABLES (1 << 2)
#define _Z_INTEREST_FLAG_TOKENS (1 << 3)
#define _Z_INTEREST_FLAG_RESTRICTED (1 << 4)
#define _Z_INTEREST_FLAG_CURRENT (1 << 5)
#define _Z_INTEREST_FLAG_FUTURE (1 << 6)
#define _Z_INTEREST_FLAG_AGGREGATE (1 << 7)

#define _Z_INTEREST_NOT_FINAL_MASK (_Z_INTEREST_FLAG_CURRENT | _Z_INTEREST_FLAG_FUTURE)

typedef struct {
_z_keyexpr_t _keyexpr;
uint32_t _id;
uint8_t flags;
} _z_interest_t;
_z_interest_t _z_interest_null(void);

void _z_interest_clear(_z_interest_t* decl);

_z_interest_t _z_make_interest(_Z_MOVE(_z_keyexpr_t) key, uint32_t id, uint8_t flags);
_z_interest_t _z_make_interest_final(uint32_t id);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_INTEREST_H */
58 changes: 56 additions & 2 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "zenoh-pico/collections/bytes.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/declarations.h"
#include "zenoh-pico/protocol/definitions/interest.h"
#include "zenoh-pico/protocol/definitions/message.h"
#include "zenoh-pico/protocol/ext.h"
#include "zenoh-pico/protocol/keyexpr.h"
Expand All @@ -31,6 +32,7 @@
#define _Z_MID_N_REQUEST 0x1c
#define _Z_MID_N_RESPONSE 0x1b
#define _Z_MID_N_RESPONSE_FINAL 0x1a
#define _Z_MID_N_INTEREST 0x19

/*=============================*/
/* Network flags */
Expand All @@ -43,6 +45,13 @@
// - Z: Extension If Z==1 then Zenoh extensions are present
#define _Z_FLAG_N_DECLARE_I 0x20 // 1 << 5

// INTEREST message flags:
// - C: Current If C==1 then interest concerns current declarations
// - F: Future If F==1 then interest concerns future declarations
// - Z: Extension If Z==1 then Zenoh extensions are present
#define _Z_FLAG_N_INTEREST_CURRENT 0x20 // 1 << 5
#define _Z_FLAG_N_INTEREST_FUTURE 0x40 // 1 << 6

// PUSH message flags:
// N Named if N==1 then the key expr has name/suffix
// M Mapping if M==1 then keyexpr mapping is the one declared by the sender, otherwise by the receiver
Expand Down Expand Up @@ -210,23 +219,67 @@ typedef struct {
} _z_n_msg_response_t;
void _z_n_msg_response_clear(_z_n_msg_response_t *msg);

/*------------------ Declare Message ------------------*/

typedef struct {
_z_declaration_t _decl;
_z_timestamp_t _ext_timestamp;
_z_n_qos_t _ext_qos;
uint32_t _interest_id;
_Bool has_interest_id;
} _z_n_msg_declare_t;
static inline void _z_n_msg_declare_clear(_z_n_msg_declare_t *msg) { _z_declaration_clear(&msg->_decl); }

/*------------------ Interest Message ------------------*/

/// Flags:
/// - C: Current If C==1 then interest concerns current declarations
/// - F: Future If F==1 then interest concerns future declarations
/// - Z: Extension If Z==1 then Zenoh extensions are present
/// If C==0 and F==0, then interest is final
///
/// 7 6 5 4 3 2 1 0
/// +-+-+-+-+-+-+-+-+
/// |Z|F|C|INTEREST |
/// +-+-+-+---------+
/// ~ id:z32 ~
/// +---------------+
/// |A|M|N|R|T|Q|S|K| (*) if interest is not final
/// +---------------+
/// ~ key_scope:z16 ~ if interest is not final && R==1
/// +---------------+
/// ~ key_suffix ~ if interest is not final && R==1 && N==1 -- <u8;z16>
/// +---------------+
/// ~ [int_exts] ~ if Z==1
/// +---------------+
///
/// (*) - if K==1 then the interest refers to key expressions
/// - if S==1 then the interest refers to subscribers
/// - if Q==1 then the interest refers to queryables
/// - if T==1 then the interest refers to tokens
/// - if R==1 then the interest is restricted to the matching key expression, else it is for all key expressions.
/// - if N==1 then the key expr has name/suffix. If R==0 then N should be set to 0.
/// - if M==1 then key expr mapping is the one declared by the sender, else it is the one declared by the receiver.
/// If R==0 then M should be set to 0.
/// - if A==1 then the replies SHOULD be aggregated
/// ```

typedef struct {
_z_interest_t _interest;
} _z_n_msg_interest_t;
static inline void _z_n_msg_interest_clear(_z_n_msg_interest_t *msg) { _z_interest_clear(&msg->_interest); }

/*------------------ Zenoh Message ------------------*/
typedef union {
_z_n_msg_declare_t _declare;
_z_n_msg_push_t _push;
_z_n_msg_request_t _request;
_z_n_msg_response_t _response;
_z_n_msg_response_final_t _response_final;
_z_n_msg_interest_t _interest;
} _z_network_body_t;
typedef struct {
enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL } _tag;
enum { _Z_N_DECLARE, _Z_N_PUSH, _Z_N_REQUEST, _Z_N_RESPONSE, _Z_N_RESPONSE_FINAL, _Z_N_INTEREST } _tag;
_z_network_body_t _body;
} _z_network_message_t;
typedef _z_network_message_t _z_zenoh_message_t;
Expand All @@ -248,7 +301,8 @@ _z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_byt
);
_z_network_message_t _z_n_msg_make_reply(_z_zint_t rid, _Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_response_final(_z_zint_t rid);
_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration);
_z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, _Bool has_interest_id, uint32_t interest_id);
_z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_interest(_z_interest_t interest);

#endif /* INCLUDE_ZENOH_PICO_PROTOCOL_DEFINITIONS_NETWORK_H */
6 changes: 3 additions & 3 deletions include/zenoh-pico/session/interest.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ void _z_unregister_interest(_z_session_t *zn, _z_session_interest_rc_t *intr);
void _z_flush_interest(_z_session_t *zn);
int8_t _z_interest_process_declares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_undeclares(_z_session_t *zn, const _z_declaration_t *decl);
int8_t _z_interest_process_final_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_undeclare_interest(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_declare_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags);
int8_t _z_interest_process_declare_final(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_interest_final(_z_session_t *zn, uint32_t id);
int8_t _z_interest_process_interest(_z_session_t *zn, _z_keyexpr_t key, uint32_t id, uint8_t flags);

#endif /* ZENOH_PICO_SESSION_INTEREST_H */
6 changes: 3 additions & 3 deletions src/net/filtering.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ int8_t _z_write_filter_create(_z_publisher_t *pub) {
ctx->decl_id = 0;

pub->_filter.ctx = ctx;
pub->_filter._interest_id = _z_declare_interest(&pub->_zn.in->val, _z_keyexpr_alias(pub->_key),
_z_write_filter_callback, flags, (void *)ctx);
pub->_filter._interest_id =
_z_add_interest(&pub->_zn.in->val, _z_keyexpr_alias(pub->_key), _z_write_filter_callback, flags, (void *)ctx);
if (pub->_filter._interest_id == 0) {
z_free(ctx);
return _Z_ERR_GENERIC;
Expand All @@ -94,7 +94,7 @@ int8_t _z_write_filter_create(_z_publisher_t *pub) {
}

int8_t _z_write_filter_destroy(const _z_publisher_t *pub) {
_Z_RETURN_IF_ERR(_z_undeclare_interest(&pub->_zn.in->val, pub->_filter._interest_id));
_Z_RETURN_IF_ERR(_z_remove_interest(&pub->_zn.in->val, pub->_filter._interest_id));
z_free(pub->_filter.ctx);
return _Z_RES_OK;
}
Expand Down
Loading

0 comments on commit 0555baf

Please sign in to comment.