Skip to content

Commit

Permalink
Add try_recv closure to channels (#397)
Browse files Browse the repository at this point in the history
* Add try_recv closure to channels

* Add try_pull for ring_mt
  • Loading branch information
sashacmc authored Apr 8, 2024
1 parent 739a063 commit c613907
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 59 deletions.
106 changes: 57 additions & 49 deletions include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,61 +28,69 @@ void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src);
z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);

// -- Channel
#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \
collection_new_f, collection_free_f, collection_push_f, collection_pull_f, elem_move_f, \
elem_convert_f, elem_free_f) \
typedef struct { \
z_owned_##send_closure_name##_t send; \
z_owned_##recv_closure_name##_t recv; \
collection_type *collection; \
} z_owned_##name##_t; \
\
static inline void _z_##name##_elem_free(void **elem) { \
elem_free_f((recv_type *)*elem); \
*elem = NULL; \
} \
static inline void _z_##name##_elem_move(void *dst, const void *src) { \
elem_move_f((recv_type *)dst, (const recv_type *)src); \
} \
static inline void _z_##name##_send(const send_type *elem, void *context) { \
void *internal_elem = elem_convert_f(elem); \
if (internal_elem == NULL) { \
return; \
} \
int8_t ret = collection_push_f(internal_elem, context, _z_##name##_elem_free); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_push_f, ret); \
} \
} \
static inline void _z_##name##_recv(recv_type *elem, void *context) { \
int8_t ret = collection_pull_f(elem, context, _z_##name##_elem_move); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_pull_f, ret); \
} \
} \
\
static inline z_owned_##name##_t z_##name##_new(size_t capacity) { \
z_owned_##name##_t channel; \
channel.collection = collection_new_f(capacity); \
channel.send = z_##send_closure_name(_z_##name##_send, NULL, channel.collection); \
channel.recv = z_##recv_closure_name(_z_##name##_recv, NULL, channel.collection); \
return channel; \
} \
static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \
static inline void z_##name##_drop(z_owned_##name##_t *channel) { \
collection_free_f(channel->collection, _z_##name##_elem_free); \
z_##send_closure_name##_drop(&channel->send); \
z_##recv_closure_name##_drop(&channel->recv); \
#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \
collection_new_f, collection_free_f, collection_push_f, collection_pull_f, \
collection_try_pull_f, elem_move_f, elem_convert_f, elem_free_f) \
typedef struct { \
z_owned_##send_closure_name##_t send; \
z_owned_##recv_closure_name##_t recv; \
z_owned_##recv_closure_name##_t try_recv; \
collection_type *collection; \
} z_owned_##name##_t; \
\
static inline void _z_##name##_elem_free(void **elem) { \
elem_free_f((recv_type *)*elem); \
*elem = NULL; \
} \
static inline void _z_##name##_elem_move(void *dst, const void *src) { \
elem_move_f((recv_type *)dst, (const recv_type *)src); \
} \
static inline void _z_##name##_send(const send_type *elem, void *context) { \
void *internal_elem = elem_convert_f(elem); \
if (internal_elem == NULL) { \
return; \
} \
int8_t ret = collection_push_f(internal_elem, context, _z_##name##_elem_free); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_push_f, ret); \
} \
} \
static inline void _z_##name##_recv(recv_type *elem, void *context) { \
int8_t ret = collection_pull_f(elem, context, _z_##name##_elem_move); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_pull_f, ret); \
} \
} \
static inline void _z_##name##_try_recv(recv_type *elem, void *context) { \
int8_t ret = collection_try_pull_f(elem, context, _z_##name##_elem_move); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_try_pull_f, ret); \
} \
} \
\
static inline z_owned_##name##_t z_##name##_new(size_t capacity) { \
z_owned_##name##_t channel; \
channel.collection = collection_new_f(capacity); \
channel.send = z_##send_closure_name(_z_##name##_send, NULL, channel.collection); \
channel.recv = z_##recv_closure_name(_z_##name##_recv, NULL, channel.collection); \
channel.try_recv = z_##recv_closure_name(_z_##name##_try_recv, NULL, channel.collection); \
return channel; \
} \
static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \
static inline void z_##name##_drop(z_owned_##name##_t *channel) { \
collection_free_f(channel->collection, _z_##name##_elem_free); \
z_##send_closure_name##_drop(&channel->send); \
z_##recv_closure_name##_drop(&channel->recv); \
}

// z_owned_sample_ring_channel_t
_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t,
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_owned_sample_move,
_z_sample_to_owned_ptr, z_sample_drop)
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull,
_z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop)

// z_owned_sample_fifo_channel_t
_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_fifo_mt_t,
_z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_owned_sample_move,
_z_sample_to_owned_ptr, z_sample_drop)
_z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull,
_z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop)

#endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H
1 change: 1 addition & 0 deletions include/zenoh-pico/collections/fifo_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ void _z_fifo_mt_free(_z_fifo_mt_t *fifo, z_element_free_f free_f);
int8_t _z_fifo_mt_push(const void *src, void *context, z_element_free_f element_free);

int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_move_f element_move);
int8_t _z_fifo_mt_try_pull(void *dst, void *context, z_element_move_f element_move);

#endif // ZENOH_PICO_COLLECTIONS_FIFO_MT_H
2 changes: 2 additions & 0 deletions include/zenoh-pico/collections/ring_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ typedef struct {
_z_ring_t _ring;
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_t _mutex;
zp_condvar_t _cv_not_empty;
#endif
} _z_ring_mt_t;

Expand All @@ -37,5 +38,6 @@ void _z_ring_mt_free(_z_ring_mt_t *ring, z_element_free_f free_f);
int8_t _z_ring_mt_push(const void *src, void *context, z_element_free_f element_free);

int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move);
int8_t _z_ring_mt_try_pull(void *dst, void *context, z_element_move_f element_move);

#endif // ZENOH_PICO_COLLECTIONS_RING_MT_H
24 changes: 23 additions & 1 deletion src/collections/fifo_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,32 @@ int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_move_f element_move)
element_move(dst, src);
#else // Z_FEATURE_MULTI_THREAD == 1
void *src = _z_fifo_pull(&f->_fifo);
if (src) {
if (src != NULL) {
element_move(dst, src);
}
#endif // Z_FEATURE_MULTI_THREAD == 1

return _Z_RES_OK;
}

int8_t _z_fifo_mt_try_pull(void *dst, void *context, z_element_move_f element_move) {
_z_fifo_mt_t *f = (_z_fifo_mt_t *)context;

#if Z_FEATURE_MULTI_THREAD == 1
void *src = NULL;
_Z_RETURN_IF_ERR(zp_mutex_lock(&f->_mutex))
src = _z_fifo_pull(&f->_fifo);
if (src != NULL) {
_Z_RETURN_IF_ERR(zp_condvar_signal(&f->_cv_not_full))
}
_Z_RETURN_IF_ERR(zp_mutex_unlock(&f->_mutex))
#else // Z_FEATURE_MULTI_THREAD == 1
void *src = _z_fifo_pull(&f->_fifo);
#endif // Z_FEATURE_MULTI_THREAD == 1

if (src != NULL) {
element_move(dst, src);
}

return _Z_RES_OK;
}
29 changes: 28 additions & 1 deletion src/collections/ring_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity) {

#if Z_FEATURE_MULTI_THREAD == 1
_Z_RETURN_IF_ERR(zp_mutex_init(&ring->_mutex))
_Z_RETURN_IF_ERR(zp_condvar_init(&ring->_cv_not_empty))
#endif
return _Z_RES_OK;
}
Expand All @@ -47,6 +48,7 @@ _z_ring_mt_t *_z_ring_mt_new(size_t capacity) {
void _z_ring_mt_clear(_z_ring_mt_t *ring, z_element_free_f free_f) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_free(&ring->_mutex);
zp_condvar_free(&ring->_cv_not_empty);
#endif

_z_ring_clear(&ring->_ring, free_f);
Expand All @@ -72,6 +74,7 @@ int8_t _z_ring_mt_push(const void *elem, void *context, z_element_free_f element
_z_ring_push_force_drop(&r->_ring, (void *)elem, element_free);

#if Z_FEATURE_MULTI_THREAD == 1
_Z_RETURN_IF_ERR(zp_condvar_signal(&r->_cv_not_empty))
_Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex))
#endif
return _Z_RES_OK;
Expand All @@ -80,6 +83,30 @@ int8_t _z_ring_mt_push(const void *elem, void *context, z_element_free_f element
int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move) {
_z_ring_mt_t *r = (_z_ring_mt_t *)context;

#if Z_FEATURE_MULTI_THREAD == 1
void *src = NULL;
_Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex))
while (src == NULL) {
src = _z_ring_pull(&r->_ring);
if (src == NULL) {
_Z_RETURN_IF_ERR(zp_condvar_wait(&r->_cv_not_empty, &r->_mutex))
}
}
_Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex))
element_move(dst, src);
#else // Z_FEATURE_MULTI_THREAD == 1
void *src = _z_ring_pull(&r->_ring);
if (src != NULL) {
element_move(dst, src);
}
#endif // Z_FEATURE_MULTI_THREAD == 1

return _Z_RES_OK;
}

int8_t _z_ring_mt_try_pull(void *dst, void *context, z_element_move_f element_move) {
_z_ring_mt_t *r = (_z_ring_mt_t *)context;

#if Z_FEATURE_MULTI_THREAD == 1
_Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex))
#endif
Expand All @@ -90,7 +117,7 @@ int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move)
_Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex))
#endif

if (src) {
if (src != NULL) {
element_move(dst, src);
}
return _Z_RES_OK;
Expand Down
Loading

0 comments on commit c613907

Please sign in to comment.