Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query/reply attachment #403

Merged
merged 20 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
d424efc
fix: add and call function to drop encoded attachments
jean-roland Apr 15, 2024
4557998
feat: add n arg for z_queryable example
jean-roland Apr 15, 2024
047bfaf
feat: add attachment to queries
jean-roland Apr 15, 2024
9dda5bc
feat: add query attachment to examples
jean-roland Apr 15, 2024
5a1286a
feat: add attachment to query replies
jean-roland Apr 15, 2024
3c5d91c
feat: add attachment to query reply examples
jean-roland Apr 15, 2024
e2efbf4
fix: compile error when attachment deactivated
jean-roland Apr 15, 2024
9026e6d
test: add attachment support in modularity test
jean-roland Apr 15, 2024
f102313
build: add a ci stage for no attachment case
jean-roland Apr 15, 2024
ee11c6b
feat: drop sample attachment with z_sample_drop
jean-roland Apr 24, 2024
801bf92
fix: init dst in z_bytes_copy
jean-roland Apr 24, 2024
b22ced6
fix: allocate attachment z_bytes to avoid going out of scope
jean-roland Apr 24, 2024
b21c1c8
chore: auto format python script
jean-roland Apr 25, 2024
4a5815d
fix: remove unused function
jean-roland Apr 25, 2024
18b428a
feat: add query attachment accessor
jean-roland Apr 25, 2024
dbe6ee6
fix: free attachment when query is dropped
jean-roland Apr 25, 2024
a7f0690
fix: drop sample attachment automatically
jean-roland Apr 25, 2024
7618d83
fix: set attachment_drop as a private function
jean-roland Apr 25, 2024
a41d2db
fix: replace private function _z_bytes_wrap
jean-roland Apr 25, 2024
8198aa8
fix: compilation error when attachment is off
jean-roland Apr 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion .github/workflows/build-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
run: |
sudo apt install -y ninja-build
CMAKE_GENERATOR=Ninja make
python3 ./build/tests/modularity.py --pub $Z_FEATURE_PUBLICATION --sub $Z_FEATURE_SUBSCRIPTION --queryable $Z_FEATURE_QUERYABLE --query $Z_FEATURE_QUERY
python3 ./build/tests/modularity.py --pub $Z_FEATURE_PUBLICATION --sub $Z_FEATURE_SUBSCRIPTION --queryable $Z_FEATURE_QUERYABLE --query $Z_FEATURE_QUERY --attachment 1
jean-roland marked this conversation as resolved.
Show resolved Hide resolved
timeout-minutes: 5
env:
Z_FEATURE_PUBLICATION: ${{ matrix.feature_publication }}
Expand Down Expand Up @@ -202,3 +202,38 @@ jobs:
- name: Kill Zenoh router
if: always()
run: kill ${{ steps.run-zenoh.outputs.zenohd-pid }}

no_attachment_test:
needs: zenoh_build
name: Build and test without attachment on ubuntu-latest
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Download Zenoh artifacts
uses: actions/download-artifact@v4
with:
name: ${{ needs.zenoh_build.outputs.artifact-name }}

- name: Unzip Zenoh artifacts
run: unzip ${{ needs.zenoh_build.outputs.artifact-name }} -d zenoh-standalone

- id: run-zenoh
name: Run Zenoh router
run: |
RUST_LOG=debug ./zenoh-standalone/zenohd &
echo "zenohd-pid=$!" >> $GITHUB_OUTPUT

- name: Build project and run test
run: |
sudo apt install -y ninja-build
CMAKE_GENERATOR=Ninja make
python3 ./build/tests/modularity.py --pub 1 --sub 1 --queryable 1 --query 1 --attachment 0
timeout-minutes: 5
env:
Z_FEATURE_ATTACHMENT: 0

- name: Kill Zenoh router
if: always()
run: kill ${{ steps.run-zenoh.outputs.zenohd-pid }}
23 changes: 23 additions & 0 deletions examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,26 @@ void reply_dropper(void *ctx) {
z_condvar_free(&cond);
}

#if Z_FEATURE_ATTACHMENT == 1
int8_t attachment_handler(z_bytes_t key, z_bytes_t att_value, void *ctx) {
(void)ctx;
printf(">>> %.*s: %.*s\n", (int)key.len, key.start, (int)att_value.len, att_value.start);
return 0;
}
#endif

