Skip to content

Commit

Permalink
Add try_recv closure to channels
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Apr 5, 2024
1 parent 2c009d0 commit 8ed3609
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 57 deletions.
106 changes: 57 additions & 49 deletions include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,61 +28,69 @@ void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src);
z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);

// -- Channel
#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \
collection_new_f, collection_free_f, collection_push_f, collection_pull_f, elem_move_f, \
elem_convert_f, elem_free_f) \
typedef struct { \
z_owned_##send_closure_name##_t send; \
z_owned_##recv_closure_name##_t recv; \
collection_type *collection; \
} z_owned_##name##_t; \
\
static inline void _z_##name##_elem_free(void **elem) { \
elem_free_f((recv_type *)*elem); \
*elem = NULL; \
} \
static inline void _z_##name##_elem_move(void *dst, const void *src) { \
elem_move_f((recv_type *)dst, (const recv_type *)src); \
} \
static inline void _z_##name##_send(const send_type *elem, void *context) { \
void *internal_elem = elem_convert_f(elem); \
if (internal_elem == NULL) { \
return; \
} \
int8_t ret = collection_push_f(internal_elem, context, _z_##name##_elem_free); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_push_f, ret); \
} \
} \
static inline void _z_##name##_recv(recv_type *elem, void *context) { \
int8_t ret = collection_pull_f(elem, context, _z_##name##_elem_move); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_pull_f, ret); \
} \
} \
\
static inline z_owned_##name##_t z_##name##_new(size_t capacity) { \
z_owned_##name##_t channel; \
channel.collection = collection_new_f(capacity); \
channel.send = z_##send_closure_name(_z_##name##_send, NULL, channel.collection); \
channel.recv = z_##recv_closure_name(_z_##name##_recv, NULL, channel.collection); \
return channel; \
} \
static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \
static inline void z_##name##_drop(z_owned_##name##_t *channel) { \
collection_free_f(channel->collection, _z_##name##_elem_free); \
z_##send_closure_name##_drop(&channel->send); \
z_##recv_closure_name##_drop(&channel->recv); \
#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \
collection_new_f, collection_free_f, collection_push_f, collection_pull_f, \
collection_try_pull_f, elem_move_f, elem_convert_f, elem_free_f) \
typedef struct { \
z_owned_##send_closure_name##_t send; \
z_owned_##recv_closure_name##_t recv; \
z_owned_##recv_closure_name##_t try_recv; \
collection_type *collection; \
} z_owned_##name##_t; \
\
static inline void _z_##name##_elem_free(void **elem) { \
elem_free_f((recv_type *)*elem); \
*elem = NULL; \
} \
static inline void _z_##name##_elem_move(void *dst, const void *src) { \
elem_move_f((recv_type *)dst, (const recv_type *)src); \
} \
static inline void _z_##name##_send(const send_type *elem, void *context) { \
void *internal_elem = elem_convert_f(elem); \
if (internal_elem == NULL) { \
return; \
} \
int8_t ret = collection_push_f(internal_elem, context, _z_##name##_elem_free); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_push_f, ret); \
} \
} \
static inline void _z_##name##_recv(recv_type *elem, void *context) { \
int8_t ret = collection_pull_f(elem, context, _z_##name##_elem_move); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_pull_f, ret); \
} \
} \
static inline void _z_##name##_try_recv(recv_type *elem, void *context) { \
int8_t ret = collection_try_pull_f(elem, context, _z_##name##_elem_move); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_try_pull_f, ret); \
} \
} \
\
static inline z_owned_##name##_t z_##name##_new(size_t capacity) { \
z_owned_##name##_t channel; \
channel.collection = collection_new_f(capacity); \
channel.send = z_##send_closure_name(_z_##name##_send, NULL, channel.collection); \
channel.recv = z_##recv_closure_name(_z_##name##_recv, NULL, channel.collection); \
channel.try_recv = z_##recv_closure_name(_z_##name##_try_recv, NULL, channel.collection); \
return channel; \
} \
static inline z_owned_##name##_t *z_##name##_move(z_owned_##name##_t *val) { return val; } \
static inline void z_##name##_drop(z_owned_##name##_t *channel) { \
collection_free_f(channel->collection, _z_##name##_elem_free); \
z_##send_closure_name##_drop(&channel->send); \
z_##recv_closure_name##_drop(&channel->recv); \
}

// z_owned_sample_ring_channel_t
_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t,
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_owned_sample_move,
_z_sample_to_owned_ptr, z_sample_drop)
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_pull,
_z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop)

// z_owned_sample_fifo_channel_t
_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_fifo_mt_t,
_z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_owned_sample_move,
_z_sample_to_owned_ptr, z_sample_drop)
_z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull,
_z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop)

#endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H
1 change: 1 addition & 0 deletions include/zenoh-pico/collections/fifo_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ void _z_fifo_mt_free(_z_fifo_mt_t *fifo, z_element_free_f free_f);
int8_t _z_fifo_mt_push(const void *src, void *context, z_element_free_f element_free);

