Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add publication cache and querying subscriber #191

Merged
merged 9 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ spin = "0.9.5"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] }
zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] }
zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" }
zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]}

[build-dependencies]
cbindgen = "0.24.3"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml.in
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ spin = "0.9.5"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory", "unstable" ] }
zenoh-protocol = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "shared-memory" ] }
zenoh-util = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master" }
zenoh-ext = { git = "https://github.com/eclipse-zenoh/zenoh.git", branch = "master", features = [ "unstable" ]}

[build-dependencies]
cbindgen = "0.24.3"
Expand Down
78 changes: 78 additions & 0 deletions examples/z_pub_cache.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#include <string.h>

#include "zenoh.h"
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif

int main(int argc, char **argv) {
char *keyexpr = "demo/example/zenoh-c-pub";
char *value = "Pub from C!";

if (argc > 1) keyexpr = argv[1];
if (argc > 2) value = argv[2];

z_owned_config_t config = z_config_default();
if (argc > 3) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_CONNECT_KEY, argv[3]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[3], Z_CONFIG_CONNECT_KEY, Z_CONFIG_CONNECT_KEY);
exit(-1);
}
}

if (zc_config_insert_json(z_loan(config), Z_CONFIG_ADD_TIMESTAMP_KEY, "true") < 0) {
printf("Unable to configure timestamps!\n");
exit(-1);
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

ze_publication_cache_options_t pub_cache_opts = ze_publication_cache_options_default();
pub_cache_opts.history = 42;

printf("Declaring publication cache on '%s'...\n", keyexpr);
ze_owned_publication_cache_t pub_cache =
ze_declare_publication_cache(z_loan(s), z_keyexpr(keyexpr), &pub_cache_opts);
if (!ze_publication_cache_check(&pub_cache)) {
printf("Unable to declare publication cache for key expression!\n");
exit(-1);
}

char buf[256];
for (int idx = 0; 1; ++idx) {
sleep(1);
sprintf(buf, "[%4d] %s", idx, value);
printf("Putting Data ('%s': '%s')...\n", keyexpr, buf);
z_put(z_loan(s), z_keyexpr(keyexpr), (const uint8_t *)buf, strlen(buf), NULL);
}

ze_close_publication_cache(z_move(pub_cache));

z_close(z_move(s));
return 0;
}
88 changes: 88 additions & 0 deletions examples/z_query_sub.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#include <stdio.h>
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32) && !defined(__CYGWIN__)
#include <windows.h>
#define sleep(x) Sleep(x * 1000)
#else
#include <unistd.h>
#endif
#include "zenoh.h"

const char *kind_to_str(z_sample_kind_t kind);

void data_handler(const z_sample_t *sample, void *arg) {
z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr);
printf(">> [Subscriber] Received %s ('%s': '%.*s')\n", kind_to_str(sample->kind), z_loan(keystr),
(int)sample->payload.len, sample->payload.start);
z_drop(z_move(keystr));
}

int main(int argc, char **argv) {
char *expr = "demo/example/**";
if (argc > 1) {
expr = argv[1];
}

z_owned_config_t config = z_config_default();
if (argc > 2) {
if (zc_config_insert_json(z_loan(config), Z_CONFIG_LISTEN_KEY, argv[2]) < 0) {
printf(
"Couldn't insert value `%s` in configuration at `%s`. This is likely because `%s` expects a "
"JSON-serialized list of strings\n",
argv[2], Z_CONFIG_LISTEN_KEY, Z_CONFIG_LISTEN_KEY);
exit(-1);
}
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
exit(-1);
}

z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring querying subscriber on '%s'...\n", expr);
ze_owned_querying_subscriber_t sub =
ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), z_move(callback), NULL);
if (!z_check(sub)) {
printf("Unable to declare querying subscriber.\n");
exit(-1);
}

printf("Enter 'q' to quit...\n");
char c = 0;
while (c != 'q') {
c = getchar();
if (c == -1) {
sleep(1);
}
}

z_drop(z_move(sub));
z_close(z_move(s));
return 0;
}

