Skip to content

Commit

Permalink
Add reply channel and update z_get
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Apr 9, 2024
1 parent c613907 commit 90bba75
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 36 deletions.
44 changes: 15 additions & 29 deletions examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,6 @@
#include <zenoh-pico.h>

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1
static z_condvar_t cond;
static z_mutex_t mutex;

void reply_dropper(void *ctx) {
(void)(ctx);
printf(">> Received query final notification\n");
z_condvar_signal(&cond);
z_condvar_free(&cond);
}

void reply_handler(z_owned_reply_t *reply, void *ctx) {
(void)(ctx);
if (z_reply_is_ok(reply)) {
z_sample_t sample = z_reply_ok(reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
} else {
printf(">> Received an error\n");
}
}

int main(int argc, char **argv) {
const char *keyexpr = "demo/example/**";
Expand Down Expand Up @@ -78,9 +57,6 @@ int main(int argc, char **argv) {
}
}

z_mutex_init(&mutex);
z_condvar_init(&cond);

z_owned_config_t config = z_config_default();
zp_config_insert(z_loan(config), Z_CONFIG_MODE_KEY, z_string_make(mode));
if (clocator != NULL) {
Expand Down Expand Up @@ -110,19 +86,29 @@ int main(int argc, char **argv) {
return -1;
}

z_mutex_lock(&mutex);
printf("Sending Query '%s'...\n", keyexpr);
z_get_options_t opts = z_get_options_default();
if (value != NULL) {
opts.value.payload = _z_bytes_wrap((const uint8_t *)value, strlen(value));
}
z_owned_closure_reply_t callback = z_closure(reply_handler, reply_dropper);
if (z_get(z_loan(s), ke, "", z_move(callback), &opts) < 0) {
z_owned_reply_ring_channel_t channel = z_reply_ring_channel_new(1);
if (z_get(z_loan(s), ke, "", z_move(channel.send), &opts) < 0) {
printf("Unable to send query.\n");
return -1;
}
z_condvar_wait(&cond, &mutex);
z_mutex_unlock(&mutex);

z_owned_reply_t reply = z_reply_null();
z_call(channel.recv, &reply);
if (z_reply_is_ok(&reply)) {
z_sample_t sample = z_reply_ok(&reply);
z_owned_str_t keystr = z_keyexpr_to_string(sample.keyexpr);
printf(">> Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len, sample.payload.start);
z_drop(z_move(keystr));
} else {
printf(">> Received an error\n");
}

z_drop(z_move(channel));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan(s));
Expand Down
24 changes: 19 additions & 5 deletions include/zenoh-pico/api/handlers.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
void _z_owned_sample_move(z_owned_sample_t *dst, const z_owned_sample_t *src);
z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);

// -- Reply handler
void _z_owned_reply_move(z_owned_reply_t *dst, const z_owned_reply_t *src);
z_owned_reply_t *_z_reply_clone(const z_owned_reply_t *src);

// -- Channel
#define _Z_CHANNEL_DEFINE(name, send_closure_name, recv_closure_name, send_type, recv_type, collection_type, \
collection_new_f, collection_free_f, collection_push_f, collection_pull_f, \
Expand All @@ -45,7 +49,7 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);
static inline void _z_##name##_elem_move(void *dst, const void *src) { \
elem_move_f((recv_type *)dst, (const recv_type *)src); \
} \
static inline void _z_##name##_send(const send_type *elem, void *context) { \
static inline void _z_##name##_send(send_type *elem, void *context) { \
void *internal_elem = elem_convert_f(elem); \
if (internal_elem == NULL) { \
return; \
Expand Down Expand Up @@ -84,13 +88,23 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src);
}

// z_owned_sample_ring_channel_t
_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_ring_mt_t,
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull,
_Z_CHANNEL_DEFINE(sample_ring_channel, closure_sample, closure_owned_sample, const z_sample_t, z_owned_sample_t,
_z_ring_mt_t, _z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull,
_z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop)

// z_owned_sample_fifo_channel_t
_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, z_sample_t, z_owned_sample_t, _z_fifo_mt_t,
_z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull,
_Z_CHANNEL_DEFINE(sample_fifo_channel, closure_sample, closure_owned_sample, const z_sample_t, z_owned_sample_t,
_z_fifo_mt_t, _z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull,
_z_owned_sample_move, _z_sample_to_owned_ptr, z_sample_drop)

