Skip to content

Commit

Permalink
Moved utils to handlers. Added FIFO handlers. Add z_sub_channel.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Mar 22, 2024
1 parent 2e13913 commit 25b0661
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 138 deletions.
5 changes: 4 additions & 1 deletion examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ if(UNIX)
add_example(z_pub unix/c99/z_pub.c)
add_example(z_pub_st unix/c99/z_pub_st.c)
add_example(z_sub unix/c99/z_sub.c)
add_example(z_sub_channel unix/c99/z_sub_channel.c)
add_example(z_sub_st unix/c99/z_sub_st.c)
add_example(z_pull unix/c99/z_pull.c)
add_example(z_get unix/c99/z_get.c)
Expand All @@ -37,6 +38,7 @@ if(UNIX)
add_example(z_pub unix/c11/z_pub.c)
add_example(z_pub_st unix/c11/z_pub_st.c)
add_example(z_sub unix/c11/z_sub.c)
add_example(z_sub_channel unix/c11/z_sub_channel.c)
add_example(z_sub_st unix/c11/z_sub_st.c)
add_example(z_pull unix/c11/z_pull.c)
add_example(z_get unix/c11/z_get.c)
Expand All @@ -53,8 +55,9 @@ elseif(MSVC)
add_example(z_pub windows/z_pub.c)
add_example(z_pub_st windows/z_pub_st.c)
add_example(z_sub windows/z_sub.c)
add_example(z_sub windows/z_sub_channel.c)
add_example(z_sub_st windows/z_sub_st.c)
add_example(z_pull windows/z_pull.c)
add_example(z_sub_channel windows/z_pull.c)
add_example(z_get windows/z_get.c)
add_example(z_queryable windows/z_queryable.c)
add_example(z_info windows/z_info.c)
Expand Down
99 changes: 99 additions & 0 deletions examples/unix/c11/z_sub_channel.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//
// Copyright (c) 2022 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>

#include <ctype.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <zenoh-pico.h>

#if Z_FEATURE_SUBSCRIPTION == 1
int main(int argc, char **argv) {
const char *keyexpr = "demo/example/**";
char *locator = NULL;

int opt;
while ((opt = getopt(argc, argv, "k:e:")) != -1) {
switch (opt) {
case 'k':
keyexpr = optarg;
break;
case 'e':
locator = optarg;
break;
case '?':
if (optopt == 'k' || optopt == 'e') {
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
} else {
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
return 1;
default:
return -1;
}
}

z_owned_config_t config = z_config_default();
if (locator != NULL) {
zp_config_insert(z_loan(config), Z_CONFIG_CONNECT_KEY, z_string_make(locator));
}

printf("Opening session...\n");
z_owned_session_t s = z_open(z_move(config));
if (!z_check(s)) {
printf("Unable to open session!\n");
return -1;
}

// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_loan(s), NULL) < 0 || zp_start_lease_task(z_loan(s), NULL) < 0) {
printf("Unable to start read and lease tasks\n");
z_close(z_session_move(&s));
return -1;
}

printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_sample_channel_t channel = z_sample_channel_fifo_new(3);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
if (!z_check(sub)) {
printf("Unable to declare subscriber.\n");
return -1;
}

z_owned_sample_t sample = z_sample_null();
for (z_call(channel.recv, &sample); z_check(sample); z_call(channel.recv, &sample)) {
z_owned_str_t keystr = z_keyexpr_to_string(z_loan(sample.keyexpr));
printf(">> [Subscriber] Received ('%s': '%.*s')\n", z_loan(keystr), (int)sample.payload.len,
sample.payload.start);
z_drop(z_move(keystr));
z_drop(z_move(sample));
sample = z_sample_null();
}

z_undeclare_subscriber(z_move(sub));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan(s));
zp_stop_lease_task(z_loan(s));

z_close(z_move(s));

return 0;
}
#else
int main(void) {
printf("ERROR: Zenoh pico was compiled without Z_FEATURE_SUBSCRIPTION but this example requires it.\n");
return -2;
}
#endif
2 changes: 1 addition & 1 deletion include/zenoh-pico.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#define ZENOH_PICO_TWEAK 0

#include "zenoh-pico/api/constants.h"
#include "zenoh-pico/api/handlers.h"
#include "zenoh-pico/api/macros.h"
#include "zenoh-pico/api/primitives.h"
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/api/utils.h"

#endif /* ZENOH_PICO_H */
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#ifndef INCLUDE_ZENOH_PICO_API_UTILS_H
#define INCLUDE_ZENOH_PICO_API_UTILS_H
#ifndef INCLUDE_ZENOH_PICO_API_HANDLERS_H
#define INCLUDE_ZENOH_PICO_API_HANDLERS_H

#include <stdint.h>
#include <stdio.h>

#include "zenoh-pico/api/macros.h"
#include "zenoh-pico/api/types.h"
#include "zenoh-pico/collections/element.h"
#include "zenoh-pico/collections/fifo.h"
#include "zenoh-pico/collections/ring.h"
#include "zenoh-pico/system/platform.h"

