Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework channels for sample/query/reply #329

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ libc = "0.2.139"
log = "0.4.17"
rand = "0.8.5"
spin = "0.9.5"
crossbeam-channel = "0.5"
# shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice`
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] }
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml.in
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ libc = "0.2.139"
log = "0.4.17"
rand = "0.8.5"
spin = "0.9.5"
crossbeam-channel = "0.5"
# shared-memory enabled for zenoh even if zenoh-c "shared-memory" feature is disabled. This is to make "std::mem::transmute" work for `ZSLice`
zenoh = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory", "unstable"], default-features = false }
zenoh-protocol = { version = "0.11.0-dev", git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "main", features = ["shared-memory"] }
Expand Down
4 changes: 2 additions & 2 deletions examples/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ int main(int argc, char **argv) {
}

printf("Sending Query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16);
z_get_options_t opts = z_get_options_default();
if (value != NULL) {
opts.value.payload = z_bytes_from_str(value);
Expand All @@ -76,4 +76,4 @@ int main(int argc, char **argv) {
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
}
2 changes: 1 addition & 1 deletion examples/z_get_liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ int main(int argc, char **argv) {
}

printf("Sending liveliness query '%s'...\n", expr);
z_owned_reply_channel_t channel = zc_reply_fifo_new(16);
z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16);
zc_liveliness_get(z_loan(s), keyexpr, z_move(channel.send), NULL);
z_owned_reply_t reply = z_reply_null();
for (z_call(channel.recv, &reply); z_check(reply); z_call(channel.recv, &reply)) {
Expand Down
29 changes: 15 additions & 14 deletions examples/z_non_blocking_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,27 @@ int main(int argc, char **argv) {
printf("Sending Query '%s'...\n", expr);
z_get_options_t opts = z_get_options_default();
opts.target = Z_QUERY_TARGET_ALL;
z_owned_reply_channel_t channel = zc_reply_non_blocking_fifo_new(16);
z_owned_reply_fifo_channel_t channel = z_reply_fifo_channel_new(16);
z_get(z_loan(s), keyexpr, "", z_move(channel.send),
&opts); // here, the send is moved and will be dropped by zenoh when adequate
z_owned_reply_t reply = z_reply_null();
for (bool call_success = z_call(channel.recv, &reply); !call_success || z_check(reply);
call_success = z_call(channel.recv, &reply)) {
if (!call_success) {
continue;
}
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
} else {
printf("Received an error\n");
while (true) {
for (z_call(channel.try_recv, &reply); z_check(reply); z_call(channel.try_recv, &reply)) {
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
} else {
printf(">> Received an error\n");
}
}
printf(">> Nothing to get... sleep for 5 s\n");
z_sleep_s(5);
}

z_drop(z_move(reply));
z_drop(z_move(channel));
z_close(z_move(s));
return 0;
}
}
44 changes: 22 additions & 22 deletions examples/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,30 @@ int main(int argc, char **argv) {
}

printf("Pull functionality not implemented!\n");
// @TODO: implement z_owned_sample_channel_t and z_sample_channel_ring_new
// printf("Declaring Subscriber on '%s'...\n", keyexpr);
// z_owned_sample_channel_t channel = z_sample_channel_ring_new(size);
// z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
// if (!z_check(sub)) {
// printf("Unable to declare subscriber.\n");
// return -1;
// }
printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_sample_ring_channel_t channel = z_sample_channel_ring_new(size);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
if (!z_check(sub)) {
printf("Unable to declare subscriber.\n");
return -1;
}

// printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size);
// z_owned_sample_t sample = z_sample_null();
// while (true) {
// for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) {
// z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr));
// printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len,
// sample.payload.start);
// z_drop(z_move(keystr));
// z_drop(z_move(sample));
// }
// printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval);
// z_sleep_ms(interval);
// }
printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size);
z_owned_sample_t sample = z_sample_null();
while (true) {
for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) {
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr));
printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len,
sample.payload.start);
z_drop(z_move(keystr));
z_drop(z_move(sample));
}
printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval);
z_sleep_ms(interval);
}

// z_undeclare_subscriber(z_move(sub));
z_undeclare_subscriber(z_move(sub));
z_drop(z_move(channel));

z_close(z_move(s));

Expand Down
12 changes: 3 additions & 9 deletions examples/z_queryable_with_channels.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,13 @@
#include <stdio.h>
#include <string.h>
#include <zenoh_macros.h>

#include "zenoh.h"

const char *expr = "demo/example/zenoh-c-queryable";
const char *value = "Queryable from C!";
z_keyexpr_t keyexpr;

void query_handler(const z_query_t *query, void *context) {
z_owned_closure_owned_query_t *channel = (z_owned_closure_owned_query_t *)context;
z_owned_query_t oquery = z_query_clone(query);
z_call(*channel, &oquery);
}

int main(int argc, char **argv) {
if (argc > 1) {
expr = argv[1];
Expand Down Expand Up @@ -54,9 +49,8 @@ int main(int argc, char **argv) {
}

printf("Declaring Queryable on '%s'...\n", expr);
z_owned_query_channel_t channel = zc_query_fifo_new(16);
z_owned_closure_query_t callback = z_closure(query_handler, NULL, &channel.send);
z_owned_queryable_t qable = z_declare_queryable(z_loan(s), keyexpr, z_move(callback), NULL);
z_owned_query_fifo_channel_t channel = z_query_fifo_channel_new(16);
z_owned_queryable_t qable = z_declare_queryable(z_loan(s), keyexpr, z_move(channel.send), NULL);
if (!z_check(qable)) {
printf("Unable to create queryable.\n");
exit(-1);
Expand Down
Loading