From 90bba75c8d25788fcb13f62c9e3d9c23102c57a9 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 9 Apr 2024 19:38:51 +0200 Subject: [PATCH] Add reply channel and update z_get --- examples/unix/c11/z_get.c | 44 ++++++++++------------------ include/zenoh-pico/api/handlers.h | 24 +++++++++++---- include/zenoh-pico/api/macros.h | 8 +++-- include/zenoh-pico/session/session.h | 2 ++ src/api/handlers.c | 19 ++++++++++++ src/net/memory.c | 5 ++++ src/session/query.c | 5 ++++ 7 files changed, 71 insertions(+), 36 deletions(-) diff --git a/examples/unix/c11/z_get.c b/examples/unix/c11/z_get.c index f98426a15..f63971a79 100644 --- a/examples/unix/c11/z_get.c +++ b/examples/unix/c11/z_get.c @@ -19,27 +19,6 @@ #include #if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1 -static z_condvar_t cond; -static z_mutex_t mutex; - -void reply_dropper(void *ctx) { - (void)(ctx); - printf(">> Received query final notification\n"); - z_condvar_signal(&cond); - z_condvar_free(&cond); -} - -void reply_handler(z_owned_reply_t *reply, void *ctx) { - (void)(ctx); - if (z_reply_is_ok(reply)) { - z_sample_t sample = z_reply_ok(reply); - z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); - printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); - z_drop(z_move(keystr)); - } else { - printf(">> Received an error\n"); - } -} int main(int argc, char **argv) { const char *keyexpr = "demo/example/**"; @@ -78,9 +57,6 @@ int main(int argc, char **argv) { } } - z_mutex_init(&mutex); - z_condvar_init(&cond); - z_owned_config_t config = z_config_default(); zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode)); if (clocator != NULL) { @@ -110,19 +86,29 @@ int main(int argc, char **argv) { return -1; } - z_mutex_lock(&mutex); printf("Sending Query '%s'...\n", keyexpr); z_get_options_t opts = z_get_options_default(); if (value != NULL) { opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value)); } - z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper); - if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) { + z_owned_reply_ring_channel_t channel = z_reply_ring_channel_new(1); + if (z_get(z_loan(s), ke, "", z_move(channel.send), &opts) < 0) { printf("Unable to send query.\n"); return -1; } - z_condvar_wait(&cond, &mutex); - z_mutex_unlock(&mutex); + + z_owned_reply_t reply = z_reply_null(); + z_call(channel.recv, &reply); + if (z_reply_is_ok(&reply)) { + z_sample_t sample = z_reply_ok(&reply); + z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr); + printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start); + z_drop(z_move(keystr)); + } else { + printf(">> Received an error\n"); + } + + z_drop(z_move(channel)); // Stop read and lease tasks for zenoh-pico zp_stop_read_task(z_loan(s)); diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h index 604031d53..7ec9decf8 100644 --- a/include/zenoh-pico/api/handlers.h +++ b/include/zenoh-pico/api/handlers.h @@ -27,6 +27,10 @@ void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src); z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); +// -- Reply handler +void _z_owned_reply_move(z_owned_reply_t *dst, const z_owned_reply_t *src); +z_owned_reply_t *_z_reply_clone(const z_owned_reply_t *src); + // -- Channel #define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \ collection_new_f, collection_free_f, collection_push_f, collection_pull_f, \ @@ -45,7 +49,7 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); static inline void _z_##name##_elem_move(void *dst, const void *src) { \ elem_move_f((recv_type *)dst, (const recv_type *)src); \ } \ - static inline void _z_##name##_send(const send_type *elem, void *context) { \ + static inline void _z_##name##_send(send_type *elem, void *context) { \ void *internal_elem = elem_convert_f(elem); \ if (internal_elem == NULL) { \ return; \ @@ -84,13 +88,23 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); } // z_owned_sample_ring_channel_t -_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t, - _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, +_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, const z_sample_t, z_owned_sample_t, + _z_ring_mt_t, _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, _z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop) // z_owned_sample_fifo_channel_t -_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_fifo_mt_t, - _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, +_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, const z_sample_t, z_owned_sample_t, + _z_fifo_mt_t, _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, _z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop) +// z_owned_sample_ring_channel_t +_Z_CHANNEL_DEFINE(reply_ring_channel, closure_reply, closure_reply, z_owned_reply_t, z_owned_reply_t, _z_ring_mt_t, + _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, + _z_owned_reply_move, _z_reply_clone, z_reply_drop) + +// z_owned_reply_fifo_channel_t +_Z_CHANNEL_DEFINE(reply_fifo_channel, closure_reply, closure_reply, z_owned_reply_t, z_owned_reply_t, _z_fifo_mt_t, + _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, + _z_owned_reply_move, _z_reply_clone, z_reply_drop) + #endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H diff --git a/include/zenoh-pico/api/macros.h b/include/zenoh-pico/api/macros.h index e64f1f84a..878c3d9f5 100644 --- a/include/zenoh-pico/api/macros.h +++ b/include/zenoh-pico/api/macros.h @@ -71,7 +71,9 @@ z_owned_closure_hello_t * : z_closure_hello_drop, \ z_owned_closure_zid_t * : z_closure_zid_drop, \ z_owned_sample_ring_channel_t * : z_sample_ring_channel_drop, \ - z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop \ + z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop, \ + z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \ + z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop \ )(x) /** @@ -146,7 +148,9 @@ z_owned_closure_zid_t : z_closure_zid_move, \ z_owned_sample_t : z_sample_move, \ z_owned_sample_ring_channel_t : z_sample_ring_channel_move, \ - z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move \ + z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move, \ + z_owned_reply_ring_channel_t : z_reply_ring_channel_move, \ + z_owned_reply_fifo_channel_t : z_reply_ring_channel_move \ )(&x) /** diff --git a/include/zenoh-pico/session/session.h b/include/zenoh-pico/session/session.h index 95809b8e1..94f26759b 100644 --- a/include/zenoh-pico/session/session.h +++ b/include/zenoh-pico/session/session.h @@ -48,6 +48,7 @@ typedef struct { } _z_reply_data_t; void _z_reply_data_clear(_z_reply_data_t *rd); +void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src); _Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy) _Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t) @@ -67,6 +68,7 @@ typedef struct { } _z_reply_t; void _z_reply_clear(_z_reply_t *src); void _z_reply_free(_z_reply_t **hello); +void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src); typedef struct { _z_keyexpr_t _key; diff --git a/src/api/handlers.c b/src/api/handlers.c index e1f0594b9..41ed768ed 100644 --- a/src/api/handlers.c +++ b/src/api/handlers.c @@ -35,3 +35,22 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) { } return dst; } + +// -- Reply +void _z_owned_reply_move(z_owned_reply_t *dst, const z_owned_reply_t *src) { + memcpy(dst, src, sizeof(z_owned_sample_t)); +} + +z_owned_reply_t *_z_reply_clone(const z_owned_reply_t *src) { + z_owned_reply_t *dst = (z_owned_reply_t *)zp_malloc(sizeof(z_owned_reply_t)); + if (dst == NULL) { + return NULL; + } + if (src != NULL && src->_value) { + dst->_value = (_z_reply_t *)zp_malloc(sizeof(_z_reply_t)); + _z_reply_copy(dst->_value, src->_value); + } else { + dst->_value = NULL; + } + return dst; +} diff --git a/src/net/memory.c b/src/net/memory.c index f21c51529..c8e35869e 100644 --- a/src/net/memory.c +++ b/src/net/memory.c @@ -103,6 +103,11 @@ void _z_reply_data_free(_z_reply_data_t **reply_data) { } } +void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src) { + _z_sample_copy(&dst->sample, &src->sample); + dst->replier_id = src->replier_id; +} + void _z_value_clear(_z_value_t *value) { _z_bytes_clear(&value->encoding.suffix); _z_bytes_clear(&value->payload); diff --git a/src/session/query.c b/src/session/query.c index 8e596e6c3..5b08c6708 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -46,6 +46,11 @@ void _z_reply_free(_z_reply_t **reply) { } } +void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src) { + _z_reply_data_copy(&dst->data, &src->data); + dst->_tag = src->_tag; +} + _Bool _z_pending_reply_eq(const _z_pending_reply_t *one, const _z_pending_reply_t *two) { return one->_tstamp.time == two->_tstamp.time; }