Skip to content

Commit

Permalink
Add try_pull for ring_mt
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Apr 8, 2024
1 parent 63cc4b7 commit e729237
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
2 changes: 1 addition & 1 deletion include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);

// z_owned_sample_ring_channel_t
_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t,
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_pull,
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull,
_z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop)

// z_owned_sample_fifo_channel_t
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/collections/ring_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ typedef struct {
_z_ring_t _ring;
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_t _mutex;
zp_condvar_t _cv_not_empty;
#endif
} _z_ring_mt_t;

Expand All @@ -37,5 +38,6 @@ void _z_ring_mt_free(_z_ring_mt_t *ring, z_element_free_f free_f);
int8_t _z_ring_mt_push(const void *src, void *context, z_element_free_f element_free);

int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move);
int8_t _z_ring_mt_try_pull(void *dst, void *context, z_element_move_f element_move);

#endif // ZENOH_PICO_COLLECTIONS_RING_MT_H
27 changes: 27 additions & 0 deletions src/collections/ring_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity) {

#if Z_FEATURE_MULTI_THREAD == 1
_Z_RETURN_IF_ERR(zp_mutex_init(&ring->_mutex))
_Z_RETURN_IF_ERR(zp_condvar_init(&ring->_cv_not_empty))
#endif
return _Z_RES_OK;
}
Expand All @@ -47,6 +48,7 @@ _z_ring_mt_t *_z_ring_mt_new(size_t capacity) {
void _z_ring_mt_clear(_z_ring_mt_t *ring, z_element_free_f free_f) {
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_free(&ring->_mutex);
zp_condvar_free(&ring->_cv_not_empty);
#endif

_z_ring_clear(&ring->_ring, free_f);
Expand All @@ -72,6 +74,7 @@ int8_t _z_ring_mt_push(const void *elem, void *context, z_element_free_f element
_z_ring_push_force_drop(&r->_ring, (void *)elem, element_free);

#if Z_FEATURE_MULTI_THREAD == 1
_Z_RETURN_IF_ERR(zp_condvar_signal(&r->_cv_not_empty))
_Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex))
#endif
return _Z_RES_OK;
Expand All @@ -80,6 +83,30 @@ int8_t _z_ring_mt_push(const void *elem, void *context, z_element_free_f element
int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move) {
_z_ring_mt_t *r = (_z_ring_mt_t *)context;

#if Z_FEATURE_MULTI_THREAD == 1
void *src = NULL;
_Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex))
while (src == NULL) {
src = _z_ring_pull(&r->_ring);
if (src == NULL) {
_Z_RETURN_IF_ERR(zp_condvar_wait(&r->_cv_not_empty, &r->_mutex))
}
}
_Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex))
element_move(dst, src);
#else // Z_FEATURE_MULTI_THREAD == 1
void *src = _z_ring_pull(&r->_ring);
if (src != NULL) {
element_move(dst, src);
}
#endif // Z_FEATURE_MULTI_THREAD == 1

return _Z_RES_OK;
}

int8_t _z_ring_mt_try_pull(void *dst, void *context, z_element_move_f element_move) {
_z_ring_mt_t *r = (_z_ring_mt_t *)context;

#if Z_FEATURE_MULTI_THREAD == 1
_Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex))
#endif
Expand Down
8 changes: 4 additions & 4 deletions tests/z_channels_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void sample_ring_channel_test_in_size(void) {

char buf[100];

RECV(channel, buf)
TRY_RECV(channel, buf)
assert(strcmp(buf, "") == 0);

SEND(channel, "v1")
Expand All @@ -116,7 +116,7 @@ void sample_ring_channel_test_in_size(void) {
assert(strcmp(buf, "v333") == 0);
RECV(channel, buf)
assert(strcmp(buf, "v4444") == 0);
RECV(channel, buf)
TRY_RECV(channel, buf)
assert(strcmp(buf, "") == 0);

z_drop(z_move(channel));
Expand All @@ -126,7 +126,7 @@ void sample_ring_channel_test_over_size(void) {
z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(3);

char buf[100];
RECV(channel, buf)
TRY_RECV(channel, buf)
assert(strcmp(buf, "") == 0);

SEND(channel, "v1")
Expand All @@ -140,7 +140,7 @@ void sample_ring_channel_test_over_size(void) {
assert(strcmp(buf, "v333") == 0);
RECV(channel, buf)
assert(strcmp(buf, "v4444") == 0);
RECV(channel, buf)
TRY_RECV(channel, buf)
assert(strcmp(buf, "") == 0);

z_drop(z_move(channel));
Expand Down

0 comments on commit e729237

Please sign in to comment.