Skip to content

Commit

Permalink
Add publication cache and querying subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Nov 15, 2023
1 parent 193fe45 commit a312380
Show file tree
Hide file tree
Showing 11 changed files with 886 additions and 42 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ spin = "0.9.5"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] }
zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] }
zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" }
zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]}

[build-dependencies]
cbindgen = "0.24.3"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml.in
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ spin = "0.9.5"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] }
zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] }
zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" }
zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]}

[build-dependencies]
cbindgen = "0.24.3"
Expand Down
78 changes: 78 additions & 0 deletions examples/z_pub_cache.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#include <string.h>

#include "zenoh.h"
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif

int main(int argc, char **argv) {
char *keyexpr = "demo/example/zenoh-c-pub";
char *value = "Pub from C!";

if (argc > 1) keyexpr = argv[1];
if (argc > 2) value = argv[2];

z_owned_config_t config = z_config_default();
if (argc > 3) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[3], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

if (zc_config_insert_json(z_loan(config), Z_CONFIG_ADD_TIMESTAMP_KEY, "true") < 0) {
printf("Unable to configure timestamps!\n");
exit(-1);
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default();
pub_cache_opts.history = 42;

printf("Declaring publication cache on '%s'...\n", keyexpr);
ze_owned_publication_cache_t pub_cache =
ze_declare_publication_cache(z_loan(s), z_keyexpr(keyexpr), &pub_cache_opts);
if (!ze_publication_cache_check(&pub_cache)) {
printf("Unable to declare publication cache for key expression!\n");
exit(-1);
}

char buf[256];
for (int idx = 0; 1; ++idx) {
sleep(1);
sprintf(buf, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);
z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)buf, strlen(buf), NULL);
}

ze_close_publication_cache(z_move(pub_cache));

z_close(z_move(s));
return 0;
}
88 changes: 88 additions & 0 deletions examples/z_query_sub.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

const char *kind_to_str(z_sample_kind_t kind);

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received %s ('%s': '%.*s')\n", kind_to_str(sample->kind), z_loan(keystr),
(int)sample->payload.len, sample->payload.start);
z_drop(z_move(keystr));
}

int main(int argc, char **argv) {
char *expr = "demo/example/**";
if (argc > 1) {
expr = argv[1];
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring querying subscriber on '%s'...\n", expr);
ze_owned_querying_subscriber_t sub =
ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare querying subscriber.\n");
exit(-1);
}

printf("Enter 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
}
}

z_drop(z_move(sub));
z_close(z_move(s));
return 0;
}