const char *kind_to_str(z_sample_kind_t kind) {
switch (kind) {
case Z_SAMPLE_KIND_PUT:
return "PUT";
case Z_SAMPLE_KIND_DELETE:
return "DELETE";
default:
return "UNKNOWN";
}
}
175 changes: 175 additions & 0 deletions include/zenoh_commons.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ typedef enum z_sample_kind_t {
Z_SAMPLE_KIND_PUT = 0,
Z_SAMPLE_KIND_DELETE = 1,
} z_sample_kind_t;
typedef enum zc_locality_t {
ZC_LOCALITY_ANY = 0,
ZC_LOCALITY_SESSION_LOCAL = 1,
ZC_LOCALITY_REMOTE = 2,
} zc_locality_t;
typedef enum zc_reply_keyexpr_t {
ZC_REPLY_KEYEXPR_ANY = 0,
ZC_REPLY_KEYEXPR_MATCHING_QUERY = 1,
} zc_reply_keyexpr_t;
/**
* An array of bytes.
*/
Expand Down Expand Up @@ -717,6 +726,68 @@ typedef struct zc_owned_shmbuf_t {
typedef struct zc_owned_shm_manager_t {
uintptr_t _0;
} zc_owned_shm_manager_t;
/**
* An owned zenoh publication_cache.
*
* Like most `z_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`.
* The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`.
*
* Like all `z_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved.
* To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument.
* After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid.
*
* To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid.
*/
typedef struct ze_owned_publication_cache_t {
uintptr_t _0[1];
} ze_owned_publication_cache_t;
/**
* Options passed to the :c:func:`ze_declare_publication_cache` function.
*
* Members:
* queryable_prefix: the prefix used for queryable
* queryable_origin: the restriction for the matching queries that will be receive by this
* publication cache
* history: the the history size
* resources_limit: the limit number of cached resources
*/
typedef struct ze_publication_cache_options_t {
struct z_keyexpr_t queryable_prefix;
enum zc_locality_t queryable_origin;
uintptr_t history;
uintptr_t resources_limit;
} ze_publication_cache_options_t;
/**
* An owned zenoh querying subscriber. Destroying the subscriber cancels the subscription.
*
* Like most `ze_owned_X_t` types, you may obtain an instance of `z_X_t` by loaning it using `z_X_loan(&val)`.
* The `z_loan(val)` macro, available if your compiler supports C11's `_Generic`, is equivalent to writing `z_X_loan(&val)`.
*
* Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved.
* To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument.
* After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid.
*
* To check if `val` is still valid, you may use `z_X_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid.
*/
typedef struct ze_owned_querying_subscriber_t {
uintptr_t _0[1];
} ze_owned_querying_subscriber_t;
/**
* Represents the set of options that can be applied to a querying subscriber,
* upon its declaration via :c:func:`ze_declare_querying_subscriber`.
*
* Members:
* z_reliability_t reliability: The subscription reliability.
*/
typedef struct ze_querying_subscriber_options_t {
enum z_reliability_t reliability;
enum zc_locality_t allowed_origin;
struct z_keyexpr_t query_selector;
enum z_query_target_t query_target;
struct z_query_consolidation_t query_consolidation;
enum zc_reply_keyexpr_t query_accept_replies;
uint64_t query_timeout_ms;
} ze_querying_subscriber_options_t;
ZENOHC_API extern const unsigned int Z_ROUTER;
ZENOHC_API extern const unsigned int Z_PEER;
ZENOHC_API extern const unsigned int Z_CLIENT;
Expand Down Expand Up @@ -1800,6 +1871,7 @@ ZENOHC_API struct zc_owned_liveliness_token_t zc_liveliness_token_null(void);
* Destroys a liveliness token, notifying subscribers of its destruction.
*/
ZENOHC_API void zc_liveliness_undeclare_token(struct zc_owned_liveliness_token_t *token);
ZENOHC_API enum zc_locality_t zc_locality_default(void);
/**
* Returns `false` if `payload` is the gravestone value.
*/
Expand Down Expand Up @@ -1872,6 +1944,7 @@ int8_t zc_put_owned(struct z_session_t session,
*/
ZENOHC_API
struct z_owned_reply_channel_t zc_reply_fifo_new(uintptr_t bound);
ZENOHC_API enum zc_reply_keyexpr_t zc_reply_keyexpr_default(void);
/**
* Creates a new non-blocking fifo channel, returned as a pair of closures.
*
Expand Down Expand Up @@ -1966,3 +2039,105 @@ ZENOHC_API uint8_t *zc_shmbuf_ptr(const struct zc_owned_shmbuf_t *buf);
ZENOHC_API
void zc_shmbuf_set_length(const struct zc_owned_shmbuf_t *buf,
uintptr_t len);
/**
* Closes the given :c:type:`ze_owned_publication_cache_t`, droping it and invalidating it for double-drop safety.
*/
ZENOHC_API
int8_t ze_close_publication_cache(struct ze_owned_publication_cache_t *pub_cache);
/**
* Declares a publication cache.
*
* Parameters:
* session: the zenoh session.
* keyexpr: the key expression to publish.
* options: additional options for the publication_cache.
*
* Returns:
* A :c:type:`ze_owned_publication_cache_t`.
*
*
* Example:
* Declaring a publication cache `NULL` for the options:
*
* .. code-block:: C
*
* ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), NULL);
*
* is equivalent to initializing and passing the default publication cache options:
*
* .. code-block:: C
*
* ze_publication_cache_options_t opts = ze_publication_cache_options_default();
* ze_owned_publication_cache_t pub_cache = ze_declare_publication_cache(z_loan(s), z_keyexpr(expr), &opts);
*/
ZENOHC_API
struct ze_owned_publication_cache_t ze_declare_publication_cache(struct z_session_t session,
struct z_keyexpr_t keyexpr,
const struct ze_publication_cache_options_t *options);
/**
* Declares a querying subscriber for a given key expression.
*
* Parameters:
* session: The zenoh session.
* keyexpr: The key expression to subscribe.
* callback: The callback function that will be called each time a data matching the subscribed expression is received.
* opts: additional options for the querying subscriber.
*
* Returns:
* A :c:type:`ze_owned_subscriber_t`.
*
* To check if the subscription succeeded and if the querying subscriber is still valid,
* you may use `ze_querying_subscriber_check(&val)` or `z_check(val)` if your compiler supports `_Generic`, which will return `true` if `val` is valid.
*
* Like all `ze_owned_X_t`, an instance will be destroyed by any function which takes a mutable pointer to said instance, as this implies the instance's inners were moved.
* To make this fact more obvious when reading your code, consider using `z_move(val)` instead of `&val` as the argument.
* After a move, `val` will still exist, but will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your `val` is valid.
*
* Example:
* Declaring a subscriber passing ``NULL`` for the options:
*
* .. code-block:: C
*
* ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, NULL);
*
* is equivalent to initializing and passing the default subscriber options:
*
* .. code-block:: C
*
* z_subscriber_options_t opts = z_subscriber_options_default();
* ze_owned_subscriber_t sub = ze_declare_querying_subscriber(z_loan(s), z_keyexpr(expr), callback, &opts);
*/
ZENOHC_API
struct ze_owned_querying_subscriber_t ze_declare_querying_subscriber(struct z_session_t session,
struct z_keyexpr_t keyexpr,
struct z_owned_closure_sample_t *callback,
const struct ze_querying_subscriber_options_t *options);
/**
* Returns ``true`` if `pub_cache` is valid.
*/
ZENOHC_API bool ze_publication_cache_check(const struct ze_owned_publication_cache_t *pub_cache);
/**
* Constructs a null safe-to-drop value of 'ze_owned_publication_cache_t' type
*/
ZENOHC_API struct ze_owned_publication_cache_t ze_publication_cache_null(void);
/**
* Constructs the default value for :c:type:`ze_publication_cache_options_t`.
*/
ZENOHC_API struct ze_publication_cache_options_t ze_publication_cache_options_default(void);
/**
* Returns ``true`` if `sub` is valid.
*/
ZENOHC_API bool ze_querying_subscriber_check(const struct ze_owned_querying_subscriber_t *sub);
/**
* Constructs a null safe-to-drop value of 'ze_owned_querying_subscriber_t' type
*/
ZENOHC_API struct ze_owned_querying_subscriber_t ze_querying_subscriber_null(void);
/**
* Constructs the default value for :c:type:`ze_querying_subscriber_options_t`.
*/
ZENOHC_API struct ze_querying_subscriber_options_t ze_querying_subscriber_options_default(void);
/**
* Undeclares the given :c:type:`ze_owned_querying_subscriber_t`, droping it and invalidating it for double-drop safety.
*/
ZENOHC_API
int8_t ze_undeclare_querying_subscriber(struct ze_owned_querying_subscriber_t *sub);
Loading
Loading