int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_move_f element_move);
int8_t _z_fifo_mt_try_pull(void *dst, void *context, z_element_move_f element_move);

#endif // ZENOH_PICO_COLLECTIONS_FIFO_MT_H
24 changes: 23 additions & 1 deletion src/collections/fifo_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,32 @@ int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_move_f element_move)
element_move(dst, src);
#else // Z_FEATURE_MULTI_THREAD == 1
void *src = _z_fifo_pull(&f->_fifo);
if (src) {
if (src != NULL) {
element_move(dst, src);
}
#endif // Z_FEATURE_MULTI_THREAD == 1

return _Z_RES_OK;
}

int8_t _z_fifo_mt_try_pull(void *dst, void *context, z_element_move_f element_move) {
_z_fifo_mt_t *f = (_z_fifo_mt_t *)context;

#if Z_FEATURE_MULTI_THREAD == 1
void *src = NULL;
_Z_RETURN_IF_ERR(zp_mutex_lock(&f->_mutex))
src = _z_fifo_pull(&f->_fifo);
if (src != NULL) {
_Z_RETURN_IF_ERR(zp_condvar_signal(&f->_cv_not_full))
}
_Z_RETURN_IF_ERR(zp_mutex_unlock(&f->_mutex))
#else // Z_FEATURE_MULTI_THREAD == 1
void *src = _z_fifo_pull(&f->_fifo);
#endif // Z_FEATURE_MULTI_THREAD == 1

if (src != NULL) {
element_move(dst, src);
}

return _Z_RES_OK;
}
2 changes: 1 addition & 1 deletion src/collections/ring_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move)
_Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex))
#endif

if (src) {
if (src != NULL) {
element_move(dst, src);
}
return _Z_RES_OK;
Expand Down
48 changes: 42 additions & 6 deletions tests/z_channels_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
z_call(channel.send, &sample); \
} while (0);

#define RECV(channel, buf) \
#define _RECV(channel, method, buf) \
do { \
z_owned_sample_t sample = z_sample_null(); \
z_call(channel.recv, &sample); \
z_call(channel.method, &sample); \
if (z_check(sample)) { \
strncpy(buf, (const char *)z_loan(sample).payload.start, (size_t)z_loan(sample).payload.len); \
buf[z_loan(sample).payload.len] = '\0'; \
Expand All @@ -43,6 +43,9 @@
} \
} while (0);

#define RECV(channel, buf) _RECV(channel, recv, buf)
#define TRY_RECV(channel, buf) _RECV(channel, try_recv, buf)

void sample_fifo_channel_test(void) {
z_owned_sample_fifo_channel_t channel = z_sample_fifo_channel_new(10);

Expand All @@ -65,16 +68,46 @@ void sample_fifo_channel_test(void) {
z_drop(z_move(channel));
}

void sample_ring_channel_test_in_size(void) {
z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(10);
void sample_fifo_channel_test_try_recv(void) {
z_owned_sample_fifo_channel_t channel = z_sample_fifo_channel_new(10);

char buf[100];

TRY_RECV(channel, buf)
assert(strcmp(buf, "") == 0);

SEND(channel, "v1")
SEND(channel, "v22")
SEND(channel, "v333")
SEND(channel, "v4444")

TRY_RECV(channel, buf)
assert(strcmp(buf, "v1") == 0);
TRY_RECV(channel, buf)
assert(strcmp(buf, "v22") == 0);
TRY_RECV(channel, buf)
assert(strcmp(buf, "v333") == 0);
TRY_RECV(channel, buf)
assert(strcmp(buf, "v4444") == 0);
TRY_RECV(channel, buf)
assert(strcmp(buf, "") == 0);

z_drop(z_move(channel));
}

void sample_ring_channel_test_in_size(void) {
z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(10);

char buf[100];

RECV(channel, buf)
assert(strcmp(buf, "") == 0);

SEND(channel, "v1")
SEND(channel, "v22")
SEND(channel, "v333")
SEND(channel, "v4444")

RECV(channel, buf)
assert(strcmp(buf, "v1") == 0);
RECV(channel, buf)
Expand All @@ -92,13 +125,15 @@ void sample_ring_channel_test_in_size(void) {
void sample_ring_channel_test_over_size(void) {
z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(3);

char buf[100];
RECV(channel, buf)
assert(strcmp(buf, "") == 0);

SEND(channel, "v1")
SEND(channel, "v22")
SEND(channel, "v333")
SEND(channel, "v4444")

char buf[100];

RECV(channel, buf)
assert(strcmp(buf, "v22") == 0);
RECV(channel, buf)
Expand All @@ -113,6 +148,7 @@ void sample_ring_channel_test_over_size(void) {

int main(void) {
sample_fifo_channel_test();
sample_fifo_channel_test_try_recv();
sample_ring_channel_test_in_size();
sample_ring_channel_test_over_size();
}

0 comments on commit 8ed3609

Please sign in to comment.