// -- Owned sample
// @TODO: define it via dedicated macros and move it to the appropriate place in types.
typedef struct {
z_owned_keyexpr_t keyexpr;
z_bytes_t payload;
Expand Down Expand Up @@ -68,6 +71,8 @@ static inline void z_sample_drop(z_owned_sample_t *s) {
}
}

_Z_ELEM_DEFINE(_z_owned_sample, z_owned_sample_t, _z_noop_size, z_sample_drop, _z_noop_copy)

// -- Channel
typedef int8_t (*_z_owned_sample_handler_t)(z_owned_sample_t *dst, void *context);

Expand All @@ -82,21 +87,10 @@ typedef struct {
z_owned_closure_owned_sample_t recv;
} z_owned_sample_channel_t;

static inline z_owned_sample_channel_t z_owned_sample_channel_null() {
z_owned_sample_channel_t ch = {.send = NULL, .recv = NULL};
return ch;
}

static inline int8_t z_closure_owned_sample_call(z_owned_closure_owned_sample_t *recv, z_owned_sample_t *dst) {
int8_t res = _Z_ERR_GENERIC;
if (recv != NULL) {
res = (recv->call)(dst, recv->context);
}
return res;
}
z_owned_sample_channel_t z_owned_sample_channel_null();
int8_t z_closure_owned_sample_call(z_owned_closure_owned_sample_t *recv, z_owned_sample_t *dst);

// -- Ring
_Z_ELEM_DEFINE(_z_owned_sample, z_owned_sample_t, _z_noop_size, _z_noop_clear, _z_noop_copy)
_Z_RING_DEFINE(_z_owned_sample, z_owned_sample_t)

typedef struct {
Expand All @@ -106,65 +100,24 @@ typedef struct {
#endif
} z_owned_sample_ring_t;

static inline void z_sample_channel_ring_push(const z_sample_t *src, void *context) {
if (src == NULL || context == NULL) {
return;
}

z_owned_sample_ring_t *r = (z_owned_sample_ring_t *)context;
z_owned_sample_t *dst = (z_owned_sample_t *)zp_malloc(sizeof(z_owned_sample_t));
if (dst == NULL) {
return;
}
*dst = z_sample_to_owned(src);

#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&r->_mutex);
#endif
_z_owned_sample_ring_push_force_drop(&r->_ring, dst);
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&r->_mutex);
#endif
}

static inline int8_t z_sample_channel_ring_pull(z_owned_sample_t *dst, void *context) {
int8_t ret = _Z_RES_OK;
z_owned_sample_channel_t z_sample_channel_ring_new(size_t capacity);
void z_sample_channel_ring_push(const z_sample_t *src, void *context);
int8_t z_sample_channel_ring_pull(z_owned_sample_t *dst, void *context);

z_owned_sample_ring_t *r = (z_owned_sample_ring_t *)context;
// -- Fifo
_Z_FIFO_DEFINE(_z_owned_sample, z_owned_sample_t)

typedef struct {
_z_owned_sample_fifo_t _fifo;
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_lock(&r->_mutex);
#endif
z_owned_sample_t *src = _z_owned_sample_ring_pull(&r->_ring);
#if Z_FEATURE_MULTI_THREAD == 1
zp_mutex_unlock(&r->_mutex);
zp_mutex_t _mutex;
zp_condvar_t _cv_not_full;
zp_condvar_t _cv_not_empty;
#endif
} z_owned_sample_fifo_t;

if (src == NULL) {
*dst = z_sample_null();
} else {
memcpy(dst, src, sizeof(z_owned_sample_t));
}
return ret;
}

static inline z_owned_sample_channel_t z_sample_channel_ring_new(size_t capacity) {
z_owned_sample_channel_t ch = z_owned_sample_channel_null();

z_owned_sample_ring_t *ring = (z_owned_sample_ring_t *)zp_malloc(sizeof(z_owned_sample_ring_t));
if (ring != NULL) {
int8_t res = _z_owned_sample_ring_init(&ring->_ring, capacity);
if (res == _Z_RES_OK) {
z_owned_closure_sample_t send = z_closure(z_sample_channel_ring_push, NULL, ring);
ch.send = send;
z_owned_closure_owned_sample_t recv = z_closure(z_sample_channel_ring_pull, NULL, ring);
ch.recv = recv;
} else {
zp_free(ring);
}
}

return ch;
}
z_owned_sample_channel_t z_sample_channel_fifo_new(size_t capacity);
void z_sample_channel_fifo_push(const z_sample_t *src, void *context);
int8_t z_sample_channel_fifo_pull(z_owned_sample_t *dst, void *context);

#endif // INCLUDE_ZENOH_PICO_API_UTILS_H
#endif // INCLUDE_ZENOH_PICO_API_HANDLERS_H
Loading

0 comments on commit 25b0661

Please sign in to comment.