From c613907bf0f0f0e471617319b74ca379cb73338c Mon Sep 17 00:00:00 2001 From: Alexander Date: Mon, 8 Apr 2024 16:35:22 +0200 Subject: [PATCH] Add try_recv closure to channels (#397) * Add try_recv closure to channels * Add try_pull for ring_mt --- include/zenoh-pico/api/handlers.h | 106 ++++++++++++----------- include/zenoh-pico/collections/fifo_mt.h | 1 + include/zenoh-pico/collections/ring_mt.h | 2 + src/collections/fifo_mt.c | 24 ++++- src/collections/ring_mt.c | 29 ++++++- tests/z_channels_test.c | 52 +++++++++-- 6 files changed, 155 insertions(+), 59 deletions(-) diff --git a/include/zenoh-pico/api/handlers.h b/include/zenoh-pico/api/handlers.h index d48c2e8fc..604031d53 100644 --- a/include/zenoh-pico/api/handlers.h +++ b/include/zenoh-pico/api/handlers.h @@ -28,61 +28,69 @@ void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src); z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src); // -- Channel -#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \ - collection_new_f, collection_free_f, collection_push_f, collection_pull_f, elem_move_f, \ - elem_convert_f, elem_free_f) \ - typedef struct { \ - z_owned_##send_closure_name##_t send; \ - z_owned_##recv_closure_name##_t recv; \ - collection_type *collection; \ - } z_owned_##name##_t; \ - \ - static inline void _z_##name##_elem_free(void **elem) { \ - elem_free_f((recv_type *)*elem); \ - *elem = NULL; \ - } \ - static inline void _z_##name##_elem_move(void *dst, const void *src) { \ - elem_move_f((recv_type *)dst, (const recv_type *)src); \ - } \ - static inline void _z_##name##_send(const send_type *elem, void *context) { \ - void *internal_elem = elem_convert_f(elem); \ - if (internal_elem == NULL) { \ - return; \ - } \ - int8_t ret = collection_push_f(internal_elem, context, _z_##name##_elem_free); \ - if (ret != _Z_RES_OK) { \ - _Z_ERROR("%s failed: %i", #collection_push_f, ret); \ - } \ - } \ - static inline void _z_##name##_recv(recv_type *elem, void *context) { \ - int8_t ret = collection_pull_f(elem, context, _z_##name##_elem_move); \ - if (ret != _Z_RES_OK) { \ - _Z_ERROR("%s failed: %i", #collection_pull_f, ret); \ - } \ - } \ - \ - static inline z_owned_##name##_t z_##name##_new(size_t capacity) { \ - z_owned_##name##_t channel; \ - channel.collection = collection_new_f(capacity); \ - channel.send = z_##send_closure_name(_z_##name##_send, NULL, channel.collection); \ - channel.recv = z_##recv_closure_name(_z_##name##_recv, NULL, channel.collection); \ - return channel; \ - } \ - static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \ - static inline void z_##name##_drop(z_owned_##name##_t *channel) { \ - collection_free_f(channel->collection, _z_##name##_elem_free); \ - z_##send_closure_name##_drop(&channel->send); \ - z_##recv_closure_name##_drop(&channel->recv); \ +#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \ + collection_new_f, collection_free_f, collection_push_f, collection_pull_f, \ + collection_try_pull_f, elem_move_f, elem_convert_f, elem_free_f) \ + typedef struct { \ + z_owned_##send_closure_name##_t send; \ + z_owned_##recv_closure_name##_t recv; \ + z_owned_##recv_closure_name##_t try_recv; \ + collection_type *collection; \ + } z_owned_##name##_t; \ + \ + static inline void _z_##name##_elem_free(void **elem) { \ + elem_free_f((recv_type *)*elem); \ + *elem = NULL; \ + } \ + static inline void _z_##name##_elem_move(void *dst, const void *src) { \ + elem_move_f((recv_type *)dst, (const recv_type *)src); \ + } \ + static inline void _z_##name##_send(const send_type *elem, void *context) { \ + void *internal_elem = elem_convert_f(elem); \ + if (internal_elem == NULL) { \ + return; \ + } \ + int8_t ret = collection_push_f(internal_elem, context, _z_##name##_elem_free); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_push_f, ret); \ + } \ + } \ + static inline void _z_##name##_recv(recv_type *elem, void *context) { \ + int8_t ret = collection_pull_f(elem, context, _z_##name##_elem_move); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_pull_f, ret); \ + } \ + } \ + static inline void _z_##name##_try_recv(recv_type *elem, void *context) { \ + int8_t ret = collection_try_pull_f(elem, context, _z_##name##_elem_move); \ + if (ret != _Z_RES_OK) { \ + _Z_ERROR("%s failed: %i", #collection_try_pull_f, ret); \ + } \ + } \ + \ + static inline z_owned_##name##_t z_##name##_new(size_t capacity) { \ + z_owned_##name##_t channel; \ + channel.collection = collection_new_f(capacity); \ + channel.send = z_##send_closure_name(_z_##name##_send, NULL, channel.collection); \ + channel.recv = z_##recv_closure_name(_z_##name##_recv, NULL, channel.collection); \ + channel.try_recv = z_##recv_closure_name(_z_##name##_try_recv, NULL, channel.collection); \ + return channel; \ + } \ + static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \ + static inline void z_##name##_drop(z_owned_##name##_t *channel) { \ + collection_free_f(channel->collection, _z_##name##_elem_free); \ + z_##send_closure_name##_drop(&channel->send); \ + z_##recv_closure_name##_drop(&channel->recv); \ } // z_owned_sample_ring_channel_t _Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t, - _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_owned_sample_move, - _z_sample_to_owned_ptr, z_sample_drop) + _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull, + _z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop) // z_owned_sample_fifo_channel_t _Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_fifo_mt_t, - _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_owned_sample_move, - _z_sample_to_owned_ptr, z_sample_drop) + _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull, + _z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop) #endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H diff --git a/include/zenoh-pico/collections/fifo_mt.h b/include/zenoh-pico/collections/fifo_mt.h index bcf8d700c..7a302b1a2 100644 --- a/include/zenoh-pico/collections/fifo_mt.h +++ b/include/zenoh-pico/collections/fifo_mt.h @@ -39,5 +39,6 @@ void _z_fifo_mt_free(_z_fifo_mt_t *fifo, z_element_free_f free_f); int8_t _z_fifo_mt_push(const void *src, void *context, z_element_free_f element_free); int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_move_f element_move); +int8_t _z_fifo_mt_try_pull(void *dst, void *context, z_element_move_f element_move); #endif // ZENOH_PICO_COLLECTIONS_FIFO_MT_H diff --git a/include/zenoh-pico/collections/ring_mt.h b/include/zenoh-pico/collections/ring_mt.h index 98a413e20..8d5be2e8f 100644 --- a/include/zenoh-pico/collections/ring_mt.h +++ b/include/zenoh-pico/collections/ring_mt.h @@ -25,6 +25,7 @@ typedef struct { _z_ring_t _ring; #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_t _mutex; + zp_condvar_t _cv_not_empty; #endif } _z_ring_mt_t; @@ -37,5 +38,6 @@ void _z_ring_mt_free(_z_ring_mt_t *ring, z_element_free_f free_f); int8_t _z_ring_mt_push(const void *src, void *context, z_element_free_f element_free); int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move); +int8_t _z_ring_mt_try_pull(void *dst, void *context, z_element_move_f element_move); #endif // ZENOH_PICO_COLLECTIONS_RING_MT_H diff --git a/src/collections/fifo_mt.c b/src/collections/fifo_mt.c index 48837b5dd..e295b95e9 100644 --- a/src/collections/fifo_mt.c +++ b/src/collections/fifo_mt.c @@ -106,10 +106,32 @@ int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_move_f element_move) element_move(dst, src); #else // Z_FEATURE_MULTI_THREAD == 1 void *src = _z_fifo_pull(&f->_fifo); - if (src) { + if (src != NULL) { element_move(dst, src); } #endif // Z_FEATURE_MULTI_THREAD == 1 return _Z_RES_OK; } + +int8_t _z_fifo_mt_try_pull(void *dst, void *context, z_element_move_f element_move) { + _z_fifo_mt_t *f = (_z_fifo_mt_t *)context; + +#if Z_FEATURE_MULTI_THREAD == 1 + void *src = NULL; + _Z_RETURN_IF_ERR(zp_mutex_lock(&f->_mutex)) + src = _z_fifo_pull(&f->_fifo); + if (src != NULL) { + _Z_RETURN_IF_ERR(zp_condvar_signal(&f->_cv_not_full)) + } + _Z_RETURN_IF_ERR(zp_mutex_unlock(&f->_mutex)) +#else // Z_FEATURE_MULTI_THREAD == 1 + void *src = _z_fifo_pull(&f->_fifo); +#endif // Z_FEATURE_MULTI_THREAD == 1 + + if (src != NULL) { + element_move(dst, src); + } + + return _Z_RES_OK; +} diff --git a/src/collections/ring_mt.c b/src/collections/ring_mt.c index ab7083be0..f28c432c9 100644 --- a/src/collections/ring_mt.c +++ b/src/collections/ring_mt.c @@ -24,6 +24,7 @@ int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity) { #if Z_FEATURE_MULTI_THREAD == 1 _Z_RETURN_IF_ERR(zp_mutex_init(&ring->_mutex)) + _Z_RETURN_IF_ERR(zp_condvar_init(&ring->_cv_not_empty)) #endif return _Z_RES_OK; } @@ -47,6 +48,7 @@ _z_ring_mt_t *_z_ring_mt_new(size_t capacity) { void _z_ring_mt_clear(_z_ring_mt_t *ring, z_element_free_f free_f) { #if Z_FEATURE_MULTI_THREAD == 1 zp_mutex_free(&ring->_mutex); + zp_condvar_free(&ring->_cv_not_empty); #endif _z_ring_clear(&ring->_ring, free_f); @@ -72,6 +74,7 @@ int8_t _z_ring_mt_push(const void *elem, void *context, z_element_free_f element _z_ring_push_force_drop(&r->_ring, (void *)elem, element_free); #if Z_FEATURE_MULTI_THREAD == 1 + _Z_RETURN_IF_ERR(zp_condvar_signal(&r->_cv_not_empty)) _Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex)) #endif return _Z_RES_OK; @@ -80,6 +83,30 @@ int8_t _z_ring_mt_push(const void *elem, void *context, z_element_free_f element int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move) { _z_ring_mt_t *r = (_z_ring_mt_t *)context; +#if Z_FEATURE_MULTI_THREAD == 1 + void *src = NULL; + _Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex)) + while (src == NULL) { + src = _z_ring_pull(&r->_ring); + if (src == NULL) { + _Z_RETURN_IF_ERR(zp_condvar_wait(&r->_cv_not_empty, &r->_mutex)) + } + } + _Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex)) + element_move(dst, src); +#else // Z_FEATURE_MULTI_THREAD == 1 + void *src = _z_ring_pull(&r->_ring); + if (src != NULL) { + element_move(dst, src); + } +#endif // Z_FEATURE_MULTI_THREAD == 1 + + return _Z_RES_OK; +} + +int8_t _z_ring_mt_try_pull(void *dst, void *context, z_element_move_f element_move) { + _z_ring_mt_t *r = (_z_ring_mt_t *)context; + #if Z_FEATURE_MULTI_THREAD == 1 _Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex)) #endif @@ -90,7 +117,7 @@ int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move) _Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex)) #endif - if (src) { + if (src != NULL) { element_move(dst, src); } return _Z_RES_OK; diff --git a/tests/z_channels_test.c b/tests/z_channels_test.c index b390908d7..900f6087d 100644 --- a/tests/z_channels_test.c +++ b/tests/z_channels_test.c @@ -30,10 +30,10 @@ z_call(channel.send, &sample); \ } while (0); -#define RECV(channel, buf) \ +#define _RECV(channel, method, buf) \ do { \ z_owned_sample_t sample = z_sample_null(); \ - z_call(channel.recv, &sample); \ + z_call(channel.method, &sample); \ if (z_check(sample)) { \ strncpy(buf, (const char *)z_loan(sample).payload.start, (size_t)z_loan(sample).payload.len); \ buf[z_loan(sample).payload.len] = '\0'; \ @@ -43,6 +43,9 @@ } \ } while (0); +#define RECV(channel, buf) _RECV(channel, recv, buf) +#define TRY_RECV(channel, buf) _RECV(channel, try_recv, buf) + void sample_fifo_channel_test(void) { z_owned_sample_fifo_channel_t channel = z_sample_fifo_channel_new(10); @@ -65,16 +68,46 @@ void sample_fifo_channel_test(void) { z_drop(z_move(channel)); } -void sample_ring_channel_test_in_size(void) { - z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(10); +void sample_fifo_channel_test_try_recv(void) { + z_owned_sample_fifo_channel_t channel = z_sample_fifo_channel_new(10); + + char buf[100]; + + TRY_RECV(channel, buf) + assert(strcmp(buf, "") == 0); SEND(channel, "v1") SEND(channel, "v22") SEND(channel, "v333") SEND(channel, "v4444") + TRY_RECV(channel, buf) + assert(strcmp(buf, "v1") == 0); + TRY_RECV(channel, buf) + assert(strcmp(buf, "v22") == 0); + TRY_RECV(channel, buf) + assert(strcmp(buf, "v333") == 0); + TRY_RECV(channel, buf) + assert(strcmp(buf, "v4444") == 0); + TRY_RECV(channel, buf) + assert(strcmp(buf, "") == 0); + + z_drop(z_move(channel)); +} + +void sample_ring_channel_test_in_size(void) { + z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(10); + char buf[100]; + TRY_RECV(channel, buf) + assert(strcmp(buf, "") == 0); + + SEND(channel, "v1") + SEND(channel, "v22") + SEND(channel, "v333") + SEND(channel, "v4444") + RECV(channel, buf) assert(strcmp(buf, "v1") == 0); RECV(channel, buf) @@ -83,7 +116,7 @@ void sample_ring_channel_test_in_size(void) { assert(strcmp(buf, "v333") == 0); RECV(channel, buf) assert(strcmp(buf, "v4444") == 0); - RECV(channel, buf) + TRY_RECV(channel, buf) assert(strcmp(buf, "") == 0); z_drop(z_move(channel)); @@ -92,20 +125,22 @@ void sample_ring_channel_test_in_size(void) { void sample_ring_channel_test_over_size(void) { z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(3); + char buf[100]; + TRY_RECV(channel, buf) + assert(strcmp(buf, "") == 0); + SEND(channel, "v1") SEND(channel, "v22") SEND(channel, "v333") SEND(channel, "v4444") - char buf[100]; - RECV(channel, buf) assert(strcmp(buf, "v22") == 0); RECV(channel, buf) assert(strcmp(buf, "v333") == 0); RECV(channel, buf) assert(strcmp(buf, "v4444") == 0); - RECV(channel, buf) + TRY_RECV(channel, buf) assert(strcmp(buf, "") == 0); z_drop(z_move(channel)); @@ -113,6 +148,7 @@ void sample_ring_channel_test_over_size(void) { int main(void) { sample_fifo_channel_test(); + sample_fifo_channel_test_try_recv(); sample_ring_channel_test_in_size(); sample_ring_channel_test_over_size(); }