diff --git a/CMakeLists.txt b/CMakeLists.txt index 3df82dd66..704c4b4e8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -331,6 +331,8 @@ if(UNIX OR MSVC) set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/tests") add_executable(z_data_struct_test ${PROJECT_SOURCE_DIR}/tests/z_data_struct_test.c) + add_executable(z_channels_test ${PROJECT_SOURCE_DIR}/tests/z_channels_test.c) + add_executable(z_collections_test ${PROJECT_SOURCE_DIR}/tests/z_collections_test.c) add_executable(z_endpoint_test ${PROJECT_SOURCE_DIR}/tests/z_endpoint_test.c) add_executable(z_iobuf_test ${PROJECT_SOURCE_DIR}/tests/z_iobuf_test.c) add_executable(z_msgcodec_test ${PROJECT_SOURCE_DIR}/tests/z_msgcodec_test.c) @@ -343,6 +345,8 @@ if(UNIX OR MSVC) add_executable(z_perf_rx ${PROJECT_SOURCE_DIR}/tests/z_perf_rx.c) target_link_libraries(z_data_struct_test ${Libname}) + target_link_libraries(z_channels_test ${Libname}) + target_link_libraries(z_collections_test ${Libname}) target_link_libraries(z_endpoint_test ${Libname}) target_link_libraries(z_iobuf_test ${Libname}) target_link_libraries(z_msgcodec_test ${Libname}) @@ -361,6 +365,8 @@ if(UNIX OR MSVC) enable_testing() add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test) + add_test(z_channels_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_channels_test) + add_test(z_collections_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_collections_test) add_test(z_endpoint_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_endpoint_test) add_test(z_iobuf_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_iobuf_test) add_test(z_msgcodec_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_msgcodec_test) @@ -475,4 +481,4 @@ if(PACKAGING) include(CPack) endif() -endif() \ No newline at end of file +endif() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 05e76b29b..1924dad9e 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -37,6 +37,7 @@ if(UNIX) add_example(z_pub unix/c11/z_pub.c) add_example(z_pub_st unix/c11/z_pub_st.c) add_example(z_sub unix/c11/z_sub.c) + add_example(z_sub_channel unix/c11/z_sub_channel.c) add_example(z_sub_st unix/c11/z_sub_st.c) add_example(z_pull unix/c11/z_pull.c) add_example(z_get unix/c11/z_get.c) diff --git a/examples/unix/c11/z_pull.c b/examples/unix/c11/z_pull.c index a52a23b0d..c6bb58f20 100644 --- a/examples/unix/c11/z_pull.c +++ b/examples/unix/c11/z_pull.c @@ -19,21 +19,14 @@ #include #if Z_FEATURE_SUBSCRIPTION == 1 -// @TODO -// void data_handler(const z_sample_t *sample, void *ctx) { -// (void)(ctx); -// z_owned_str_t keystr = z_keyexpr_to_string(sample->keyexpr); -// printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample->payload.len, -// sample->payload.start); -// z_drop(z_move(keystr)); -// } - int main(int argc, char **argv) { const char *keyexpr = "demo/example/**"; char *locator = NULL; + size_t interval = 5000; + size_t size = 3; int opt; - while ((opt = getopt(argc, argv, "k:e:")) != -1) { + while ((opt = getopt(argc, argv, "k:e:i:s:")) != -1) { switch (opt) { case 'k': keyexpr = optarg; @@ -41,8 +34,14 @@ int main(int argc, char **argv) { case 'e': locator = optarg; break; + case 'i': + interval = (size_t)atoi(optarg); + break; + case 's': + size = (size_t)atoi(optarg); + break; case '?': - if (optopt == 'k' || optopt == 'e') { + if (optopt == 'k' || optopt == 'e' || optopt == 'i' || optopt == 's') { fprintf(stderr, "Option -%c requires an argument.\n", optopt); } else { fprintf(stderr, "Unknown option `-%c'.\n", optopt); @@ -72,31 +71,30 @@ int main(int argc, char **argv) { return -1; } - // @TODO - // z_owned_closure_sample_t callback = z_closure(data_handler); printf("Declaring Subscriber on '%s'...\n", keyexpr); + z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(size); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); + if (!z_check(sub)) { + printf("Unable to declare subscriber.\n"); + return -1; + } - // @TODO - // z_owned_pull_subscriber_t sub = z_declare_pull_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL); - // if (!z_check(sub)) { - // printf("Unable to declare subscriber.\n"); - // return -1; - // } - - // printf("Enter any key to pull data or 'q' to quit...\n"); - // char c = '\0'; - // while (1) { - // fflush(stdin); - // int ret = scanf("%c", &c); - // (void)ret; // Remove unused result warning - // if (c == 'q') { - // break; - // } - // z_subscriber_pull(z_loan(sub)); - // } + printf("Pulling data every %zu ms... Ring size: %zd\n", interval, size); + z_owned_sample_t sample = z_sample_null(); + while (true) { + for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr); + printf(">> [Subscriber] Pulled ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len, + z_loan(sample).payload.start); + z_drop(z_move(keystr)); + z_drop(z_move(sample)); + } + printf(">> [Subscriber] Nothing to pull... sleep for %zu ms\n", interval); + zp_sleep_ms(interval); + } - // z_undeclare_pull_subscriber(z_move(sub)); - printf("Pull Subscriber not supported... exiting\n"); + z_undeclare_subscriber(z_move(sub)); + z_drop(z_move(channel)); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan(s)); diff --git a/examples/unix/c11/z_sub_channel.c b/examples/unix/c11/z_sub_channel.c new file mode 100644 index 000000000..43548d1ba --- /dev/null +++ b/examples/unix/c11/z_sub_channel.c @@ -0,0 +1,100 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, + +#include +#include +#include +#include +#include +#include + +#if Z_FEATURE_SUBSCRIPTION == 1 +int main(int argc, char **argv) { + const char *keyexpr = "demo/example/**"; + char *locator = NULL; + + int opt; + while ((opt = getopt(argc, argv, "k:e:")) != -1) { + switch (opt) { + case 'k': + keyexpr = optarg; + break; + case 'e': + locator = optarg; + break; + case '?': + if (optopt == 'k' || optopt == 'e') { + fprintf(stderr, "Option -%c requires an argument.\n", optopt); + } else { + fprintf(stderr, "Unknown option `-%c'.\n", optopt); + } + return 1; + default: + return -1; + } + } + + z_owned_config_t config = z_config_default(); + if (locator != NULL) { + zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(locator)); + } + + printf("Opening session...\n"); + z_owned_session_t s = z_open(z_move(config)); + if (!z_check(s)) { + printf("Unable to open session!\n"); + return -1; + } + + // Start read and lease tasks for zenoh-pico + if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) { + printf("Unable to start read and lease tasks\n"); + z_close(z_session_move(&s)); + return -1; + } + + printf("Declaring Subscriber on '%s'...\n", keyexpr); + z_owned_sample_fifo_channel_t channel = z_sample_fifo_channel_new(3); + z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL); + if (!z_check(sub)) { + printf("Unable to declare subscriber.\n"); + return -1; + } + + z_owned_sample_t sample = z_sample_null(); + for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) { + z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample).keyexpr); + printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)z_loan(sample).payload.len, + z_loan(sample).payload.start); + z_drop(z_move(keystr)); + z_drop(z_move(sample)); + sample = z_sample_null(); + } + + z_undeclare_subscriber(z_move(sub)); + z_drop(z_move(channel)); + + // Stop read and lease tasks for zenoh-pico + zp_stop_read_task(z_loan(s)); + zp_stop_lease_task(z_loan(s)); + + z_close(z_move(s)); + + return 0; +} +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n"); + return -2; +} +#endif diff --git a/include/zenoh-pico.h b/include/zenoh-pico.h index f1ea60301..4f9ebb636 100644 --- a/include/zenoh-pico.h +++ b/include/zenoh-pico.h @@ -17,11 +17,12 @@ #define ZENOH_PICO "0.11.0.0" #define ZENOH_PICO_MAJOR 0 -#define ZENOH_PICO_MINOR 10 +#define ZENOH_PICO_MINOR 11 #define ZENOH_PICO_PATCH 0 #define ZENOH_PICO_TWEAK 0 #include "zenoh-pico/api/constants.h" +#include "zenoh-pico/api/handlers.h" #include "zenoh-pico/api/macros.h" #include "zenoh-pico/api/primitives.h" #include "zenoh-pico/api/types.h" diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h new file mode 100644 index 000000000..d48c2e8fc --- /dev/null +++ b/include/zenoh-pico/api/handlers.h @@ -0,0 +1,88 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#ifndef INCLUDE_ZENOH_PICO_API_HANDLERS_H +#define INCLUDE_ZENOH_PICO_API_HANDLERS_H + +#include + +#include "zenoh-pico/api/macros.h" +#include "zenoh-pico/api/types.h" +#include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/fifo_mt.h" +#include "zenoh-pico/collections/ring_mt.h" +#include "zenoh-pico/utils/logging.h" + +// -- Samples handler +void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src); +z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); + +// -- Channel +#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \ + collection_new_f, collection_free_f, collection_push_f, collection_pull_f, elem_move_f, \ + elem_convert_f, elem_free_f) \ + typedef struct { \ + z_owned_##send_closure_name##_t send; \ + z_owned_##recv_closure_name##_t recv; \ + collection_type *collection; \ + } z_owned_##name##_t; \ + \ + static inline void _z_##name##_elem_free(void **elem) { \ + elem_free_f((recv_type *)*elem); \ + *elem = NULL; \ + } \ + static inline void _z_##name##_elem_move(void *dst, const void *src) { \ + elem_move_f((recv_type *)dst, (const recv_type *)src); \ + } \ + static inline void _z_##name##_send(const send_type *elem, void *context) { \ + void *internal_elem = elem_convert_f(elem); \ + if (internal_elem == NULL) { \ + return; \ + } \ + int8_t ret = collection_push_f(internal_elem, context, _z_##name##_elem_free); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_push_f, ret); \ + } \ + } \ + static inline void _z_##name##_recv(recv_type *elem, void *context) { \ + int8_t ret = collection_pull_f(elem, context, _z_##name##_elem_move); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_pull_f, ret); \ + } \ + } \ + \ + static inline z_owned_##name##_t z_##name##_new(size_t capacity) { \ + z_owned_##name##_t channel; \ + channel.collection = collection_new_f(capacity); \ + channel.send = z_##send_closure_name(_z_##name##_send, NULL, channel.collection); \ + channel.recv = z_##recv_closure_name(_z_##name##_recv, NULL, channel.collection); \ + return channel; \ + } \ + static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \ + static inline void z_##name##_drop(z_owned_##name##_t *channel) { \ + collection_free_f(channel->collection, _z_##name##_elem_free); \ + z_##send_closure_name##_drop(&channel->send); \ + z_##recv_closure_name##_drop(&channel->recv); \ + } + +// z_owned_sample_ring_channel_t +_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t, + _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_owned_sample_move, + _z_sample_to_owned_ptr, z_sample_drop) + +// z_owned_sample_fifo_channel_t +_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_fifo_mt_t, + _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_owned_sample_move, + _z_sample_to_owned_ptr, z_sample_drop) + +#endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index 0a99bf736..e64f1f84a 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -42,7 +42,8 @@ z_owned_reply_t : z_reply_loan, \ z_owned_hello_t : z_hello_loan, \ z_owned_str_t : z_str_loan, \ - z_owned_str_array_t : z_str_array_loan \ + z_owned_str_array_t : z_str_array_loan, \ + z_owned_sample_t : z_sample_loan \ )(&x) /** * Defines a generic function for dropping any of the ``z_owned_X_t`` types. @@ -62,36 +63,17 @@ z_owned_hello_t * : z_hello_drop, \ z_owned_str_t * : z_str_drop, \ z_owned_str_array_t * : z_str_array_drop, \ + z_owned_sample_t * : z_sample_drop, \ z_owned_closure_sample_t * : z_closure_sample_drop, \ + z_owned_closure_owned_sample_t * : z_closure_owned_sample_drop, \ z_owned_closure_query_t * : z_closure_query_drop, \ z_owned_closure_reply_t * : z_closure_reply_drop, \ z_owned_closure_hello_t * : z_closure_hello_drop, \ - z_owned_closure_zid_t * : z_closure_zid_drop \ + z_owned_closure_zid_t * : z_closure_zid_drop, \ + z_owned_sample_ring_channel_t * : z_sample_ring_channel_drop, \ + z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop \ )(x) -/** - * Defines a generic function for making null object of any of the ``z_owned_X_t`` types. - * - * Returns: - * Returns the uninitialized instance of `x`. - */ -#define z_null(x) (*x = _Generic((x), \ - z_owned_session_t * : z_session_null, \ - z_owned_publisher_t * : z_publisher_null, \ - z_owned_keyexpr_t * : z_keyexpr_null, \ - z_owned_config_t * : z_config_null, \ - z_owned_scouting_config_t * : z_scouting_config_null, \ - z_owned_subscriber_t * : z_subscriber_null, \ - z_owned_queryable_t * : z_queryable_null, \ - z_owned_reply_t * : z_reply_null, \ - z_owned_hello_t * : z_hello_null, \ - z_owned_str_t * : z_str_null, \ - z_owned_closure_sample_t * : z_closure_sample_null, \ - z_owned_closure_query_t * : z_closure_query_null, \ - z_owned_closure_reply_t * : z_closure_reply_null, \ - z_owned_closure_hello_t * : z_closure_hello_null, \ - z_owned_closure_zid_t * : z_closure_zid_null \ - )()) /** * Defines a generic function for checking the validity of any of the ``z_owned_X_t`` types. * @@ -116,7 +98,8 @@ z_owned_hello_t : z_hello_check, \ z_owned_str_t : z_str_check, \ z_owned_str_array_t : z_str_array_check, \ - z_bytes_t : z_bytes_check \ + z_bytes_t : z_bytes_check, \ + z_owned_sample_t : z_sample_check \ )(&x) /** @@ -126,11 +109,12 @@ * x: The closure to call */ #define z_call(x, ...) \ - _Generic((x), z_owned_closure_sample_t : z_closure_sample_call, \ - z_owned_closure_query_t : z_closure_query_call, \ - z_owned_closure_reply_t : z_closure_reply_call, \ - z_owned_closure_hello_t : z_closure_hello_call, \ - z_owned_closure_zid_t : z_closure_zid_call \ + _Generic((x), z_owned_closure_sample_t : z_closure_sample_call, \ + z_owned_closure_query_t : z_closure_query_call, \ + z_owned_closure_reply_t : z_closure_reply_call, \ + z_owned_closure_hello_t : z_closure_hello_call, \ + z_owned_closure_zid_t : z_closure_zid_call, \ + z_owned_closure_owned_sample_t : z_closure_owned_sample_call \ ) (&x, __VA_ARGS__) /** @@ -143,22 +127,26 @@ * Returns the instance associated with `x`. */ #define z_move(x) _Generic((x), \ - z_owned_keyexpr_t : z_keyexpr_move, \ - z_owned_config_t : z_config_move, \ - z_owned_scouting_config_t : z_scouting_config_move, \ - z_owned_session_t : z_session_move, \ - z_owned_subscriber_t : z_subscriber_move, \ - z_owned_publisher_t : z_publisher_move, \ - z_owned_queryable_t : z_queryable_move, \ - z_owned_reply_t : z_reply_move, \ - z_owned_hello_t : z_hello_move, \ - z_owned_str_t : z_str_move, \ - z_owned_str_array_t : z_str_array_move, \ - z_owned_closure_sample_t : z_closure_sample_move, \ - z_owned_closure_query_t : z_closure_query_move, \ - z_owned_closure_reply_t : z_closure_reply_move, \ - z_owned_closure_hello_t : z_closure_hello_move, \ - z_owned_closure_zid_t : z_closure_zid_move \ + z_owned_keyexpr_t : z_keyexpr_move, \ + z_owned_config_t : z_config_move, \ + z_owned_scouting_config_t : z_scouting_config_move, \ + z_owned_session_t : z_session_move, \ + z_owned_subscriber_t : z_subscriber_move, \ + z_owned_publisher_t : z_publisher_move, \ + z_owned_queryable_t : z_queryable_move, \ + z_owned_reply_t : z_reply_move, \ + z_owned_hello_t : z_hello_move, \ + z_owned_str_t : z_str_move, \ + z_owned_str_array_t : z_str_array_move, \ + z_owned_closure_sample_t : z_closure_sample_move, \ + z_owned_closure_owned_sample_t : z_closure_owned_sample_move, \ + z_owned_closure_query_t : z_closure_query_move, \ + z_owned_closure_reply_t : z_closure_reply_move, \ + z_owned_closure_hello_t : z_closure_hello_move, \ + z_owned_closure_zid_t : z_closure_zid_move, \ + z_owned_sample_t : z_sample_move, \ + z_owned_sample_ring_channel_t : z_sample_ring_channel_move, \ + z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move \ )(&x) /** @@ -201,10 +189,12 @@ z_owned_hello_t * : z_hello_null, \ z_owned_str_t * : z_str_null, \ z_owned_closure_sample_t * : z_closure_sample_null, \ + z_owned_closure_owned_sample_t * : z_closure_owned_sample_null, \ z_owned_closure_query_t * : z_closure_query_null, \ z_owned_closure_reply_t * : z_closure_reply_null, \ z_owned_closure_hello_t * : z_closure_hello_null, \ - z_owned_closure_zid_t * : z_closure_zid_null \ + z_owned_closure_zid_t * : z_closure_zid_null, \ + z_owned_sample_t * : z_sample_null \ )()) // clang-format on @@ -259,10 +249,13 @@ template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; +template<> struct zenoh_drop_type { typedef void type; }; template<> inline int8_t z_drop(z_owned_session_t* v) { return z_close(v); } template<> inline int8_t z_drop(z_owned_publisher_t* v) { return z_undeclare_publisher(v); } @@ -275,10 +268,13 @@ template<> inline void z_drop(z_owned_reply_t* v) { z_reply_drop(v); } template<> inline void z_drop(z_owned_hello_t* v) { z_hello_drop(v); } template<> inline void z_drop(z_owned_str_t* v) { z_str_drop(v); } template<> inline void z_drop(z_owned_closure_sample_t* v) { z_closure_sample_drop(v); } +template<> inline void z_drop(z_owned_closure_owned_sample_t* v) { z_closure_owned_sample_drop(v); } template<> inline void z_drop(z_owned_closure_query_t* v) { z_closure_query_drop(v); } template<> inline void z_drop(z_owned_closure_reply_t* v) { z_closure_reply_drop(v); } template<> inline void z_drop(z_owned_closure_hello_t* v) { z_closure_hello_drop(v); } template<> inline void z_drop(z_owned_closure_zid_t* v) { z_closure_zid_drop(v); } +template<> inline void z_drop(z_owned_sample_ring_channel_t* v) { z_owned_sample_ring_channel_drop(v); } +template<> inline void z_drop(z_owned_sample_fifo_channel_t* v) { z_owned_sample_fifo_channel_drop(v); } inline void z_null(z_owned_session_t& v) { v = z_session_null(); } inline void z_null(z_owned_publisher_t& v) { v = z_publisher_null(); } @@ -291,6 +287,7 @@ inline void z_null(z_owned_reply_t& v) { v = z_reply_null(); } inline void z_null(z_owned_hello_t& v) { v = z_hello_null(); } inline void z_null(z_owned_str_t& v) { v = z_str_null(); } inline void z_null(z_owned_closure_sample_t& v) { v = z_closure_sample_null(); } +inline void z_null(z_owned_clusure_owned_sample_t& v) { v = z_closure_owned_sample_null(); } inline void z_null(z_owned_closure_query_t& v) { v = z_closure_query_null(); } inline void z_null(z_owned_closure_reply_t& v) { v = z_closure_reply_null(); } inline void z_null(z_owned_closure_hello_t& v) { v = z_closure_hello_null(); } @@ -308,9 +305,12 @@ inline bool z_check(const z_owned_queryable_t& v) { return z_queryable_check(&v) inline bool z_check(const z_owned_reply_t& v) { return z_reply_check(&v); } inline bool z_check(const z_owned_hello_t& v) { return z_hello_check(&v); } inline bool z_check(const z_owned_str_t& v) { return z_str_check(&v); } +inline bool z_check(const z_owned_str_t& v) { return z_sample_check(&v); } inline void z_call(const z_owned_closure_sample_t &closure, const z_sample_t *sample) { z_closure_sample_call(&closure, sample); } +inline void z_call(const z_owned_closure_owned_sample_t &closure, const z_sample_t *sample) + { z_closure_owned_sample_call(&closure, sample); } inline void z_call(const z_owned_closure_query_t &closure, const z_query_t *query) { z_closure_query_call(&closure, query); } inline void z_call(const z_owned_closure_reply_t &closure, z_owned_reply_t *sample) diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index a72878b99..022e1c729 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -564,6 +564,30 @@ z_keyexpr_t z_query_keyexpr(const z_query_t *query); */ z_owned_closure_sample_t z_closure_sample(_z_data_handler_t call, _z_dropper_handler_t drop, void *context); +/** + * Return a new sample closure. + * It consists on a structure that contains all the elements for stateful, memory-leak-free callbacks. + * + * Like all ``z_owned_X_t``, an instance will be destroyed by any function which takes a mutable pointer to said + * instance, as this implies the instance's inners were moved. To make this fact more obvious when reading your code, + * consider using ``z_move(val)`` instead of ``&val`` as the argument. After a ``z_move``, ``val`` will still exist, but + * will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your ``val`` + * is valid. + * + * To check if ``val`` is still valid, you may use ``z_closure_owned_sample_check(&val)`` or ``z_check(val)`` if your + * compiler supports ``_Generic``, which will return ``true`` if ``val`` is valid, or ``false`` otherwise. + * + * Parameters: + * call: the typical callback function. ``context`` will be passed as its last argument. + * drop: allows the callback's state to be freed. ``context`` will be passed as its last argument. + * context: a pointer to an arbitrary state. + * + * Returns: + * Returns a new sample closure. + */ +z_owned_closure_owned_sample_t z_closure_owned_sample(_z_owned_sample_handler_t call, _z_dropper_handler_t drop, + void *context); + /** * Return a new query closure. * It consists on a structure that contains all the elements for stateful, memory-leak-free callbacks. @@ -692,6 +716,7 @@ _OWNED_FUNCTIONS(z_queryable_t, z_owned_queryable_t, queryable) _OWNED_FUNCTIONS(z_hello_t, z_owned_hello_t, hello) _OWNED_FUNCTIONS(z_reply_t, z_owned_reply_t, reply) _OWNED_FUNCTIONS(z_str_array_t, z_owned_str_array_t, str_array) +_OWNED_FUNCTIONS(z_sample_t, z_owned_sample_t, sample) #define _OWNED_FUNCTIONS_CLOSURE(ownedtype, name) \ _Bool z_##name##_check(const ownedtype *val); \ @@ -701,6 +726,7 @@ _OWNED_FUNCTIONS(z_str_array_t, z_owned_str_array_t, str_array) _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_sample_t, closure_sample) _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_query_t, closure_query) +_OWNED_FUNCTIONS_CLOSURE(z_owned_closure_owned_sample_t, closure_owned_sample) _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_reply_t, closure_reply) _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_hello_t, closure_hello) _OWNED_FUNCTIONS_CLOSURE(z_owned_closure_zid_t, closure_zid) diff --git a/include/zenoh-pico/api/types.h b/include/zenoh-pico/api/types.h index 6932536e4..4a39ccaec 100644 --- a/include/zenoh-pico/api/types.h +++ b/include/zenoh-pico/api/types.h @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // Błażej Sowa, - +// #ifndef INCLUDE_ZENOH_PICO_API_TYPES_H #define INCLUDE_ZENOH_PICO_API_TYPES_H @@ -401,6 +401,7 @@ typedef struct { * z_timestamp_t timestamp: The timestamp of this data sample. */ typedef _z_sample_t z_sample_t; +_OWNED_TYPE_PTR(z_sample_t, sample) /** * Represents the content of a `hello` message returned by a zenoh entity as a reply to a `scout` message. @@ -448,6 +449,7 @@ _Bool z_str_array_is_empty(const z_str_array_t *a); _OWNED_TYPE_PTR(z_str_array_t, str_array) typedef void (*_z_dropper_handler_t)(void *arg); +typedef void (*_z_owned_sample_handler_t)(z_owned_sample_t *sample, void *arg); /** * Represents the sample closure. @@ -467,6 +469,25 @@ typedef struct { void z_closure_sample_call(const z_owned_closure_sample_t *closure, const z_sample_t *sample); +/** + * Represents the owned sample closure. + * + * A closure is a structure that contains all the elements for stateful, memory-leak-free callbacks. + * + * Members: + * _z_owned_sample_handler_t call: `void *call(const struct z_owned_sample_t*, const void *context)` is the callback + * function. + * _z_dropper_handler_t drop: `void *drop(void*)` allows the callback's state to be freed. void *context: a + * pointer to an arbitrary state. + */ +typedef struct { + void *context; + _z_owned_sample_handler_t call; + _z_dropper_handler_t drop; +} z_owned_closure_owned_sample_t; + +void z_closure_owned_sample_call(const z_owned_closure_owned_sample_t *closure, z_owned_sample_t *sample); + /** * Represents the query callback closure. * diff --git a/include/zenoh-pico/collections/element.h b/include/zenoh-pico/collections/element.h index 5e8997954..39583abe4 100644 --- a/include/zenoh-pico/collections/element.h +++ b/include/zenoh-pico/collections/element.h @@ -25,6 +25,7 @@ typedef size_t (*z_element_size_f)(void *e); typedef void (*z_element_clear_f)(void *e); typedef void (*z_element_free_f)(void **e); typedef void (*z_element_copy_f)(void *dst, const void *src); +typedef void (*z_element_move_f)(void *dst, const void *src); typedef void *(*z_element_clone_f)(const void *e); typedef _Bool (*z_element_eq_f)(const void *left, const void *right); diff --git a/include/zenoh-pico/collections/fifo.h b/include/zenoh-pico/collections/fifo.h new file mode 100644 index 000000000..23562e460 --- /dev/null +++ b/include/zenoh-pico/collections/fifo.h @@ -0,0 +1,63 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#ifndef ZENOH_PICO_COLLECTIONS_FIFO_H +#define ZENOH_PICO_COLLECTIONS_FIFO_H + +#include +#include + +#include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/ring.h" + +/*-------- Fifo Buffer --------*/ +typedef struct { + _z_ring_t _ring; +} _z_fifo_t; + +int8_t _z_fifo_init(_z_fifo_t *fifo, size_t capacity); +_z_fifo_t _z_fifo_make(size_t capacity); + +size_t _z_fifo_capacity(const _z_fifo_t *r); +size_t _z_fifo_len(const _z_fifo_t *r); +_Bool _z_fifo_is_empty(const _z_fifo_t *r); +_Bool _z_fifo_is_full(const _z_fifo_t *r); + +void *_z_fifo_push(_z_fifo_t *r, void *e); +void _z_fifo_push_drop(_z_fifo_t *r, void *e, z_element_free_f f); +void *_z_fifo_pull(_z_fifo_t *r); + +_z_fifo_t *_z_fifo_clone(const _z_fifo_t *xs, z_element_clone_f d_f); + +void _z_fifo_clear(_z_fifo_t *v, z_element_free_f f); +void _z_fifo_free(_z_fifo_t **xs, z_element_free_f f_f); + +#define _Z_FIFO_DEFINE(name, type) \ + typedef _z_fifo_t name##_fifo_t; \ + static inline int8_t name##_fifo_init(name##_fifo_t *fifo, size_t capacity) { \ + return _z_fifo_init(fifo, capacity); \ + } \ + static inline name##_fifo_t name##_fifo_make(size_t capacity) { return _z_fifo_make(capacity); } \ + static inline size_t name##_fifo_capacity(const name##_fifo_t *r) { return _z_fifo_capacity(r); } \ + static inline size_t name##_fifo_len(const name##_fifo_t *r) { return _z_fifo_len(r); } \ + static inline _Bool name##_fifo_is_empty(const name##_fifo_t *r) { return _z_fifo_is_empty(r); } \ + static inline _Bool name##_fifo_is_full(const name##_fifo_t *r) { return _z_fifo_is_full(r); } \ + static inline type *name##_fifo_push(name##_fifo_t *r, type *e) { return _z_fifo_push(r, (void *)e); } \ + static inline void name##_fifo_push_drop(name##_fifo_t *r, type *e) { \ + _z_fifo_push_drop(r, (void *)e, name##_elem_free); \ + } \ + static inline type *name##_fifo_pull(name##_fifo_t *r) { return (type *)_z_fifo_pull(r); } \ + static inline void name##_fifo_clear(name##_fifo_t *r) { _z_fifo_clear(r, name##_elem_free); } \ + static inline void name##_fifo_free(name##_fifo_t **r) { _z_fifo_free(r, name##_elem_free); } + +#endif /* ZENOH_PICO_COLLECTIONS_FIFO_H */ diff --git a/include/zenoh-pico/collections/fifo_mt.h b/include/zenoh-pico/collections/fifo_mt.h new file mode 100644 index 000000000..bcf8d700c --- /dev/null +++ b/include/zenoh-pico/collections/fifo_mt.h @@ -0,0 +1,43 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#ifndef ZENOH_PICO_COLLECTIONS_FIFO_MT_H +#define ZENOH_PICO_COLLECTIONS_FIFO_MT_H + +#include + +#include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/fifo.h" +#include "zenoh-pico/system/platform.h" + +/*-------- Fifo Buffer Multithreaded --------*/ +typedef struct { + _z_fifo_t _fifo; +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_t _mutex; + zp_condvar_t _cv_not_full; + zp_condvar_t _cv_not_empty; +#endif +} _z_fifo_mt_t; + +int8_t _z_fifo_mti_init(size_t capacity); +_z_fifo_mt_t *_z_fifo_mt_new(size_t capacity); + +void _z_fifo_mt_clear(_z_fifo_mt_t *fifo, z_element_free_f free_f); +void _z_fifo_mt_free(_z_fifo_mt_t *fifo, z_element_free_f free_f); + +int8_t _z_fifo_mt_push(const void *src, void *context, z_element_free_f element_free); + +int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_move_f element_move); + +#endif // ZENOH_PICO_COLLECTIONS_FIFO_MT_H diff --git a/include/zenoh-pico/collections/lifo.h b/include/zenoh-pico/collections/lifo.h new file mode 100644 index 000000000..caba25d18 --- /dev/null +++ b/include/zenoh-pico/collections/lifo.h @@ -0,0 +1,64 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#ifndef ZENOH_PICO_COLLECTIONS_LIFO_H +#define ZENOH_PICO_COLLECTIONS_LIFO_H + +#include +#include + +#include "zenoh-pico/collections/element.h" + +/*-------- Ring Buffer --------*/ +typedef struct { + void **_val; + size_t _capacity; + size_t _len; +} _z_lifo_t; + +int8_t _z_lifo_init(_z_lifo_t *lifo, size_t capacity); +_z_lifo_t _z_lifo_make(size_t capacity); + +size_t _z_lifo_capacity(const _z_lifo_t *r); +size_t _z_lifo_len(const _z_lifo_t *r); +_Bool _z_lifo_is_empty(const _z_lifo_t *r); +_Bool _z_lifo_is_full(const _z_lifo_t *r); + +void *_z_lifo_push(_z_lifo_t *r, void *e); +void _z_lifo_push_drop(_z_lifo_t *r, void *e, z_element_free_f f); +void *_z_lifo_pull(_z_lifo_t *r); + +_z_lifo_t *_z_lifo_clone(const _z_lifo_t *xs, z_element_clone_f d_f); + +void _z_lifo_clear(_z_lifo_t *v, z_element_free_f f); +void _z_lifo_free(_z_lifo_t **xs, z_element_free_f f_f); + +#define _Z_LIFO_DEFINE(name, type) \ + typedef _z_lifo_t name##_lifo_t; \ + static inline int8_t name##_lifo_init(name##_lifo_t *lifo, size_t capacity) { \ + return _z_lifo_init(lifo, capacity); \ + } \ + static inline name##_lifo_t name##_lifo_make(size_t capacity) { return _z_lifo_make(capacity); } \ + static inline size_t name##_lifo_capacity(const name##_lifo_t *r) { return _z_lifo_capacity(r); } \ + static inline size_t name##_lifo_len(const name##_lifo_t *r) { return _z_lifo_len(r); } \ + static inline _Bool name##_lifo_is_empty(const name##_lifo_t *r) { return _z_lifo_is_empty(r); } \ + static inline _Bool name##_lifo_is_full(const name##_lifo_t *r) { return _z_lifo_is_full(r); } \ + static inline type *name##_lifo_push(name##_lifo_t *r, type *e) { return _z_lifo_push(r, (void *)e); } \ + static inline void name##_lifo_push_drop(name##_lifo_t *r, type *e) { \ + _z_lifo_push_drop(r, (void *)e, name##_elem_free); \ + } \ + static inline type *name##_lifo_pull(name##_lifo_t *r) { return (type *)_z_lifo_pull(r); } \ + static inline void name##_lifo_clear(name##_lifo_t *r) { _z_lifo_clear(r, name##_elem_free); } \ + static inline void name##_lifo_free(name##_lifo_t **r) { _z_lifo_free(r, name##_elem_free); } + +#endif /* ZENOH_PICO_COLLECTIONS_LIFO_H */ diff --git a/include/zenoh-pico/collections/ring.h b/include/zenoh-pico/collections/ring.h new file mode 100644 index 000000000..3cdcc057c --- /dev/null +++ b/include/zenoh-pico/collections/ring.h @@ -0,0 +1,68 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#ifndef ZENOH_PICO_COLLECTIONS_RING_H +#define ZENOH_PICO_COLLECTIONS_RING_H + +#include +#include + +#include "zenoh-pico/collections/element.h" + +/*-------- Ring Buffer --------*/ +typedef struct { + void **_val; + size_t _capacity; + size_t _len; + size_t _r_idx; + size_t _w_idx; +} _z_ring_t; + +int8_t _z_ring_init(_z_ring_t *ring, size_t capacity); +_z_ring_t _z_ring_make(size_t capacity); + +size_t _z_ring_capacity(const _z_ring_t *r); +size_t _z_ring_len(const _z_ring_t *r); +bool _z_ring_is_empty(const _z_ring_t *r); +bool _z_ring_is_full(const _z_ring_t *r); + +void *_z_ring_push(_z_ring_t *r, void *e); +void *_z_ring_push_force(_z_ring_t *r, void *e); +void _z_ring_push_force_drop(_z_ring_t *r, void *e, z_element_free_f f); +void *_z_ring_pull(_z_ring_t *r); + +_z_ring_t *_z_ring_clone(const _z_ring_t *xs, z_element_clone_f d_f); + +void _z_ring_clear(_z_ring_t *v, z_element_free_f f); +void _z_ring_free(_z_ring_t **xs, z_element_free_f f_f); + +#define _Z_RING_DEFINE(name, type) \ + typedef _z_ring_t name##_ring_t; \ + static inline int8_t name##_ring_init(name##_ring_t *ring, size_t capacity) { \ + return _z_ring_init(ring, capacity); \ + } \ + static inline name##_ring_t name##_ring_make(size_t capacity) { return _z_ring_make(capacity); } \ + static inline size_t name##_ring_capacity(const name##_ring_t *r) { return _z_ring_capacity(r); } \ + static inline size_t name##_ring_len(const name##_ring_t *r) { return _z_ring_len(r); } \ + static inline bool name##_ring_is_empty(const name##_ring_t *r) { return _z_ring_is_empty(r); } \ + static inline bool name##_ring_is_full(const name##_ring_t *r) { return _z_ring_is_full(r); } \ + static inline type *name##_ring_push(name##_ring_t *r, type *e) { return _z_ring_push(r, (void *)e); } \ + static inline type *name##_ring_push_force(name##_ring_t *r, type *e) { return _z_ring_push_force(r, (void *)e); } \ + static inline void name##_ring_push_force_drop(name##_ring_t *r, type *e) { \ + _z_ring_push_force_drop(r, (void *)e, name##_elem_free); \ + } \ + static inline type *name##_ring_pull(name##_ring_t *r) { return (type *)_z_ring_pull(r); } \ + static inline void name##_ring_clear(name##_ring_t *r) { _z_ring_clear(r, name##_elem_free); } \ + static inline void name##_ring_free(name##_ring_t **r) { _z_ring_free(r, name##_elem_free); } + +#endif /* ZENOH_PICO_COLLECTIONS_RING_H */ diff --git a/include/zenoh-pico/collections/ring_mt.h b/include/zenoh-pico/collections/ring_mt.h new file mode 100644 index 000000000..98a413e20 --- /dev/null +++ b/include/zenoh-pico/collections/ring_mt.h @@ -0,0 +1,41 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#ifndef ZENOH_PICO_COLLECTIONS_RING_MT_H +#define ZENOH_PICO_COLLECTIONS_RING_MT_H + +#include + +#include "zenoh-pico/collections/element.h" +#include "zenoh-pico/collections/fifo.h" +#include "zenoh-pico/system/platform.h" + +/*-------- Ring Buffer Multithreaded --------*/ +typedef struct { + _z_ring_t _ring; +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_t _mutex; +#endif +} _z_ring_mt_t; + +int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity); +_z_ring_mt_t *_z_ring_mt_new(size_t capacity); + +void _z_ring_mt_clear(_z_ring_mt_t *ring, z_element_free_f free_f); +void _z_ring_mt_free(_z_ring_mt_t *ring, z_element_free_f free_f); + +int8_t _z_ring_mt_push(const void *src, void *context, z_element_free_f element_free); + +int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move); + +#endif // ZENOH_PICO_COLLECTIONS_RING_MT_H diff --git a/include/zenoh-pico/collections/string.h b/include/zenoh-pico/collections/string.h index 38fe3d334..cfbc33978 100644 --- a/include/zenoh-pico/collections/string.h +++ b/include/zenoh-pico/collections/string.h @@ -33,7 +33,7 @@ size_t _z_str_size(const char *src); void _z_str_copy(char *dst, const char *src); void _z_str_n_copy(char *dst, const char *src, size_t size); _Z_ELEM_DEFINE(_z_str, char, _z_str_size, _z_noop_clear, _z_str_copy) -// _Z_ARRAY_DEFINE(_z_str, char *) + // This is here for reference on why // the _z_str_array_t was not defined using this macro // but instead manually as find below @@ -73,6 +73,8 @@ typedef struct { } _z_string_t; _z_string_t _z_string_make(const char *value); + +size_t _z_string_size(const _z_string_t *s); void _z_string_copy(_z_string_t *dst, const _z_string_t *src); void _z_string_move(_z_string_t *dst, _z_string_t *src); void _z_string_move_str(_z_string_t *dst, char *src); @@ -81,6 +83,8 @@ void _z_string_free(_z_string_t **s); void _z_string_reset(_z_string_t *s); _z_string_t _z_string_from_bytes(const _z_bytes_t *bs); +_Z_ELEM_DEFINE(_z_string, _z_string_t, _z_string_size, _z_string_clear, _z_string_copy) + /*-------- str_array --------*/ /** * An array of NULL terminated strings. diff --git a/include/zenoh-pico/net/memory.h b/include/zenoh-pico/net/memory.h index 750bcff4d..cf578faf3 100644 --- a/include/zenoh-pico/net/memory.h +++ b/include/zenoh-pico/net/memory.h @@ -26,5 +26,7 @@ void _z_sample_move(_z_sample_t *dst, _z_sample_t *src); void _z_sample_clear(_z_sample_t *sample); void _z_sample_free(_z_sample_t **sample); +void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src); +_z_sample_t _z_sample_duplicate(const _z_sample_t *src); #endif /* ZENOH_PICO_MEMORY_NETAPI_H */ diff --git a/include/zenoh-pico/protocol/core.h b/include/zenoh-pico/protocol/core.h index 113566319..c8a27ecd2 100644 --- a/include/zenoh-pico/protocol/core.h +++ b/include/zenoh-pico/protocol/core.h @@ -231,6 +231,9 @@ typedef struct { z_attachment_t attachment; #endif } _z_sample_t; +static inline bool _z_sample_check(const _z_sample_t *sample) { + return _z_keyexpr_check(sample->keyexpr) && _z_bytes_check(sample->payload); +} /** * Represents a Zenoh value. diff --git a/src/api/api.c b/src/api/api.c index 1369c82b1..19527e0a5 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -312,31 +312,37 @@ _Bool z_value_is_initialized(z_value_t *value) { } void z_closure_sample_call(const z_owned_closure_sample_t *closure, const z_sample_t *sample) { - if (closure->call) { + if (closure->call != NULL) { + (closure->call)(sample, closure->context); + } +} + +void z_closure_owned_sample_call(const z_owned_closure_owned_sample_t *closure, z_owned_sample_t *sample) { + if (closure->call != NULL) { (closure->call)(sample, closure->context); } } void z_closure_query_call(const z_owned_closure_query_t *closure, const z_query_t *query) { - if (closure->call) { + if (closure->call != NULL) { (closure->call)(query, closure->context); } } void z_closure_reply_call(const z_owned_closure_reply_t *closure, z_owned_reply_t *reply) { - if (closure->call) { + if (closure->call != NULL) { (closure->call)(reply, closure->context); } } void z_closure_hello_call(const z_owned_closure_hello_t *closure, z_owned_hello_t *hello) { - if (closure->call) { + if (closure->call != NULL) { (closure->call)(hello, closure->context); } } void z_closure_zid_call(const z_owned_closure_zid_t *closure, const z_id_t *id) { - if (closure->call) { + if (closure->call != NULL) { (closure->call)(id, closure->context); } } @@ -422,6 +428,7 @@ OWNED_FUNCTIONS_PTR_DROP(z_scouting_config_t, z_owned_scouting_config_t, scoutin OWNED_FUNCTIONS_PTR_INTERNAL(z_keyexpr_t, z_owned_keyexpr_t, keyexpr, _z_keyexpr_free, _z_keyexpr_copy) OWNED_FUNCTIONS_PTR_INTERNAL(z_hello_t, z_owned_hello_t, hello, _z_hello_free, _z_owner_noop_copy) OWNED_FUNCTIONS_PTR_INTERNAL(z_str_array_t, z_owned_str_array_t, str_array, _z_str_array_free, _z_owner_noop_copy) +OWNED_FUNCTIONS_PTR_INTERNAL(z_sample_t, z_owned_sample_t, sample, _z_sample_free, _z_sample_copy) _Bool z_session_check(const z_owned_session_t *val) { return val->_value.in != NULL; } z_session_t z_session_loan(const z_owned_session_t *val) { return (z_session_t){._val = val->_value}; } @@ -454,6 +461,11 @@ z_owned_closure_sample_t z_closure_sample(_z_data_handler_t call, _z_dropper_han return (z_owned_closure_sample_t){.call = call, .drop = drop, .context = context}; } +z_owned_closure_owned_sample_t z_closure_owned_sample(_z_owned_sample_handler_t call, _z_dropper_handler_t drop, + void *context) { + return (z_owned_closure_owned_sample_t){.call = call, .drop = drop, .context = context}; +} + z_owned_closure_query_t z_closure_query(_z_queryable_handler_t call, _z_dropper_handler_t drop, void *context) { return (z_owned_closure_query_t){.call = call, .drop = drop, .context = context}; } @@ -471,6 +483,7 @@ z_owned_closure_zid_t z_closure_zid(z_id_handler_t call, _z_dropper_handler_t dr } OWNED_FUNCTIONS_CLOSURE(z_owned_closure_sample_t, closure_sample) +OWNED_FUNCTIONS_CLOSURE(z_owned_closure_owned_sample_t, closure_owned_sample) OWNED_FUNCTIONS_CLOSURE(z_owned_closure_query_t, closure_query) OWNED_FUNCTIONS_CLOSURE(z_owned_closure_reply_t, closure_reply) OWNED_FUNCTIONS_CLOSURE(z_owned_closure_hello_t, closure_hello) @@ -1258,4 +1271,4 @@ z_owned_bytes_map_t z_bytes_map_new(void) { return (z_owned_bytes_map_t){._inner z_owned_bytes_map_t z_bytes_map_null(void) { return (z_owned_bytes_map_t){._inner = NULL}; } z_bytes_t z_bytes_from_str(const char *str) { return z_bytes_wrap((const uint8_t *)str, strlen(str)); } z_bytes_t z_bytes_null(void) { return (z_bytes_t){.len = 0, ._is_alloc = false, .start = NULL}; } -#endif \ No newline at end of file +#endif diff --git a/src/api/handlers.c b/src/api/handlers.c new file mode 100644 index 000000000..e1f0594b9 --- /dev/null +++ b/src/api/handlers.c @@ -0,0 +1,37 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/api/handlers.h" + +#include "zenoh-pico/net/memory.h" +#include "zenoh-pico/system/platform.h" + +// -- Sample +void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src) { + memcpy(dst, src, sizeof(z_owned_sample_t)); +} + +z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) { + z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t)); + if (dst == NULL) { + return NULL; + } + if (src != NULL) { + dst->_value = (_z_sample_t *)zp_malloc(sizeof(_z_sample_t)); + _z_sample_copy(dst->_value, src); + } else { + dst->_value = NULL; + } + return dst; +} diff --git a/src/collections/fifo.c b/src/collections/fifo.c new file mode 100644 index 000000000..463c7f9db --- /dev/null +++ b/src/collections/fifo.c @@ -0,0 +1,53 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include "zenoh-pico/collections/fifo.h" + +#include +#include +#include + +/*-------- fifo --------*/ +int8_t _z_fifo_init(_z_fifo_t *r, size_t capacity) { + _z_ring_init(&r->_ring, capacity); + return 0; +} + +_z_fifo_t _z_fifo_make(size_t capacity) { + _z_fifo_t v; + _z_fifo_init(&v, capacity); + return v; +} + +size_t _z_fifo_capacity(const _z_fifo_t *r) { return _z_ring_capacity(&r->_ring); } +size_t _z_fifo_len(const _z_fifo_t *r) { return _z_ring_len(&r->_ring); } +bool _z_fifo_is_empty(const _z_fifo_t *r) { return _z_ring_is_empty(&r->_ring); } +bool _z_fifo_is_full(const _z_fifo_t *r) { return _z_fifo_len(r) == _z_fifo_capacity(r); } + +void *_z_fifo_push(_z_fifo_t *r, void *e) { return _z_ring_push(&r->_ring, e); } +void _z_fifo_push_drop(_z_fifo_t *r, void *e, z_element_free_f free_f) { + void *ret = _z_fifo_push(r, e); + if (ret != NULL) { + free_f(&ret); + } +} +void *_z_fifo_pull(_z_fifo_t *r) { return _z_ring_pull(&r->_ring); } +void _z_fifo_clear(_z_fifo_t *r, z_element_free_f free_f) { _z_ring_clear(&r->_ring, free_f); } +void _z_fifo_free(_z_fifo_t **r, z_element_free_f free_f) { + _z_fifo_t *ptr = (_z_fifo_t *)*r; + if (ptr != NULL) { + _z_fifo_clear(ptr, free_f); + zp_free(ptr); + *r = NULL; + } +} diff --git a/src/collections/fifo_mt.c b/src/collections/fifo_mt.c new file mode 100644 index 000000000..48837b5dd --- /dev/null +++ b/src/collections/fifo_mt.c @@ -0,0 +1,115 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/collections/fifo_mt.h" + +#include "zenoh-pico/protocol/codec/core.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/utils/logging.h" + +/*-------- Fifo Buffer Multithreaded --------*/ +int8_t _z_fifo_mt_init(_z_fifo_mt_t *fifo, size_t capacity) { + _Z_RETURN_IF_ERR(_z_fifo_init(&fifo->_fifo, capacity)) + +#if Z_FEATURE_MULTI_THREAD == 1 + _Z_RETURN_IF_ERR(zp_mutex_init(&fifo->_mutex)) + _Z_RETURN_IF_ERR(zp_condvar_init(&fifo->_cv_not_full)) + _Z_RETURN_IF_ERR(zp_condvar_init(&fifo->_cv_not_empty)) +#endif + + return _Z_RES_OK; +} + +_z_fifo_mt_t *_z_fifo_mt_new(size_t capacity) { + _z_fifo_mt_t *fifo = (_z_fifo_mt_t *)zp_malloc(sizeof(_z_fifo_mt_t)); + if (fifo == NULL) { + _Z_ERROR("zp_malloc failed"); + return NULL; + } + + int8_t ret = _z_fifo_mt_init(fifo, capacity); + if (ret != _Z_RES_OK) { + _Z_ERROR("_z_fifo_mt_init failed: %i", ret); + zp_free(fifo); + return NULL; + } + + return fifo; +} + +void _z_fifo_mt_clear(_z_fifo_mt_t *fifo, z_element_free_f free_f) { +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_free(&fifo->_mutex); + zp_condvar_free(&fifo->_cv_not_full); + zp_condvar_free(&fifo->_cv_not_empty); +#endif + + _z_fifo_clear(&fifo->_fifo, free_f); +} + +void _z_fifo_mt_free(_z_fifo_mt_t *fifo, z_element_free_f free_f) { + _z_fifo_mt_clear(fifo, free_f); + zp_free(fifo); +} + +int8_t _z_fifo_mt_push(const void *elem, void *context, z_element_free_f element_free) { + if (elem == NULL || context == NULL) { + return _Z_ERR_GENERIC; + } + + _z_fifo_mt_t *f = (_z_fifo_mt_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + _Z_RETURN_IF_ERR(zp_mutex_lock(&f->_mutex)) + while (elem != NULL) { + elem = _z_fifo_push(&f->_fifo, (void *)elem); + if (elem != NULL) { + _Z_RETURN_IF_ERR(zp_condvar_wait(&f->_cv_not_full, &f->_mutex)) + } else { + _Z_RETURN_IF_ERR(zp_condvar_signal(&f->_cv_not_empty)) + } + } + _Z_RETURN_IF_ERR(zp_mutex_unlock(&f->_mutex)) +#else // Z_FEATURE_MULTI_THREAD == 1 + _z_fifo_push_drop(&f->_fifo, elem, element_free); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + return _Z_RES_OK; +} + +int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_move_f element_move) { + _z_fifo_mt_t *f = (_z_fifo_mt_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + void *src = NULL; + _Z_RETURN_IF_ERR(zp_mutex_lock(&f->_mutex)) + while (src == NULL) { + src = _z_fifo_pull(&f->_fifo); + if (src == NULL) { + _Z_RETURN_IF_ERR(zp_condvar_wait(&f->_cv_not_empty, &f->_mutex)) + } else { + _Z_RETURN_IF_ERR(zp_condvar_signal(&f->_cv_not_full)) + } + } + _Z_RETURN_IF_ERR(zp_mutex_unlock(&f->_mutex)) + element_move(dst, src); +#else // Z_FEATURE_MULTI_THREAD == 1 + void *src = _z_fifo_pull(&f->_fifo); + if (src) { + element_move(dst, src); + } +#endif // Z_FEATURE_MULTI_THREAD == 1 + + return _Z_RES_OK; +} diff --git a/src/collections/lifo.c b/src/collections/lifo.c new file mode 100644 index 000000000..6cccca8fa --- /dev/null +++ b/src/collections/lifo.c @@ -0,0 +1,90 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include "zenoh-pico/collections/lifo.h" + +#include +#include +#include + +/*-------- lifo --------*/ +int8_t _z_lifo_init(_z_lifo_t *r, size_t capacity) { + memset(r, 0, sizeof(_z_lifo_t)); + if (capacity != (size_t)0) { + r->_val = (void **)zp_malloc(sizeof(void *) * capacity); + } + if (r->_val != NULL) { + memset(r->_val, 0, capacity); + r->_capacity = capacity; + } + return 0; +} + +_z_lifo_t _z_lifo_make(size_t capacity) { + _z_lifo_t v; + _z_lifo_init(&v, capacity); + return v; +} + +size_t _z_lifo_capacity(const _z_lifo_t *r) { return r->_capacity; } +size_t _z_lifo_len(const _z_lifo_t *r) { return r->_len; } +_Bool _z_lifo_is_empty(const _z_lifo_t *r) { return r->_len == 0; } +_Bool _z_lifo_is_full(const _z_lifo_t *r) { return r->_len == r->_capacity; } + +void *_z_lifo_push(_z_lifo_t *r, void *e) { + void *ret = e; + if (!_z_lifo_is_full(r)) { + r->_val[r->_len] = e; + r->_len++; + ret = NULL; + } + return ret; +} + +void _z_lifo_push_drop(_z_lifo_t *r, void *e, z_element_free_f free_f) { + void *ret = _z_lifo_push(r, e); + if (ret != NULL) { + free_f(&ret); + } +} + +void *_z_lifo_pull(_z_lifo_t *r) { + void *ret = NULL; + if (!_z_lifo_is_empty(r)) { + r->_len--; + ret = r->_val[r->_len]; + } + return ret; +} + +void _z_lifo_clear(_z_lifo_t *r, z_element_free_f free_f) { + void *e = _z_lifo_pull(r); + while (e != NULL) { + free_f(&e); + e = _z_lifo_pull(r); + } + zp_free(r->_val); + + r->_val = NULL; + r->_capacity = (size_t)0; + r->_len = (size_t)0; +} + +void _z_lifo_free(_z_lifo_t **r, z_element_free_f free_f) { + _z_lifo_t *ptr = (_z_lifo_t *)*r; + if (ptr != NULL) { + _z_lifo_clear(ptr, free_f); + zp_free(ptr); + *r = NULL; + } +} diff --git a/src/collections/ring.c b/src/collections/ring.c new file mode 100644 index 000000000..ce3e90894 --- /dev/null +++ b/src/collections/ring.c @@ -0,0 +1,115 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/collections/ring.h" + +#include +#include +#include + +/*-------- ring --------*/ +int8_t _z_ring_init(_z_ring_t *r, size_t capacity) { + // We need one more element to differentiate wether the ring is empty or full + capacity++; + + memset(r, 0, sizeof(_z_ring_t)); + if (capacity != (size_t)0) { + r->_val = (void **)zp_malloc(sizeof(void *) * capacity); + } + if (r->_val != NULL) { + memset(r->_val, 0, capacity); + r->_capacity = capacity; + } + return 0; +} + +_z_ring_t _z_ring_make(size_t capacity) { + _z_ring_t v; + _z_ring_init(&v, capacity); + return v; +} + +size_t _z_ring_capacity(const _z_ring_t *r) { return r->_capacity - (size_t)1; } + +size_t _z_ring_len(const _z_ring_t *r) { + if (r->_w_idx >= r->_r_idx) { + return r->_w_idx - r->_r_idx; + } else { + return r->_w_idx + (r->_capacity - r->_r_idx); + } +} + +bool _z_ring_is_empty(const _z_ring_t *r) { return r->_w_idx == r->_r_idx; } + +bool _z_ring_is_full(const _z_ring_t *r) { return _z_ring_len(r) == _z_ring_capacity(r); } + +void *_z_ring_push(_z_ring_t *r, void *e) { + void *ret = e; + if (!_z_ring_is_full(r)) { + r->_val[r->_w_idx] = e; + r->_w_idx = (r->_w_idx + (size_t)1) % r->_capacity; + ret = NULL; + } + return ret; +} + +void *_z_ring_push_force(_z_ring_t *r, void *e) { + void *ret = _z_ring_push(r, e); + if (ret != NULL) { + ret = _z_ring_pull(r); + _z_ring_push(r, e); + } + return ret; +} + +void _z_ring_push_force_drop(_z_ring_t *r, void *e, z_element_free_f free_f) { + void *ret = _z_ring_push_force(r, e); + if (ret != NULL) { + free_f(&ret); + } +} + +void *_z_ring_pull(_z_ring_t *r) { + void *ret = NULL; + if (!_z_ring_is_empty(r)) { + ret = r->_val[r->_r_idx]; + r->_val[r->_r_idx] = NULL; + r->_r_idx = (r->_r_idx + (size_t)1) % r->_capacity; + } + return ret; +} + +void _z_ring_clear(_z_ring_t *r, z_element_free_f free_f) { + void *e = _z_ring_pull(r); + while (e != NULL) { + free_f(&e); + e = _z_ring_pull(r); + } + zp_free(r->_val); + + r->_val = NULL; + r->_capacity = (size_t)0; + r->_len = (size_t)0; + r->_r_idx = (size_t)0; + r->_w_idx = (size_t)0; +} + +void _z_ring_free(_z_ring_t **r, z_element_free_f free_f) { + _z_ring_t *ptr = (_z_ring_t *)*r; + if (ptr != NULL) { + _z_ring_clear(ptr, free_f); + zp_free(ptr); + *r = NULL; + } +} diff --git a/src/collections/ring_mt.c b/src/collections/ring_mt.c new file mode 100644 index 000000000..ab7083be0 --- /dev/null +++ b/src/collections/ring_mt.c @@ -0,0 +1,97 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/collections/ring_mt.h" + +#include "zenoh-pico/protocol/codec/core.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/utils/logging.h" + +/*-------- Ring Buffer Multithreaded --------*/ +int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity) { + _Z_RETURN_IF_ERR(_z_ring_init(&ring->_ring, capacity)) + +#if Z_FEATURE_MULTI_THREAD == 1 + _Z_RETURN_IF_ERR(zp_mutex_init(&ring->_mutex)) +#endif + return _Z_RES_OK; +} + +_z_ring_mt_t *_z_ring_mt_new(size_t capacity) { + _z_ring_mt_t *ring = (_z_ring_mt_t *)zp_malloc(sizeof(_z_ring_mt_t)); + if (ring == NULL) { + _Z_ERROR("zp_malloc failed"); + return NULL; + } + + int8_t ret = _z_ring_mt_init(ring, capacity); + if (ret != _Z_RES_OK) { + _Z_ERROR("_z_ring_mt_init failed: %i", ret); + return NULL; + } + + return ring; +} + +void _z_ring_mt_clear(_z_ring_mt_t *ring, z_element_free_f free_f) { +#if Z_FEATURE_MULTI_THREAD == 1 + zp_mutex_free(&ring->_mutex); +#endif + + _z_ring_clear(&ring->_ring, free_f); +} + +void _z_ring_mt_free(_z_ring_mt_t *ring, z_element_free_f free_f) { + _z_ring_mt_clear(ring, free_f); + + zp_free(ring); +} + +int8_t _z_ring_mt_push(const void *elem, void *context, z_element_free_f element_free) { + if (elem == NULL || context == NULL) { + return _Z_ERR_GENERIC; + } + + _z_ring_mt_t *r = (_z_ring_mt_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + _Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex)) +#endif + + _z_ring_push_force_drop(&r->_ring, (void *)elem, element_free); + +#if Z_FEATURE_MULTI_THREAD == 1 + _Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex)) +#endif + return _Z_RES_OK; +} + +int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move) { + _z_ring_mt_t *r = (_z_ring_mt_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + _Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex)) +#endif + + void *src = _z_ring_pull(&r->_ring); + +#if Z_FEATURE_MULTI_THREAD == 1 + _Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex)) +#endif + + if (src) { + element_move(dst, src); + } + return _Z_RES_OK; +} diff --git a/src/collections/string.c b/src/collections/string.c index 98d7dda3d..b53f942c8 100644 --- a/src/collections/string.c +++ b/src/collections/string.c @@ -25,6 +25,8 @@ _z_string_t _z_string_make(const char *value) { return s; } +size_t _z_string_size(const _z_string_t *s) { return s->len; } + void _z_string_copy(_z_string_t *dst, const _z_string_t *src) { if (src->val != NULL) { dst->val = _z_str_clone(src->val); diff --git a/src/net/memory.c b/src/net/memory.c index b8cd9f55c..0c2e930b0 100644 --- a/src/net/memory.c +++ b/src/net/memory.c @@ -49,6 +49,27 @@ void _z_sample_free(_z_sample_t **sample) { } } +void _z_sample_copy(_z_sample_t *dst, const _z_sample_t *src) { + dst->keyexpr = _z_keyexpr_duplicate(src->keyexpr); + dst->payload = _z_bytes_duplicate(&src->payload); + dst->timestamp = _z_timestamp_duplicate(&src->timestamp); + + // TODO(sashacmc): should be changed after encoding rework + dst->encoding.prefix = src->encoding.prefix; + _z_bytes_copy(&dst->encoding.suffix, &src->encoding.suffix); + + dst->kind = src->kind; +#if Z_FEATURE_ATTACHMENT == 1 + dst->attachment = src->attachment; +#endif +} + +_z_sample_t _z_sample_duplicate(const _z_sample_t *src) { + _z_sample_t dst; + _z_sample_copy(&dst, src); + return dst; +} + void _z_hello_clear(_z_hello_t *hello) { if (hello->locators.len > 0) { _z_str_array_clear(&hello->locators); diff --git a/tests/z_channels_test.c b/tests/z_channels_test.c new file mode 100644 index 000000000..b390908d7 --- /dev/null +++ b/tests/z_channels_test.c @@ -0,0 +1,118 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include + +#include "zenoh-pico/api/handlers.h" + +#undef NDEBUG +#include + +#define SEND(channel, v) \ + do { \ + z_sample_t sample; \ + sample.payload.start = (const uint8_t *)v; \ + sample.payload.len = strlen(v); \ + sample.keyexpr = _z_rname("key"); \ + z_call(channel.send, &sample); \ + } while (0); + +#define RECV(channel, buf) \ + do { \ + z_owned_sample_t sample = z_sample_null(); \ + z_call(channel.recv, &sample); \ + if (z_check(sample)) { \ + strncpy(buf, (const char *)z_loan(sample).payload.start, (size_t)z_loan(sample).payload.len); \ + buf[z_loan(sample).payload.len] = '\0'; \ + z_drop(z_move(sample)); \ + } else { \ + buf[0] = '\0'; \ + } \ + } while (0); + +void sample_fifo_channel_test(void) { + z_owned_sample_fifo_channel_t channel = z_sample_fifo_channel_new(10); + + SEND(channel, "v1") + SEND(channel, "v22") + SEND(channel, "v333") + SEND(channel, "v4444") + + char buf[100]; + + RECV(channel, buf) + assert(strcmp(buf, "v1") == 0); + RECV(channel, buf) + assert(strcmp(buf, "v22") == 0); + RECV(channel, buf) + assert(strcmp(buf, "v333") == 0); + RECV(channel, buf) + assert(strcmp(buf, "v4444") == 0); + + z_drop(z_move(channel)); +} + +void sample_ring_channel_test_in_size(void) { + z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(10); + + SEND(channel, "v1") + SEND(channel, "v22") + SEND(channel, "v333") + SEND(channel, "v4444") + + char buf[100]; + + RECV(channel, buf) + assert(strcmp(buf, "v1") == 0); + RECV(channel, buf) + assert(strcmp(buf, "v22") == 0); + RECV(channel, buf) + assert(strcmp(buf, "v333") == 0); + RECV(channel, buf) + assert(strcmp(buf, "v4444") == 0); + RECV(channel, buf) + assert(strcmp(buf, "") == 0); + + z_drop(z_move(channel)); +} + +void sample_ring_channel_test_over_size(void) { + z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(3); + + SEND(channel, "v1") + SEND(channel, "v22") + SEND(channel, "v333") + SEND(channel, "v4444") + + char buf[100]; + + RECV(channel, buf) + assert(strcmp(buf, "v22") == 0); + RECV(channel, buf) + assert(strcmp(buf, "v333") == 0); + RECV(channel, buf) + assert(strcmp(buf, "v4444") == 0); + RECV(channel, buf) + assert(strcmp(buf, "") == 0); + + z_drop(z_move(channel)); +} + +int main(void) { + sample_fifo_channel_test(); + sample_ring_channel_test_in_size(); + sample_ring_channel_test_over_size(); +} diff --git a/tests/z_collections_test.c b/tests/z_collections_test.c new file mode 100644 index 000000000..4f61cadf8 --- /dev/null +++ b/tests/z_collections_test.c @@ -0,0 +1,269 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// +#include +#include +#include +#include + +#include "zenoh-pico/collections/fifo.h" +#include "zenoh-pico/collections/lifo.h" +#include "zenoh-pico/collections/ring.h" +#include "zenoh-pico/collections/string.h" + +#undef NDEBUG +#include + +char *a = "a"; +char *b = "b"; +char *c = "c"; +char *d = "d"; + +// RING +_Z_RING_DEFINE(_z_str, char) + +void print_ring(_z_ring_t *r) { + printf("Ring { capacity: %zu, r_idx: %zu, w_idx: %zu, len: %zu }\n", _z_ring_capacity(r), r->_r_idx, r->_w_idx, + _z_ring_len(r)); +} + +void ring_test(void) { + _z_str_ring_t r = _z_str_ring_make(3); + print_ring(&r); + assert(_z_str_ring_is_empty(&r)); + + // One + char *s = _z_str_ring_push(&r, a); + print_ring(&r); + assert(s == NULL); + assert(_z_str_ring_len(&r) == 1); + + s = _z_str_ring_pull(&r); + print_ring(&r); + assert(strcmp(a, s) == 0); + assert(_z_str_ring_is_empty(&r)); + + s = _z_str_ring_pull(&r); + print_ring(&r); + assert(s == NULL); + assert(_z_str_ring_is_empty(&r)); + + // Two + s = _z_str_ring_push(&r, a); + print_ring(&r); + assert(s == NULL); + assert(_z_str_ring_len(&r) == 1); + + s = _z_str_ring_push(&r, b); + print_ring(&r); + assert(s == NULL); + assert(_z_str_ring_len(&r) == 2); + + s = _z_str_ring_push(&r, c); + print_ring(&r); + assert(s == NULL); + assert(_z_str_ring_len(&r) == 3); + assert(_z_str_ring_is_full(&r)); + + s = _z_str_ring_push(&r, d); + print_ring(&r); + printf("%s == %s\n", d, s); + assert(strcmp(d, s) == 0); + assert(_z_str_ring_len(&r) == 3); + assert(_z_str_ring_is_full(&r)); + + s = _z_str_ring_push_force(&r, d); + print_ring(&r); + printf("%s == %s\n", a, s); + assert(strcmp(a, s) == 0); + assert(_z_str_ring_len(&r) == 3); + assert(_z_str_ring_is_full(&r)); + + s = _z_str_ring_push_force(&r, d); + print_ring(&r); + printf("%s == %s\n", b, s); + assert(strcmp(b, s) == 0); + assert(_z_str_ring_len(&r) == 3); + assert(_z_str_ring_is_full(&r)); + + s = _z_str_ring_push_force(&r, d); + print_ring(&r); + printf("%s == %s\n", c, s); + assert(strcmp(c, s) == 0); + assert(_z_str_ring_len(&r) == 3); + assert(_z_str_ring_is_full(&r)); + + s = _z_str_ring_pull(&r); + print_ring(&r); + printf("%s == %s\n", d, s); + assert(strcmp(d, s) == 0); + assert(_z_str_ring_len(&r) == 2); + + s = _z_str_ring_pull(&r); + print_ring(&r); + printf("%s == %s\n", d, s); + assert(strcmp(d, s) == 0); + assert(_z_str_ring_len(&r) == 1); + + s = _z_str_ring_pull(&r); + print_ring(&r); + printf("%s == %s\n", d, s); + assert(strcmp(d, s) == 0); + assert(_z_str_ring_is_empty(&r)); +} + +// LIFO +_Z_LIFO_DEFINE(_z_str, char) + +void print_lifo(_z_lifo_t *r) { printf("Lifo { capacity: %zu, len: %zu }\n", _z_lifo_capacity(r), _z_lifo_len(r)); } + +void lifo_test(void) { + _z_str_lifo_t r = _z_str_lifo_make(3); + print_lifo(&r); + assert(_z_str_lifo_is_empty(&r)); + + // One + char *s = _z_str_lifo_push(&r, a); + print_lifo(&r); + assert(s == NULL); + assert(_z_str_lifo_len(&r) == 1); + + s = _z_str_lifo_pull(&r); + print_lifo(&r); + printf("%s == %s\n", a, s); + assert(strcmp(a, s) == 0); + assert(_z_str_lifo_is_empty(&r)); + + s = _z_str_lifo_pull(&r); + print_lifo(&r); + assert(s == NULL); + assert(_z_str_lifo_is_empty(&r)); + + // Two + s = _z_str_lifo_push(&r, a); + print_lifo(&r); + assert(s == NULL); + assert(_z_str_lifo_len(&r) == 1); + + s = _z_str_lifo_push(&r, b); + print_lifo(&r); + assert(s == NULL); + assert(_z_str_lifo_len(&r) == 2); + + s = _z_str_lifo_push(&r, c); + print_lifo(&r); + assert(s == NULL); + assert(_z_str_lifo_len(&r) == 3); + assert(_z_str_lifo_is_full(&r)); + + s = _z_str_lifo_push(&r, d); + print_lifo(&r); + printf("%s == %s\n", d, s); + assert(strcmp(d, s) == 0); + assert(_z_str_lifo_len(&r) == 3); + assert(_z_str_lifo_is_full(&r)); + + s = _z_str_lifo_pull(&r); + print_lifo(&r); + printf("%s == %s\n", c, s); + assert(strcmp(c, s) == 0); + assert(_z_str_lifo_len(&r) == 2); + + s = _z_str_lifo_pull(&r); + print_lifo(&r); + printf("%s == %s\n", b, s); + assert(strcmp(b, s) == 0); + assert(_z_str_lifo_len(&r) == 1); + + s = _z_str_lifo_pull(&r); + print_lifo(&r); + printf("%s == %s\n", a, s); + assert(strcmp(a, s) == 0); + assert(_z_str_lifo_is_empty(&r)); +} + +// FIFO +_Z_FIFO_DEFINE(_z_str, char) + +void print_fifo(_z_fifo_t *r) { printf("Fifo { capacity: %zu, len: %zu }\n", _z_fifo_capacity(r), _z_fifo_len(r)); } + +void fifo_test(void) { + _z_str_fifo_t r = _z_str_fifo_make(3); + print_fifo(&r); + assert(_z_str_fifo_is_empty(&r)); + + // One + char *s = _z_str_fifo_push(&r, a); + print_fifo(&r); + assert(s == NULL); + assert(_z_str_fifo_len(&r) == 1); + + s = _z_str_fifo_pull(&r); + print_fifo(&r); + printf("%s == %s\n", a, s); + assert(strcmp(a, s) == 0); + assert(_z_str_fifo_is_empty(&r)); + + s = _z_str_fifo_pull(&r); + print_fifo(&r); + assert(s == NULL); + assert(_z_str_fifo_is_empty(&r)); + + // Two + s = _z_str_fifo_push(&r, a); + print_fifo(&r); + assert(s == NULL); + assert(_z_str_fifo_len(&r) == 1); + + s = _z_str_fifo_push(&r, b); + print_fifo(&r); + assert(s == NULL); + assert(_z_str_fifo_len(&r) == 2); + + s = _z_str_fifo_push(&r, c); + print_fifo(&r); + assert(s == NULL); + assert(_z_str_fifo_len(&r) == 3); + assert(_z_str_fifo_is_full(&r)); + + s = _z_str_fifo_push(&r, d); + print_fifo(&r); + printf("%s == %s\n", d, s); + assert(strcmp(d, s) == 0); + assert(_z_str_fifo_len(&r) == 3); + assert(_z_str_fifo_is_full(&r)); + + s = _z_str_fifo_pull(&r); + print_fifo(&r); + printf("%s == %s\n", a, s); + assert(strcmp(a, s) == 0); + assert(_z_str_fifo_len(&r) == 2); + + s = _z_str_fifo_pull(&r); + print_fifo(&r); + printf("%s == %s\n", b, s); + assert(strcmp(b, s) == 0); + assert(_z_str_fifo_len(&r) == 1); + + s = _z_str_fifo_pull(&r); + print_fifo(&r); + printf("%s == %s\n", c, s); + assert(strcmp(c, s) == 0); + assert(_z_str_fifo_is_empty(&r)); +} + +int main(void) { + ring_test(); + lifo_test(); + fifo_test(); +}