Skip to content

Commit

Permalink
Apply review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Apr 5, 2024
1 parent afb24bf commit 6b4876d
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 128 deletions.
2 changes: 1 addition & 1 deletion examples/unix/c11/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ int main(int argc, char **argv) {
}

printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_sample_ring_channel_t channel = z_sample_ring_channel(size);
z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(size);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
if (!z_check(sub)) {
printf("Unable to declare subscriber.\n");
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/c11/z_sub_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ int main(int argc, char **argv) {
}

printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_sample_fifo_channel_t channel = z_sample_fifo_channel(3);
z_owned_sample_fifo_channel_t channel = z_sample_fifo_channel_new(3);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
if (!z_check(sub)) {
printf("Unable to declare subscriber.\n");
Expand Down
18 changes: 9 additions & 9 deletions include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);
if (internal_elem == NULL) { \
return; \
} \
int8_t res = collection_push_f(internal_elem, context, _z_##name##_elem_free); \
if (res) { \
_Z_ERROR("%s failed: %i", #collection_push_f, res); \
int8_t ret = collection_push_f(internal_elem, context, _z_##name##_elem_free); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_push_f, ret); \
} \
} \
static inline void _z_##name##_recv(recv_type *elem, void *context) { \
int8_t res = collection_pull_f(elem, context, _z_##name##_elem_move); \
if (res) { \
_Z_ERROR("%s failed: %i", #collection_pull_f, res); \
int8_t ret = collection_pull_f(elem, context, _z_##name##_elem_move); \
if (ret != _Z_RES_OK) { \
_Z_ERROR("%s failed: %i", #collection_pull_f, ret); \
} \
} \
\
static inline z_owned_##name##_t z_##name(size_t capacity) { \
static inline z_owned_##name##_t z_##name##_new(size_t capacity) { \
z_owned_##name##_t channel; \
channel.collection = collection_new_f(capacity); \
channel.send = z_##send_closure_name(_z_##name##_send, NULL, channel.collection); \
Expand All @@ -77,12 +77,12 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);

// z_owned_sample_ring_channel_t
_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t,
_z_ring_mt, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_owned_sample_move,
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_owned_sample_move,
_z_sample_to_owned_ptr, z_sample_drop)

// z_owned_sample_fifo_channel_t
_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_fifo_mt_t,
_z_fifo_mt, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_owned_sample_move,
_z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_owned_sample_move,
_z_sample_to_owned_ptr, z_sample_drop)

#endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H
2 changes: 1 addition & 1 deletion include/zenoh-pico/collections/fifo_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ typedef struct {
} _z_fifo_mt_t;

int8_t _z_fifo_mti_init(size_t capacity);
_z_fifo_mt_t *_z_fifo_mt(size_t capacity);
_z_fifo_mt_t *_z_fifo_mt_new(size_t capacity);

void _z_fifo_mt_clear(_z_fifo_mt_t *fifo, z_element_free_f free_f);
void _z_fifo_mt_free(_z_fifo_mt_t *fifo, z_element_free_f free_f);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/collections/ring_mt.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ typedef struct {
} _z_ring_mt_t;

int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity);
_z_ring_mt_t *_z_ring_mt(size_t capacity);
_z_ring_mt_t *_z_ring_mt_new(size_t capacity);

void _z_ring_mt_clear(_z_ring_mt_t *ring, z_element_free_f free_f);
void _z_ring_mt_free(_z_ring_mt_t *ring, z_element_free_f free_f);
Expand Down
12 changes: 6 additions & 6 deletions src/api/api.c
Original file line number Diff line number Diff line change
Expand Up @@ -312,37 +312,37 @@ _Bool z_value_is_initialized(z_value_t *value) {
}

void z_closure_sample_call(const z_owned_closure_sample_t *closure, const z_sample_t *sample) {
if (closure->call) {
if (closure->call != NULL) {
(closure->call)(sample, closure->context);
}
}

void z_closure_owned_sample_call(const z_owned_closure_owned_sample_t *closure, z_owned_sample_t *sample) {
if (closure->call) {
if (closure->call != NULL) {
(closure->call)(sample, closure->context);
}
}

void z_closure_query_call(const z_owned_closure_query_t *closure, const z_query_t *query) {
if (closure->call) {
if (closure->call != NULL) {
(closure->call)(query, closure->context);
}
}

void z_closure_reply_call(const z_owned_closure_reply_t *closure, z_owned_reply_t *reply) {
if (closure->call) {
if (closure->call != NULL) {
(closure->call)(reply, closure->context);
}
}

void z_closure_hello_call(const z_owned_closure_hello_t *closure, z_owned_hello_t *hello) {
if (closure->call) {
if (closure->call != NULL) {
(closure->call)(hello, closure->context);
}
}

void z_closure_zid_call(const z_owned_closure_zid_t *closure, const z_id_t *id) {
if (closure->call) {
if (closure->call != NULL) {
(closure->call)(id, closure->context);
}
}
Expand Down
7 changes: 6 additions & 1 deletion src/api/handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src) {

z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) {
z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t));
if (dst && src) {
if (dst == NULL) {
return NULL;
}
if (src != NULL) {
dst->_value = (_z_sample_t *)zp_malloc(sizeof(_z_sample_t));
_z_sample_copy(dst->_value, src);
} else {
dst->_value = NULL;
}
return dst;
}
69 changes: 17 additions & 52 deletions src/collections/fifo_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,44 +14,33 @@

#include "zenoh-pico/collections/fifo_mt.h"

#include "zenoh-pico/protocol/codec/core.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/utils/logging.h"

/*-------- Fifo Buffer Multithreaded --------*/
int8_t _z_fifo_mt_init(_z_fifo_mt_t *fifo, size_t capacity) {
int8_t res = _z_fifo_init(&fifo->_fifo, capacity);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(_z_fifo_init(&fifo->_fifo, capacity))

#if Z_FEATURE_MULTI_THREAD == 1
res = zp_mutex_init(&fifo->_mutex);
if (res) {
return res;
}
res = zp_condvar_init(&fifo->_cv_not_full);
if (res) {
return res;
}
res = zp_condvar_init(&fifo->_cv_not_empty);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_init(&fifo->_mutex))
_Z_RETURN_IF_ERR(zp_condvar_init(&fifo->_cv_not_full))
_Z_RETURN_IF_ERR(zp_condvar_init(&fifo->_cv_not_empty))
#endif

return _Z_RES_OK;
}

_z_fifo_mt_t *_z_fifo_mt(size_t capacity) {
_z_fifo_mt_t *_z_fifo_mt_new(size_t capacity) {
_z_fifo_mt_t *fifo = (_z_fifo_mt_t *)zp_malloc(sizeof(_z_fifo_mt_t));
if (fifo == NULL) {
_Z_ERROR("zp_malloc failed");
return NULL;
}

int8_t res = _z_fifo_mt_init(fifo, capacity);
if (res) {
_Z_ERROR("_z_fifo_mt_init failed: %i", res);
int8_t ret = _z_fifo_mt_init(fifo, capacity);
if (ret != _Z_RES_OK) {
_Z_ERROR("_z_fifo_mt_init failed: %i", ret);
zp_free(fifo);
return NULL;
}
Expand Down Expand Up @@ -82,28 +71,16 @@ int8_t _z_fifo_mt_push(const void *elem, void *context, z_element_free_f element
_z_fifo_mt_t *f = (_z_fifo_mt_t *)context;

#if Z_FEATURE_MULTI_THREAD == 1
int res = zp_mutex_lock(&f->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_lock(&f->_mutex))
while (elem != NULL) {
elem = _z_fifo_push(&f->_fifo, (void *)elem);
if (elem != NULL) {
res = zp_condvar_wait(&f->_cv_not_full, &f->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_condvar_wait(&f->_cv_not_full, &f->_mutex))
} else {
res = zp_condvar_signal(&f->_cv_not_empty);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_condvar_signal(&f->_cv_not_empty))
}
}
res = zp_mutex_unlock(&f->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_unlock(&f->_mutex))
#else // Z_FEATURE_MULTI_THREAD == 1
_z_fifo_push_drop(&f->_fifo, elem, element_free);
#endif // Z_FEATURE_MULTI_THREAD == 1
Expand All @@ -116,28 +93,16 @@ int8_t _z_fifo_mt_pull(void *dst, void *context, z_element_move_f element_move)

#if Z_FEATURE_MULTI_THREAD == 1
void *src = NULL;
int res = zp_mutex_lock(&f->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_lock(&f->_mutex))
while (src == NULL) {
src = _z_fifo_pull(&f->_fifo);
if (src == NULL) {
res = zp_condvar_wait(&f->_cv_not_empty, &f->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_condvar_wait(&f->_cv_not_empty, &f->_mutex))
} else {
res = zp_condvar_signal(&f->_cv_not_full);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_condvar_signal(&f->_cv_not_full))
}
}
res = zp_mutex_unlock(&f->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_unlock(&f->_mutex))
element_move(dst, src);
#else // Z_FEATURE_MULTI_THREAD == 1
void *src = _z_fifo_pull(&f->_fifo);
Expand Down
39 changes: 11 additions & 28 deletions src/collections/ring_mt.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,30 @@

#include "zenoh-pico/collections/ring_mt.h"

#include "zenoh-pico/protocol/codec/core.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/utils/logging.h"

/*-------- Ring Buffer Multithreaded --------*/
int8_t _z_ring_mt_init(_z_ring_mt_t *ring, size_t capacity) {
int8_t res = _z_ring_init(&ring->_ring, capacity);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(_z_ring_init(&ring->_ring, capacity))

#if Z_FEATURE_MULTI_THREAD == 1
res = zp_mutex_init(&ring->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_init(&ring->_mutex))
#endif
return _Z_RES_OK;
}

_z_ring_mt_t *_z_ring_mt(size_t capacity) {
_z_ring_mt_t *_z_ring_mt_new(size_t capacity) {
_z_ring_mt_t *ring = (_z_ring_mt_t *)zp_malloc(sizeof(_z_ring_mt_t));
if (ring == NULL) {
_Z_ERROR("zp_malloc failed");
return NULL;
}

int8_t res = _z_ring_mt_init(ring, capacity);
if (res) {
_Z_ERROR("_z_ring_mt_init failed: %i", res);
int8_t ret = _z_ring_mt_init(ring, capacity);
if (ret != _Z_RES_OK) {
_Z_ERROR("_z_ring_mt_init failed: %i", ret);
return NULL;
}

Expand Down Expand Up @@ -71,19 +66,13 @@ int8_t _z_ring_mt_push(const void *elem, void *context, z_element_free_f element
_z_ring_mt_t *r = (_z_ring_mt_t *)context;

#if Z_FEATURE_MULTI_THREAD == 1
int8_t res = zp_mutex_lock(&r->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex))
#endif

_z_ring_push_force_drop(&r->_ring, (void *)elem, element_free);

#if Z_FEATURE_MULTI_THREAD == 1
res = zp_mutex_unlock(&r->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex))
#endif
return _Z_RES_OK;
}
Expand All @@ -92,19 +81,13 @@ int8_t _z_ring_mt_pull(void *dst, void *context, z_element_move_f element_move)
_z_ring_mt_t *r = (_z_ring_mt_t *)context;

#if Z_FEATURE_MULTI_THREAD == 1
int res = zp_mutex_lock(&r->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_lock(&r->_mutex))
#endif

void *src = _z_ring_pull(&r->_ring);

#if Z_FEATURE_MULTI_THREAD == 1
res = zp_mutex_unlock(&r->_mutex);
if (res) {
return res;
}
_Z_RETURN_IF_ERR(zp_mutex_unlock(&r->_mutex))
#endif

if (src) {
Expand Down
Loading

0 comments on commit 6b4876d

Please sign in to comment.