void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
#if Z_FEATURE_ATTACHMENT == 1
if (z_attachment_check(&sample.attachment)) {
printf("Attachement found\n");
z_attachment_iterate(sample.attachment, attachment_handler, NULL);
}
#endif
z_drop(z_move(keystr));
} else {
printf(">> Received an error\n");
Expand Down Expand Up @@ -116,6 +130,11 @@ int main(int argc, char **argv) {
if (value != NULL) {
opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value));
}
#if Z_FEATURE_ATTACHMENT == 1
z_owned_bytes_map_t map = z_bytes_map_new();
z_bytes_map_insert_by_alias(&map, z_bytes_from_str("hi"), z_bytes_from_str("there"));
opts.attachment = z_bytes_map_as_attachment(&map);
#endif
z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper);
if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) {
printf("Unable to send query.\n");
Expand All @@ -130,6 +149,10 @@ int main(int argc, char **argv) {

z_close(z_move(s));

#if Z_FEATURE_ATTACHMENT == 1
z_bytes_map_drop(&map);
#endif

return 0;
}
#else
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ int main(int argc, char **argv) {
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);
#if Z_FEATURE_ATTACHMENT == 1
z_owned_bytes_map_t map = z_bytes_map_new();
z_bytes_map_insert_by_alias(&map, _z_bytes_wrap((uint8_t *)"hi", 2), _z_bytes_wrap((uint8_t *)"there", 5));
z_bytes_map_insert_by_alias(&map, z_bytes_from_str("hi"), z_bytes_from_str("there"));
options.attachment = z_bytes_map_as_attachment(&map);
#endif
if (z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)value, strlen(value), &options) < 0) {
Expand Down
44 changes: 41 additions & 3 deletions examples/unix/c11/z_queryable.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,60 @@
#if Z_FEATURE_QUERYABLE == 1
const char *keyexpr = "demo/example/zenoh-pico-queryable";
const char *value = "Queryable from Pico!";
static int msg_nb = 0;

#if Z_FEATURE_ATTACHMENT == 1
int8_t attachment_handler(z_bytes_t key, z_bytes_t att_value, void *ctx) {
(void)ctx;
printf(">>> %.*s: %.*s\n", (int)key.len, key.start, (int)att_value.len, att_value.start);
return 0;
}
#endif

void query_handler(const z_query_t *query, void *ctx) {
(void)(ctx);
z_owned_str_t keystr = z_keyexpr_to_string(z_query_keyexpr(query));
z_bytes_t pred = z_query_parameters(query);
z_value_t payload_value = z_query_value(query);
printf(" >> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start);
printf(">> [Queryable handler] Received Query '%s?%.*s'\n", z_loan(keystr), (int)pred.len, pred.start);
if (payload_value.payload.len > 0) {
printf(" with value '%.*s'\n", (int)payload_value.payload.len, payload_value.payload.start);
}
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t attachment = z_query_attachment(query);
if (z_attachment_check(&attachment)) {
printf("Attachement found\n");
z_attachment_iterate(attachment, attachment_handler, NULL);
}
#endif

z_query_reply_options_t options = z_query_reply_options_default();
options.encoding = z_encoding(Z_ENCODING_PREFIX_TEXT_PLAIN, NULL);

#if Z_FEATURE_ATTACHMENT == 1
// Add attachment
z_owned_bytes_map_t map = z_bytes_map_new();
z_bytes_map_insert_by_alias(&map, z_bytes_from_str("hello"), z_bytes_from_str("world"));
options.attachment = z_bytes_map_as_attachment(&map);
#endif

z_query_reply(query, z_keyexpr(keyexpr), (const unsigned char *)value, strlen(value), &options);
z_drop(z_move(keystr));
msg_nb++;

#if Z_FEATURE_ATTACHMENT == 1
z_bytes_map_drop(&map);
#endif
}

int main(int argc, char **argv) {
const char *mode = "client";
char *clocator = NULL;
char *llocator = NULL;
int n = 0;

int opt;
while ((opt = getopt(argc, argv, "k:e:m:v:l:")) != -1) {
while ((opt = getopt(argc, argv, "k:e:m:v:l:n:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
Expand All @@ -60,8 +91,12 @@ int main(int argc, char **argv) {
case 'v':
value = optarg;
break;
case 'n':
n = atoi(optarg);
break;
case '?':
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l') {
if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'v' || optopt == 'l' ||
optopt == 'n') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
Expand Down Expand Up @@ -111,6 +146,9 @@ int main(int argc, char **argv) {

printf("Press CTRL-C to quit...\n");
while (1) {
if ((n != 0) && (msg_nb >= n)) {
jean-roland marked this conversation as resolved.
Show resolved Hide resolved
break;
}
sleep(1);
}

Expand Down
18 changes: 16 additions & 2 deletions include/zenoh-pico/api/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,13 +519,27 @@ z_bytes_t z_query_parameters(const z_query_t *query);
* Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release.
*
* Parameters:
* query: Pointer to the query to get the value selector from.
* query: Pointer to the query to get the payload from.
*
* Returns:
* Returns the payload value wrapped as a :c:type:`z_value_t`, since payload value is a user-defined representation.
* Returns the payload wrapped as a :c:type:`z_value_t`, since payload value is a user-defined representation.
*/
z_value_t z_query_value(const z_query_t *query);

#if Z_FEATURE_ATTACHMENT == 1
/**
* Get a query's attachment value by aliasing it.
* Note: This API has been marked as unstable: it works as advertised, but we may change it in a future release.
*
* Parameters:
* query: Pointer to the query to get the attachment from.
*
* Returns:
* Returns the attachment wrapped as a :c:type:`z_attachment_t`, since attachment is a user-defined representation.
*/
z_attachment_t z_query_attachment(const z_query_t *query);
#endif

/**
* Get a query's key by aliasing it.
*
Expand Down
6 changes: 2 additions & 4 deletions include/zenoh-pico/api/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,7 @@ typedef struct {
*/
typedef struct {
z_encoding_t encoding;
#if Z_FEATURE_ATTACHMENT == 1
// TODO:ATT z_attachment_t attachment;
#endif
z_attachment_t attachment;
} z_query_reply_options_t;

/**
Expand Down Expand Up @@ -337,7 +335,7 @@ typedef struct {
z_query_target_t target;
uint32_t timeout_ms;
#if Z_FEATURE_ATTACHMENT == 1
// TODO:ATT z_attachment_t attachment;
z_attachment_t attachment;
#endif
} z_get_options_t;

Expand Down
4 changes: 3 additions & 1 deletion include/zenoh-pico/net/primitives.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,11 @@ int8_t _z_undeclare_queryable(_z_queryable_t *qle);
* query: The query to reply to. The caller keeps its ownership.
* key: The resource key of this reply. The caller keeps the ownership.
* payload: The value of this reply, the caller keeps ownership.
* kind: The type of operation.
* att: The optional attachment to the sample.
*/
int8_t _z_send_reply(const _z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload,
const z_sample_kind_t kind);
const z_sample_kind_t kind, z_attachment_t att);
#endif

#if Z_FEATURE_QUERY == 1
Expand Down
5 changes: 4 additions & 1 deletion include/zenoh-pico/net/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ typedef struct _z_query_t {
_z_keyexpr_t _key;
uint32_t _request_id;
_z_session_t *_zn;
#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t attachment;
#endif
char *_parameters;
_Bool _anyke;
} _z_query_t;
Expand All @@ -52,7 +55,7 @@ typedef struct {

#if Z_FEATURE_QUERYABLE == 1
_z_query_t _z_query_create(const _z_value_t *value, const _z_keyexpr_t *key, const _z_bytes_t *parameters,
_z_session_t *zn, uint32_t request_id);
_z_session_t *zn, uint32_t request_id, z_attachment_t att);
void _z_queryable_clear(_z_queryable_t *qbl);
void _z_queryable_free(_z_queryable_t **qbl);
#endif
Expand Down
23 changes: 15 additions & 8 deletions include/zenoh-pico/protocol/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ typedef struct {
uint64_t time;
} _z_timestamp_t;

#if Z_FEATURE_ATTACHMENT == 1
/**
* The body of a loop over an attachment's key-value pairs.
*
Expand Down Expand Up @@ -110,24 +109,32 @@ typedef struct z_attachment_t {
z_attachment_iter_driver_t iteration_driver;
} z_attachment_t;

z_attachment_t z_attachment_null(void);
_Bool z_attachment_check(const z_attachment_t *attachment);
int8_t z_attachment_iterate(z_attachment_t this_, z_attachment_iter_body_t body, void *ctx);
_z_bytes_t z_attachment_get(z_attachment_t this_, _z_bytes_t key);

typedef struct {
union {
z_attachment_t decoded;
_z_bytes_t encoded;
} body;
_Bool is_encoded;
} _z_owned_encoded_attachment_t;

z_attachment_t z_attachment_null(void);
z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att);

#if Z_FEATURE_ATTACHMENT == 1

_Bool z_attachment_check(const z_attachment_t *attachment);
int8_t z_attachment_iterate(z_attachment_t this_, z_attachment_iter_body_t body, void *ctx);
_z_bytes_t z_attachment_get(z_attachment_t this_, _z_bytes_t key);

/**
* Estimate the length of an attachment once encoded.
*/
size_t _z_attachment_estimate_length(z_attachment_t att);
z_attachment_t _z_encoded_as_attachment(const _z_owned_encoded_attachment_t *att);
void _z_encoded_attachment_drop(_z_owned_encoded_attachment_t *att);

/**
* Drop an attachment that was decoded from a received message
*/
void _z_attachment_drop(z_attachment_t *att);
#endif

_z_timestamp_t _z_timestamp_duplicate(const _z_timestamp_t *tstamp);
Expand Down
2 changes: 0 additions & 2 deletions include/zenoh-pico/protocol/definitions/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ typedef struct {
_z_source_info_t _ext_info;
_z_value_t _ext_value;
z_consolidation_mode_t _consolidation;
#if Z_FEATURE_ATTACHMENT == 1
_z_owned_encoded_attachment_t _ext_attachment;
#endif
} _z_msg_query_t;
typedef struct {
_Bool info;
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/session/queryable.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ _z_session_queryable_rc_t *_z_get_session_queryable_by_id(_z_session_t *zn, cons
_z_session_queryable_rc_list_t *_z_get_session_queryable_by_key(_z_session_t *zn, const _z_keyexpr_t key);

_z_session_queryable_rc_t *_z_register_session_queryable(_z_session_t *zn, _z_session_queryable_t *q);
int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid);
int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid,
z_attachment_t att);
void _z_unregister_session_queryable(_z_session_t *zn, _z_session_queryable_rc_t *q);
void _z_flush_session_queryable(_z_session_t *zn);
#endif
Expand Down
14 changes: 10 additions & 4 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ z_bytes_t z_query_parameters(const z_query_t *query) {

z_value_t z_query_value(const z_query_t *query) { return query->_val._rc.in->val._value; }

#if Z_FEATURE_ATTACHMENT == 1
z_attachment_t z_query_attachment(const z_query_t *query) { return query->_val._rc.in->val.attachment; }
#endif

z_keyexpr_t z_query_keyexpr(const z_query_t *query) { return query->_val._rc.in->val._key; }

_Bool z_value_is_initialized(z_value_t *value) {
Expand Down Expand Up @@ -834,7 +838,7 @@ z_get_options_t z_get_options_default(void) {
.target = z_query_target_default(), .consolidation = z_query_consolidation_default(),
.value = {.encoding = z_encoding_default(), .payload = _z_bytes_empty()},
#if Z_FEATURE_ATTACHMENT == 1
// TODO:ATT.attachment = z_attachment_null()
.attachment = z_attachment_null(),
#endif
.timeout_ms = Z_GET_TIMEOUT_DEFAULT
};
Expand Down Expand Up @@ -864,6 +868,9 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne
opt.consolidation = options->consolidation;
opt.target = options->target;
opt.value = options->value;
#if Z_FEATURE_ATTACHMENT == 1
opt.attachment = options->attachment;
#endif
}

if (opt.consolidation.mode == Z_CONSOLIDATION_MODE_AUTO) {
Expand All @@ -888,8 +895,7 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne
__z_reply_handler, wrapped_ctx, callback->drop, ctx, opt.timeout_ms
#if Z_FEATURE_ATTACHMENT == 1
,
z_attachment_null()
// TODO:ATT opt.attachment
opt.attachment
#endif
);
return ret;
Expand Down Expand Up @@ -969,7 +975,7 @@ int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const ui
.len = payload_len,
},
.encoding = {.id = opts.encoding.id, .schema = opts.encoding.schema}};
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT);
return _z_send_reply(&query->_val._rc.in->val, keyexpr, value, Z_SAMPLE_KIND_PUT, opts.attachment);
return _Z_ERR_GENERIC;
}
#endif
Expand Down
Loading
Loading