const char *kind_to_str(z_sample_kind_t kind) {
switch (kind) {
case Z_SAMPLE_KIND_PUT:
return "PUT";
case Z_SAMPLE_KIND_DELETE:
return "DELETE";
default:
return "UNKNOWN";
}
}
175 changes: 175 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ typedef enum z_sample_kind_t {
Z_SAMPLE_KIND_PUT = 0,
Z_SAMPLE_KIND_DELETE = 1,
} z_sample_kind_t;
typedef enum zc_locality_t {
ZC_LOCALITY_ANY = 0,
ZC_LOCALITY_SESSION_LOCAL = 1,
ZC_LOCALITY_REMOTE = 2,
} zc_locality_t;
typedef enum zc_reply_keyexpr_t {
ZC_REPLY_KEYEXPR_ANY = 0,
ZC_REPLY_KEYEXPR_MATCHING_QUERY = 1,
} zc_reply_keyexpr_t;
/**
* An array of bytes.
*/
Expand Down Expand Up @@ -717,6 +726,68 @@ typedef struct zc_owned_shmbuf_t {
typedef struct zc_owned_shm_manager_t {
uintptr_t _0;
} zc_owned_shm_manager_t;
/**
* An owned zenoh publication_cache.
*
* Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`.
* The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`.
*
* Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved.
* To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument.
* After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid.
*
* To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid.
*/
typedef struct ze_owned_publication_cache_t {
uintptr_t _0[1];
} ze_owned_publication_cache_t;
/**
* Options passed to the :c:func:`ze_declare_publication_cache` function.
*
* Members:
* queryable_prefix: the prefix used for queryable
* queryable_origin: the restriction for the matching queries that will be receive by this
* publication cache
* history: the the history size
* resources_limit: the limit number of cached resources
*/
typedef struct ze_publication_cache_options_t {
struct z_keyexpr_t queryable_prefix;
enum zc_locality_t queryable_origin;
uintptr_t history;
uintptr_t resources_limit;
} ze_publication_cache_options_t;
/**
* An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription.
*
* Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`.
* The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`.
*
* Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved.
* To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument.
* After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid.
*
* To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid.
*/
typedef struct ze_owned_querying_subscriber_t {
uintptr_t _0[1];
} ze_owned_querying_subscriber_t;
/**
* Represents the set of options that can be applied to a querying subscriber,
* upon its declaration via :c:func:`ze_declare_querying_subscriber`.
*
* Members:
* z_reliability_t reliability: The subscription reliability.
*/
typedef struct ze_querying_subscriber_options_t {
enum z_reliability_t reliability;
enum zc_locality_t allowed_origin;
struct z_keyexpr_t query_selector;
enum z_query_target_t query_target;
struct z_query_consolidation_t query_consolidation;
enum zc_reply_keyexpr_t query_accept_replies;
uint64_t query_timeout_ms;
} ze_querying_subscriber_options_t;
ZENOHC_API extern const unsigned int Z_ROUTER;
ZENOHC_API extern const unsigned int Z_PEER;
ZENOHC_API extern const unsigned int Z_CLIENT;
Expand Down Expand Up @@ -1800,6 +1871,7 @@ ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void);
* Destroys a liveliness token, notifying subscribers of its destruction.
*/
ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token);
ZENOHC_API enum zc_locality_t zc_locality_default(void);
/**
* Returns `false` if `payload` is the gravestone value.
*/
Expand Down Expand Up @@ -1872,6 +1944,7 @@ int8_t zc_put_owned(struct z_session_t session,
*/
ZENOHC_API
struct z_owned_reply_channel_t zc_reply_fifo_new(uintptr_t bound);
ZENOHC_API enum zc_reply_keyexpr_t zc_reply_keyexpr_default(void);
/**
* Creates a new non-blocking fifo channel, returned as a pair of closures.
*
Expand Down Expand Up @@ -1966,3 +2039,105 @@ ZENOHC_API uint8_t *zc_shmbuf_ptr(const struct zc_owned_shmbuf_t *buf);
ZENOHC_API
void zc_shmbuf_set_length(const struct zc_owned_shmbuf_t *buf,
uintptr_t len);
/**
* Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety.
*/
ZENOHC_API
int8_t ze_close_publication_cache(struct ze_owned_publication_cache_t *pub_cache);
/**
* Declares a publication cache.
*
* Parameters:
* session: the zenoh session.
* keyexpr: the key expression to publish.
* options: additional options for the publication_cache.
*
* Returns:
* A :c:type:`ze_owned_publication_cache_t`.
*
*
* Example:
* Declaring a publication cache `NULL` for the options:
*
* .. code-block:: C
*
* ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), NULL);
*
* is equivalent to initializing and passing the default publication cache options:
*
* .. code-block:: C
*
* ze_publication_cache_options_t opts = ze_publication_cache_options_default();
* ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), &opts);
*/
ZENOHC_API
struct ze_owned_publication_cache_t ze_declare_publication_cache(struct z_session_t session,
struct z_keyexpr_t keyexpr,
const struct ze_publication_cache_options_t *options);
/**
* Declares a querying subscriber for a given key expression.
*
* Parameters:
* session: The zenoh session.
* keyexpr: The key expression to subscribe.
* callback: The callback function that will be called each time a data matching the subscribed expression is received.
* opts: additional options for the querying subscriber.
*
* Returns:
* A :c:type:`ze_owned_subscriber_t`.
*
* To check if the subscription succeeded and if the querying subscriber is still valid,
* you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid.
*
* Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved.
* To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument.
* After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid.
*
* Example:
* Declaring a subscriber passing ``NULL`` for the options:
*
* .. code-block:: C
*
* ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL);
*
* is equivalent to initializing and passing the default subscriber options:
*
* .. code-block:: C
*
* z_subscriber_options_t opts = z_subscriber_options_default();
* ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
*/
ZENOHC_API
struct ze_owned_querying_subscriber_t ze_declare_querying_subscriber(struct z_session_t session,
struct z_keyexpr_t keyexpr,
struct z_owned_closure_sample_t *callback,
const struct ze_querying_subscriber_options_t *options);
/**
* Returns ``true`` if `pub_cache` is valid.
*/
ZENOHC_API bool ze_publication_cache_check(const struct ze_owned_publication_cache_t *pub_cache);
/**
* Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type
*/
ZENOHC_API struct ze_owned_publication_cache_t ze_publication_cache_null(void);
/**
* Constructs the default value for :c:type:`ze_publication_cache_options_t`.
*/
ZENOHC_API struct ze_publication_cache_options_t ze_publication_cache_options_default(void);
/**
* Returns ``true`` if `sub` is valid.
*/
ZENOHC_API bool ze_querying_subscriber_check(const struct ze_owned_querying_subscriber_t *sub);
/**
* Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type
*/
ZENOHC_API struct ze_owned_querying_subscriber_t ze_querying_subscriber_null(void);
/**
* Constructs the default value for :c:type:`ze_querying_subscriber_options_t`.
*/
ZENOHC_API struct ze_querying_subscriber_options_t ze_querying_subscriber_options_default(void);
/**
* Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety.
*/
ZENOHC_API
int8_t ze_undeclare_querying_subscriber(struct ze_owned_querying_subscriber_t *sub);
Loading

0 comments on commit a312380

Please sign in to comment.