From 0d76f92fb07839320b0b9f2fbe33248073442e1a Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 22 Mar 2024 18:17:36 +0100 Subject: [PATCH] Improve pull example --- examples/unix/c11/z_pull.c | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/examples/unix/c11/z_pull.c b/examples/unix/c11/z_pull.c index 3abdd2ad5..2b194abf8 100644 --- a/examples/unix/c11/z_pull.c +++ b/examples/unix/c11/z_pull.c @@ -22,9 +22,11 @@ int main(int argc, char **argv) { const char *keyexpr = "demo/example/**"; char *locator = NULL; + size_t interval = 5000; + size_t size = 3; int opt; - while ((opt = getopt(argc, argv, "k:e:")) != -1) { + while ((opt = getopt(argc, argv, "k:e:i:s:")) != -1) { switch (opt) { case 'k': keyexpr = optarg; @@ -32,8 +34,14 @@ int main(int argc, char **argv) { case 'e': locator = optarg; break; + case 'i': + interval = (size_t)atoi(optarg); + break; + case 's': + size = (size_t)atoi(optarg); + break; case '?': - if (optopt == 'k' || optopt == 'e') { + if (optopt == 'k' || optopt == 'e' || optopt == 'i' || optopt == 's') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else { fprintf(stderr, "Unknown option `-%c'.\n", optopt); @@ -64,29 +72,25 @@ int main(int argc, char **argv) { } printf("Declaring Subscriber on '%s'...\n", keyexpr); - z_owned_sample_channel_t channel = z_sample_channel_ring_new(3); + z_owned_sample_channel_t channel = z_sample_channel_ring_new(size); z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); if (!z_check(sub)) { printf("Unable to declare subscriber.\n"); return -1; } - printf("Enter any key to pull data or 'q' to quit...\n"); - char c = '\0'; - for (int ret = scanf("%c", &c); c != 'q'; ret = scanf("%c", &c)) { - // Try to receive one sample from the ring channel - z_owned_sample_t sample = z_sample_null(); - z_call(channel.recv, &sample); - // Check if we actually received something - if (z_check(sample)) { + printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size); + z_owned_sample_t sample = z_sample_null(); + while (true) { + for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr)); printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); z_drop(z_move(keystr)); - } else { - printf(">> [Subscriber] Nothing to pull...\n"); + z_drop(z_move(sample)); } - z_drop(z_move(sample)); + printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval); + zp_sleep_ms(interval); } z_undeclare_subscriber(z_move(sub));