-
Notifications
You must be signed in to change notification settings - Fork 59
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add publication cache and querying subscriber
- Loading branch information
Showing
11 changed files
with
893 additions
and
42 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
// | ||
// Copyright (c) 2023 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
#include <stdio.h> | ||
#include <string.h> | ||
|
||
#include "zenoh.h" | ||
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) | ||
#include <windows.h> | ||
#define sleep(x) Sleep(x * 1000) | ||
#else | ||
#include <unistd.h> | ||
#endif | ||
|
||
int main(int argc, char **argv) { | ||
char *keyexpr = "demo/example/zenoh-c-pub"; | ||
char *value = "Pub from C!"; | ||
|
||
if (argc > 1) keyexpr = argv[1]; | ||
if (argc > 2) value = argv[2]; | ||
|
||
z_owned_config_t config = z_config_default(); | ||
if (argc > 3) { | ||
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) { | ||
printf( | ||
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " | ||
"JSON-serialized list of strings\n", | ||
argv[3], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY); | ||
exit(-1); | ||
} | ||
} | ||
|
||
if (zc_config_insert_json(z_loan(config), Z_CONFIG_ADD_TIMESTAMP_KEY, "true") < 0) { | ||
printf("Unable to configure timestamps!\n"); | ||
exit(-1); | ||
} | ||
|
||
printf("Opening session...\n"); | ||
z_owned_session_t s = z_open(z_move(config)); | ||
if (!z_check(s)) { | ||
printf("Unable to open session!\n"); | ||
exit(-1); | ||
} | ||
|
||
ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default(); | ||
pub_cache_opts.history = 42; | ||
|
||
printf("Declaring publication cache on '%s'...\n", keyexpr); | ||
ze_owned_publication_cache_t pub_cache = | ||
ze_declare_publication_cache(z_loan(s), z_keyexpr(keyexpr), &pub_cache_opts); | ||
if (!ze_publication_cache_check(&pub_cache)) { | ||
printf("Unable to declare publication cache for key expression!\n"); | ||
exit(-1); | ||
} | ||
|
||
char buf[256]; | ||
for (int idx = 0; 1; ++idx) { | ||
sleep(1); | ||
sprintf(buf, "[%4d] %s", idx, value); | ||
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf); | ||
z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)buf, strlen(buf), NULL); | ||
} | ||
|
||
ze_close_publication_cache(z_move(pub_cache)); | ||
|
||
z_close(z_move(s)); | ||
return 0; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
// | ||
// Copyright (c) 2022 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
#include <stdio.h> | ||
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__) | ||
#include <windows.h> | ||
#define sleep(x) Sleep(x * 1000) | ||
#else | ||
#include <unistd.h> | ||
#endif | ||
#include "zenoh.h" | ||
|
||
const char *kind_to_str(z_sample_kind_t kind); | ||
|
||
void data_handler(const z_sample_t *sample, void *arg) { | ||
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); | ||
printf(">> [Subscriber] Received %s ('%s': '%.*s')\n", kind_to_str(sample->kind), z_loan(keystr), | ||
(int)sample->payload.len, sample->payload.start); | ||
z_drop(z_move(keystr)); | ||
} | ||
|
||
int main(int argc, char **argv) { | ||
char *expr = "demo/example/**"; | ||
if (argc > 1) { | ||
expr = argv[1]; | ||
} | ||
|
||
z_owned_config_t config = z_config_default(); | ||
if (argc > 2) { | ||
if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) { | ||
printf( | ||
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a " | ||
"JSON-serialized list of strings\n", | ||
argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY); | ||
exit(-1); | ||
} | ||
} | ||
|
||
printf("Opening session...\n"); | ||
z_owned_session_t s = z_open(z_move(config)); | ||
if (!z_check(s)) { | ||
printf("Unable to open session!\n"); | ||
exit(-1); | ||
} | ||
|
||
z_owned_closure_sample_t callback = z_closure(data_handler); | ||
printf("Declaring querying subscriber on '%s'...\n", expr); | ||
ze_owned_querying_subscriber_t sub = | ||
ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), z_move(callback), NULL); | ||
if (!z_check(sub)) { | ||
printf("Unable to declare querying subscriber.\n"); | ||
exit(-1); | ||
} | ||
|
||
printf("Enter 'q' to quit...\n"); | ||
char c = 0; | ||
while (c != 'q') { | ||
c = getchar(); | ||
if (c == -1) { | ||
sleep(1); | ||
} | ||
} | ||
|
||
z_drop(z_move(sub)); | ||
z_close(z_move(s)); | ||
return 0; | ||
} | ||
|
||
const char *kind_to_str(z_sample_kind_t kind) { | ||
switch (kind) { | ||
case Z_SAMPLE_KIND_PUT: | ||
return "PUT"; | ||
case Z_SAMPLE_KIND_DELETE: | ||
return "DELETE"; | ||
default: | ||
return "UNKNOWN"; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.