// z_owned_sample_ring_channel_t
_Z_CHANNEL_DEFINE(reply_ring_channel, closure_reply, closure_reply, z_owned_reply_t, z_owned_reply_t, _z_ring_mt_t,
_z_ring_mt_new, _z_ring_mt_free, _z_ring_mt_push, _z_ring_mt_pull, _z_ring_mt_try_pull,
_z_owned_reply_move, _z_reply_clone, z_reply_drop)

// z_owned_reply_fifo_channel_t
_Z_CHANNEL_DEFINE(reply_fifo_channel, closure_reply, closure_reply, z_owned_reply_t, z_owned_reply_t, _z_fifo_mt_t,
_z_fifo_mt_new, _z_fifo_mt_free, _z_fifo_mt_push, _z_fifo_mt_pull, _z_fifo_mt_try_pull,
_z_owned_reply_move, _z_reply_clone, z_reply_drop)

#endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H
8 changes: 6 additions & 2 deletions include/zenoh-pico/api/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@
z_owned_closure_hello_t * : z_closure_hello_drop, \
z_owned_closure_zid_t * : z_closure_zid_drop, \
z_owned_sample_ring_channel_t * : z_sample_ring_channel_drop, \
z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop \
z_owned_sample_fifo_channel_t * : z_sample_fifo_channel_drop, \
z_owned_reply_ring_channel_t * : z_reply_ring_channel_drop, \
z_owned_reply_fifo_channel_t * : z_reply_fifo_channel_drop \
)(x)

/**
Expand Down Expand Up @@ -146,7 +148,9 @@
z_owned_closure_zid_t : z_closure_zid_move, \
z_owned_sample_t : z_sample_move, \
z_owned_sample_ring_channel_t : z_sample_ring_channel_move, \
z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move \
z_owned_sample_fifo_channel_t : z_sample_fifo_channel_move, \
z_owned_reply_ring_channel_t : z_reply_ring_channel_move, \
z_owned_reply_fifo_channel_t : z_reply_ring_channel_move \
)(&x)

/**
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/session/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ typedef struct {
} _z_reply_data_t;

void _z_reply_data_clear(_z_reply_data_t *rd);
void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src);

_Z_ELEM_DEFINE(_z_reply_data, _z_reply_data_t, _z_noop_size, _z_reply_data_clear, _z_noop_copy)
_Z_LIST_DEFINE(_z_reply_data, _z_reply_data_t)
Expand All @@ -67,6 +68,7 @@ typedef struct {
} _z_reply_t;
void _z_reply_clear(_z_reply_t *src);
void _z_reply_free(_z_reply_t **hello);
void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src);

typedef struct {
_z_keyexpr_t _key;
Expand Down
19 changes: 19 additions & 0 deletions src/api/handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,22 @@ z_owned_sample_t *_z_sample_to_owned_ptr(const _z_sample_t *src) {
}
return dst;
}

// -- Reply
void _z_owned_reply_move(z_owned_reply_t *dst, const z_owned_reply_t *src) {
memcpy(dst, src, sizeof(z_owned_sample_t));
}

z_owned_reply_t *_z_reply_clone(const z_owned_reply_t *src) {
z_owned_reply_t *dst = (z_owned_reply_t *)zp_malloc(sizeof(z_owned_reply_t));
if (dst == NULL) {
return NULL;
}
if (src != NULL && src->_value) {
dst->_value = (_z_reply_t *)zp_malloc(sizeof(_z_reply_t));
_z_reply_copy(dst->_value, src->_value);
} else {
dst->_value = NULL;
}
return dst;
}
5 changes: 5 additions & 0 deletions src/net/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ void _z_reply_data_free(_z_reply_data_t **reply_data) {
}
}

void _z_reply_data_copy(_z_reply_data_t *dst, _z_reply_data_t *src) {
_z_sample_copy(&dst->sample, &src->sample);
dst->replier_id = src->replier_id;
}

void _z_value_clear(_z_value_t *value) {
_z_bytes_clear(&value->encoding.suffix);
_z_bytes_clear(&value->payload);
Expand Down
5 changes: 5 additions & 0 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ void _z_reply_free(_z_reply_t **reply) {
}
}

void _z_reply_copy(_z_reply_t *dst, _z_reply_t *src) {
_z_reply_data_copy(&dst->data, &src->data);
dst->_tag = src->_tag;
}

_Bool _z_pending_reply_eq(const _z_pending_reply_t *one, const _z_pending_reply_t *two) {
return one->_tstamp.time == two->_tstamp.time;
}
Expand Down

0 comments on commit 90bba75

Please sign in to comment.