From 25b0661fd3a9eadb0a121d8338895da45e1000e2 Mon Sep 17 00:00:00 2001 From: Luca Cominardi Date: Fri, 22 Mar 2024 16:53:05 +0100 Subject: [PATCH] Moved utils to handlers. Added FIFO handlers. Add z_sub_channel. --- examples/CMakeLists.txt | 5 +- examples/unix/c11/z_sub_channel.c | 99 ++++++++++ include/zenoh-pico.h | 2 +- .../zenoh-pico/api/{utils.h => handlers.h} | 95 +++------ src/api/handlers.c | 186 ++++++++++++++++++ src/api/utils.c | 64 ------ zenohpico.pc | 2 +- 7 files changed, 315 insertions(+), 138 deletions(-) create mode 100644 examples/unix/c11/z_sub_channel.c rename include/zenoh-pico/api/{utils.h => handlers.h} (52%) create mode 100644 src/api/handlers.c delete mode 100644 src/api/utils.c diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 05e76b29b..a70eb7691 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -24,6 +24,7 @@ if(UNIX) add_example(z_pub unix/c99/z_pub.c) add_example(z_pub_st unix/c99/z_pub_st.c) add_example(z_sub unix/c99/z_sub.c) + add_example(z_sub_channel unix/c99/z_sub_channel.c) add_example(z_sub_st unix/c99/z_sub_st.c) add_example(z_pull unix/c99/z_pull.c) add_example(z_get unix/c99/z_get.c) @@ -37,6 +38,7 @@ if(UNIX) add_example(z_pub unix/c11/z_pub.c) add_example(z_pub_st unix/c11/z_pub_st.c) add_example(z_sub unix/c11/z_sub.c) + add_example(z_sub_channel unix/c11/z_sub_channel.c) add_example(z_sub_st unix/c11/z_sub_st.c) add_example(z_pull unix/c11/z_pull.c) add_example(z_get unix/c11/z_get.c) @@ -53,8 +55,9 @@ elseif(MSVC) add_example(z_pub windows/z_pub.c) add_example(z_pub_st windows/z_pub_st.c) add_example(z_sub windows/z_sub.c) + add_example(z_sub windows/z_sub_channel.c) add_example(z_sub_st windows/z_sub_st.c) - add_example(z_pull windows/z_pull.c) + add_example(z_sub_channel windows/z_pull.c) add_example(z_get windows/z_get.c) add_example(z_queryable windows/z_queryable.c) add_example(z_info windows/z_info.c) diff --git a/examples/unix/c11/z_sub_channel.c b/examples/unix/c11/z_sub_channel.c new file mode 100644 index 000000000..005fc60e4 --- /dev/null +++ b/examples/unix/c11/z_sub_channel.c @@ -0,0 +1,99 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include +#include + +#if Z_FEATURE_SUBSCRIPTION == 1 +int main(int argc, char **argv) { + const char *keyexpr = "demo/example/**"; + char *locator = NULL; + + int opt; + while ((opt = getopt(argc, argv, "k:e:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + locator = optarg; + break; + case '?': + if (optopt == 'k' || optopt == 'e') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config = z_config_default(); + if (locator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(locator)); + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_close(z_session_move(&s)); + return -1; + } + + printf("Declaring Subscriber on '%s'...\n", keyexpr); + z_owned_sample_channel_t channel = z_sample_channel_fifo_new(3); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); + if (!z_check(sub)) { + printf("Unable to declare subscriber.\n"); + return -1; + } + + z_owned_sample_t sample = z_sample_null(); + for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr)); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, + sample.payload.start); + z_drop(z_move(keystr)); + z_drop(z_move(sample)); + sample = z_sample_null(); + } + + z_undeclare_subscriber(z_move(sub)); + + // Stop read and lease tasks for zenoh-pico + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + + z_close(z_move(s)); + + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -2; +} +#endif diff --git a/include/zenoh-pico.h b/include/zenoh-pico.h index bf3feab68..4f9ebb636 100644 --- a/include/zenoh-pico.h +++ b/include/zenoh-pico.h @@ -22,9 +22,9 @@ #define ZENOH_PICO_TWEAK 0 #include "zenoh-pico/api/constants.h" +#include "zenoh-pico/api/handlers.h" #include "zenoh-pico/api/macros.h" #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/api/types.h" -#include "zenoh-pico/api/utils.h" #endif /* ZENOH_PICO_H */ diff --git a/include/zenoh-pico/api/utils.h b/include/zenoh-pico/api/handlers.h similarity index 52% rename from include/zenoh-pico/api/utils.h rename to include/zenoh-pico/api/handlers.h index 36325fbf8..928a9fa59 100644 --- a/include/zenoh-pico/api/utils.h +++ b/include/zenoh-pico/api/handlers.h @@ -11,8 +11,8 @@ // Contributors: // ZettaScale Zenoh Team, // -#ifndef INCLUDE_ZENOH_PICO_API_UTILS_H -#define INCLUDE_ZENOH_PICO_API_UTILS_H +#ifndef INCLUDE_ZENOH_PICO_API_HANDLERS_H +#define INCLUDE_ZENOH_PICO_API_HANDLERS_H #include #include @@ -20,9 +20,12 @@ #include "zenoh-pico/api/macros.h" #include "zenoh-pico/api/types.h" #include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/fifo.h" #include "zenoh-pico/collections/ring.h" #include "zenoh-pico/system/platform.h" +// -- Owned sample +// @TODO: define it via dedicated macros and move it to the appropriate place in types. typedef struct { z_owned_keyexpr_t keyexpr; z_bytes_t payload; @@ -68,6 +71,8 @@ static inline void z_sample_drop(z_owned_sample_t *s) { } } +_Z_ELEM_DEFINE(_z_owned_sample, z_owned_sample_t, _z_noop_size, z_sample_drop, _z_noop_copy) + // -- Channel typedef int8_t (*_z_owned_sample_handler_t)(z_owned_sample_t *dst, void *context); @@ -82,21 +87,10 @@ typedef struct { z_owned_closure_owned_sample_t recv; } z_owned_sample_channel_t; -static inline z_owned_sample_channel_t z_owned_sample_channel_null() { - z_owned_sample_channel_t ch = {.send = NULL, .recv = NULL}; - return ch; -} - -static inline int8_t z_closure_owned_sample_call(z_owned_closure_owned_sample_t *recv, z_owned_sample_t *dst) { - int8_t res = _Z_ERR_GENERIC; - if (recv != NULL) { - res = (recv->call)(dst, recv->context); - } - return res; -} +z_owned_sample_channel_t z_owned_sample_channel_null(); +int8_t z_closure_owned_sample_call(z_owned_closure_owned_sample_t *recv, z_owned_sample_t *dst); // -- Ring -_Z_ELEM_DEFINE(_z_owned_sample, z_owned_sample_t, _z_noop_size, _z_noop_clear, _z_noop_copy) _Z_RING_DEFINE(_z_owned_sample, z_owned_sample_t) typedef struct { @@ -106,65 +100,24 @@ typedef struct { #endif } z_owned_sample_ring_t; -static inline void z_sample_channel_ring_push(const z_sample_t *src, void *context) { - if (src == NULL || context == NULL) { - return; - } - - z_owned_sample_ring_t *r = (z_owned_sample_ring_t *)context; - z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t)); - if (dst == NULL) { - return; - } - *dst = z_sample_to_owned(src); - -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&r->_mutex); -#endif - _z_owned_sample_ring_push_force_drop(&r->_ring, dst); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&r->_mutex); -#endif -} - -static inline int8_t z_sample_channel_ring_pull(z_owned_sample_t *dst, void *context) { - int8_t ret = _Z_RES_OK; +z_owned_sample_channel_t z_sample_channel_ring_new(size_t capacity); +void z_sample_channel_ring_push(const z_sample_t *src, void *context); +int8_t z_sample_channel_ring_pull(z_owned_sample_t *dst, void *context); - z_owned_sample_ring_t *r = (z_owned_sample_ring_t *)context; +// -- Fifo +_Z_FIFO_DEFINE(_z_owned_sample, z_owned_sample_t) +typedef struct { + _z_owned_sample_fifo_t _fifo; #if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_lock(&r->_mutex); -#endif - z_owned_sample_t *src = _z_owned_sample_ring_pull(&r->_ring); -#if Z_FEATURE_MULTI_THREAD == 1 - zp_mutex_unlock(&r->_mutex); + zp_mutex_t _mutex; + zp_condvar_t _cv_not_full; + zp_condvar_t _cv_not_empty; #endif +} z_owned_sample_fifo_t; - if (src == NULL) { - *dst = z_sample_null(); - } else { - memcpy(dst, src, sizeof(z_owned_sample_t)); - } - return ret; -} - -static inline z_owned_sample_channel_t z_sample_channel_ring_new(size_t capacity) { - z_owned_sample_channel_t ch = z_owned_sample_channel_null(); - - z_owned_sample_ring_t *ring = (z_owned_sample_ring_t *)zp_malloc(sizeof(z_owned_sample_ring_t)); - if (ring != NULL) { - int8_t res = _z_owned_sample_ring_init(&ring->_ring, capacity); - if (res == _Z_RES_OK) { - z_owned_closure_sample_t send = z_closure(z_sample_channel_ring_push, NULL, ring); - ch.send = send; - z_owned_closure_owned_sample_t recv = z_closure(z_sample_channel_ring_pull, NULL, ring); - ch.recv = recv; - } else { - zp_free(ring); - } - } - - return ch; -} +z_owned_sample_channel_t z_sample_channel_fifo_new(size_t capacity); +void z_sample_channel_fifo_push(const z_sample_t *src, void *context); +int8_t z_sample_channel_fifo_pull(z_owned_sample_t *dst, void *context); -#endif // INCLUDE_ZENOH_PICO_API_UTILS_H \ No newline at end of file +#endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H \ No newline at end of file diff --git a/src/api/handlers.c b/src/api/handlers.c new file mode 100644 index 000000000..5850541a6 --- /dev/null +++ b/src/api/handlers.c @@ -0,0 +1,186 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include "zenoh-pico/api/handlers.h" + +#include "zenoh-pico/api/macros.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/system/platform.h" + +// -- Channel +z_owned_sample_channel_t z_owned_sample_channel_null() { + z_owned_sample_channel_t ch = {.send = NULL, .recv = NULL}; + return ch; +} + +int8_t z_closure_owned_sample_call(z_owned_closure_owned_sample_t *recv, z_owned_sample_t *dst) { + int8_t res = _Z_ERR_GENERIC; + if (recv != NULL) { + res = (recv->call)(dst, recv->context); + } + return res; +} + +// -- Ring +void z_sample_channel_ring_push(const z_sample_t *src, void *context) { + if (src == NULL || context == NULL) { + return; + } + + z_owned_sample_ring_t *r = (z_owned_sample_ring_t *)context; + z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t)); + if (dst == NULL) { + return; + } + *dst = z_sample_to_owned(src); + +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_lock(&r->_mutex); +#endif + _z_owned_sample_ring_push_force_drop(&r->_ring, dst); +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_unlock(&r->_mutex); +#endif +} + +int8_t z_sample_channel_ring_pull(z_owned_sample_t *dst, void *context) { + int8_t ret = _Z_RES_OK; + + z_owned_sample_ring_t *r = (z_owned_sample_ring_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_lock(&r->_mutex); +#endif + z_owned_sample_t *src = _z_owned_sample_ring_pull(&r->_ring); +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_unlock(&r->_mutex); +#endif + + if (src == NULL) { + *dst = z_sample_null(); + } else { + memcpy(dst, src, sizeof(z_owned_sample_t)); + } + return ret; +} + +z_owned_sample_channel_t z_sample_channel_ring_new(size_t capacity) { + z_owned_sample_channel_t ch = z_owned_sample_channel_null(); + + z_owned_sample_ring_t *ring = (z_owned_sample_ring_t *)zp_malloc(sizeof(z_owned_sample_ring_t)); + if (ring != NULL) { + int8_t res = _z_owned_sample_ring_init(&ring->_ring, capacity); +#if Z_FEATURE_MULTI_THREAD == 1 + res = zp_mutex_init(&ring->_mutex); +#endif + if (res == _Z_RES_OK) { + z_owned_closure_sample_t send = z_closure(z_sample_channel_ring_push, NULL, ring); + ch.send = send; + z_owned_closure_owned_sample_t recv = z_closure(z_sample_channel_ring_pull, NULL, ring); + ch.recv = recv; + } else { + zp_free(ring); + } + } + + return ch; +} + +// -- Fifo +void z_sample_channel_fifo_push(const z_sample_t *src, void *context) { + if (src == NULL || context == NULL) { + return; + } + + z_owned_sample_fifo_t *f = (z_owned_sample_fifo_t *)context; + z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t)); + if (dst == NULL) { + return; + } + *dst = z_sample_to_owned(src); + +#if Z_FEATURE_MULTI_THREAD == 1 + + zp_mutex_lock(&f->_mutex); + while (dst != NULL) { + dst = _z_owned_sample_fifo_push(&f->_fifo, dst); + if (dst != NULL) { + zp_condvar_wait(&f->_cv_not_full, &f->_mutex); + } else { + zp_condvar_signal(&f->_cv_not_empty); + } + } + zp_mutex_unlock(&f->_mutex); + +#elif // Z_FEATURE_MULTI_THREAD == 1 + + _z_owned_sample_fifo_push_drop(&f->_fifo, dst); + +#endif // Z_FEATURE_MULTI_THREAD == 1 +} + +int8_t z_sample_channel_fifo_pull(z_owned_sample_t *dst, void *context) { + int8_t ret = _Z_RES_OK; + + z_owned_sample_fifo_t *f = (z_owned_sample_fifo_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + + z_owned_sample_t *src = NULL; + zp_mutex_lock(&f->_mutex); + while (src == NULL) { + src = _z_owned_sample_fifo_pull(&f->_fifo); + if (src == NULL) { + zp_condvar_wait(&f->_cv_not_empty, &f->_mutex); + } else { + zp_condvar_signal(&f->_cv_not_full); + } + } + zp_mutex_unlock(&f->_mutex); + memcpy(dst, src, sizeof(z_owned_sample_t)); + +#elif // Z_FEATURE_MULTI_THREAD == 1 + + z_owned_sample_t *src = _z_owned_sample_fifo_pull(&f->_fifo); + if (src != NULL) { + memcpy(dst, src, sizeof(z_owned_sample_t)); + } + +#endif // Z_FEATURE_MULTI_THREAD == 1 + + return ret; +} + +z_owned_sample_channel_t z_sample_channel_fifo_new(size_t capacity) { + z_owned_sample_channel_t ch = z_owned_sample_channel_null(); + + z_owned_sample_fifo_t *fifo = (z_owned_sample_fifo_t *)zp_malloc(sizeof(z_owned_sample_fifo_t)); + if (fifo != NULL) { + int8_t res = _z_owned_sample_fifo_init(&fifo->_fifo, capacity); +#if Z_FEATURE_MULTI_THREAD == 1 + res = zp_mutex_init(&fifo->_mutex); + res = zp_condvar_init(&fifo->_cv_not_full); + res = zp_condvar_init(&fifo->_cv_not_empty); +#endif + if (res == _Z_RES_OK) { + z_owned_closure_sample_t send = z_closure(z_sample_channel_fifo_push, NULL, fifo); + ch.send = send; + z_owned_closure_owned_sample_t recv = z_closure(z_sample_channel_fifo_pull, NULL, fifo); + ch.recv = recv; + } else { + zp_free(fifo); + } + } + + return ch; +} \ No newline at end of file diff --git a/src/api/utils.c b/src/api/utils.c deleted file mode 100644 index d4c49bf7b..000000000 --- a/src/api/utils.c +++ /dev/null @@ -1,64 +0,0 @@ -// -// Copyright (c) 2022 ZettaScale Technology -// -// This program and the accompanying materials are made available under the -// terms of the Eclipse Public License 2.0 which is available at -// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 -// which is available at https://www.apache.org/licenses/LICENSE-2.0. -// -// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 -// -// Contributors: -// ZettaScale Zenoh Team, -// -#include "zenoh-pico/api/utils.h" - -#include "zenoh-pico/api/macros.h" -#include "zenoh-pico/protocol/core.h" -#include "zenoh-pico/system/platform.h" - -// z_owned_sample_ring_t z_sample_channel_ring_new(size_t capacity) { -// z_owned_sample_ring_t ring; -// ring._ring = _z_sample_ring_new(capacity); - -// #if Z_FEATURE_MULTI_THREAD == 1 -// zp_mutex_init(&ring._mutex); -// #endif - -// return ring; -// } - -// void z_owned_sample_ring_drop(z_owned_sample_ring_t *r) { -// #if Z_FEATURE_MULTI_THREAD == 1 -// zp_mutex_lock(&r->_mutex); -// #endif -// _z_owned_sample_ring_clear(&r->_ring); -// #if Z_FEATURE_MULTI_THREAD == 1 -// zp_mutex_unlock(&r->_mutex); -// zp_mutex_free(&r->_mutex); -// #endif -// } - -// void z_sample_channel_ring_push(z_owned_sample_ring_t *r, const z_sample_t *s) { -// z_owned_sample_t *os = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t)); -// memcpy(&os->keyexpr, &s->keyexpr, sizeof(_z_keyexpr_t)); -// memcpy(&os->payload, &s->payload, sizeof(z_bytes_t)); -// #if Z_FEATURE_MULTI_THREAD == 1 -// zp_mutex_lock(&r->_mutex); -// #endif -// _z_owned_sample_ring_push_force_drop(&r->_ring, os); -// #if Z_FEATURE_MULTI_THREAD == 1 -// zp_mutex_unlock(&r->_mutex); -// #endif -// } - -// z_owned_sample_t *z_sample_channel_ring_pull(z_owned_sample_ring_t *r) { -// #if Z_FEATURE_MULTI_THREAD == 1 -// zp_mutex_lock(&r->_mutex); -// #endif -// z_owned_sample_t *ret = _z_owned_sample_ring_pull(&r->_ring); -// #if Z_FEATURE_MULTI_THREAD == 1 -// zp_mutex_unlock(&r->_mutex); -// #endif -// return ret; -// } diff --git a/zenohpico.pc b/zenohpico.pc index 0f777182f..cc0cb80b6 100644 --- a/zenohpico.pc +++ b/zenohpico.pc @@ -3,6 +3,6 @@ prefix=/usr/local Name: zenohpico Description: URL: -Version: 0.11.20240321dev +Version: 0.11.20240322dev Cflags: -I${prefix}/include Libs: -L${prefix}/lib -lzenohpico