Skip to content

Commit

Permalink
Add owned/loaned for task/mutex/condvar
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jul 2, 2024
1 parent 4411046 commit bf5cdd4
Show file tree
Hide file tree
Showing 61 changed files with 525 additions and 413 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ file(GLOB_RECURSE Sources
"src/session/*.c"
"src/transport/*.c"
"src/utils/*.c"
"src/system/platform-common.c"
)

if(WITH_ZEPHYR)
Expand Down
14 changes: 7 additions & 7 deletions examples/unix/c11/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
#include <zenoh-pico.h>

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1
static z_condvar_t cond;
static z_mutex_t mutex;
static z_owned_condvar_t cond;
static z_owned_mutex_t mutex;

void reply_dropper(void *ctx) {
(void)(ctx);
printf(">> Received query final notification\n");
z_condvar_signal(&cond);
z_condvar_free(&cond);
z_condvar_signal(z_loan_mut(cond));
z_drop(z_move(cond));
}

void reply_handler(const z_loaned_reply_t *reply, void *ctx) {
Expand Down Expand Up @@ -116,7 +116,7 @@ int main(int argc, char **argv) {
return -1;
}

z_mutex_lock(&mutex);
z_mutex_lock(z_loan_mut(mutex));
printf("Sending Query '%s'...\n", keyexpr);
z_get_options_t opts;
z_get_options_default(&opts);
Expand All @@ -133,8 +133,8 @@ int main(int argc, char **argv) {
printf("Unable to send query.\n");
return -1;
}
z_condvar_wait(&cond, &mutex);
z_mutex_unlock(&mutex);
z_condvar_wait(z_loan_mut(cond), z_loan_mut(mutex));
z_mutex_unlock(z_loan_mut(mutex));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan_mut(s));
Expand Down
14 changes: 7 additions & 7 deletions examples/unix/c11/z_get_attachment.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ typedef struct kv_pairs_rx_t {
#define KVP_LEN 16

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1
static z_condvar_t cond;
static z_mutex_t mutex;
static z_owned_condvar_t cond;
static z_owned_mutex_t mutex;

_Bool create_attachment_iter(z_owned_bytes_t *kv_pair, void *context) {
kv_pairs_tx_t *kvs = (kv_pairs_tx_t *)(context);
Expand Down Expand Up @@ -93,8 +93,8 @@ void drop_attachment(kv_pairs_rx_t *kvp) {
void reply_dropper(void *ctx) {
(void)(ctx);
printf(">> Received query final notification\n");
z_condvar_signal(&cond);
z_condvar_free(&cond);
z_condvar_signal(z_loan_mut(cond));
z_drop(z_move(cond));
}

void reply_handler(const z_loaned_reply_t *reply, void *ctx) {
Expand Down Expand Up @@ -194,7 +194,7 @@ int main(int argc, char **argv) {
return -1;
}

z_mutex_lock(&mutex);
z_mutex_lock(z_loan_mut(mutex));
printf("Sending Query '%s'...\n", keyexpr);
z_get_options_t opts;
z_get_options_default(&opts);
Expand All @@ -219,8 +219,8 @@ int main(int argc, char **argv) {
printf("Unable to send query.\n");
return -1;
}
z_condvar_wait(&cond, &mutex);
z_mutex_unlock(&mutex);
z_condvar_wait(z_loan_mut(cond), z_loan_mut(mutex));
z_mutex_unlock(z_loan_mut(mutex));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan_mut(s));
Expand Down
16 changes: 8 additions & 8 deletions examples/unix/c11/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@
#define DEFAULT_PING_NB 100
#define DEFAULT_WARMUP_MS 1000

static z_condvar_t cond;
static z_mutex_t mutex;
static z_owned_condvar_t cond;
static z_owned_mutex_t mutex;

void callback(const z_loaned_sample_t* sample, void* context) {
(void)sample;
(void)context;
z_condvar_signal(&cond);
z_condvar_signal(z_loan_mut(cond));
}
void drop(void* context) {
(void)context;
z_condvar_free(&cond);
z_drop(z_move(cond));
}

struct args_t {
Expand Down Expand Up @@ -100,7 +100,7 @@ int main(int argc, char** argv) {
for (unsigned int i = 0; i < args.size; i++) {
data[i] = (uint8_t)(i % 10);
}
z_mutex_lock(&mutex);
z_mutex_lock(z_loan_mut(mutex));
if (args.warmup_ms) {
printf("Warming up for %dms...\n", args.warmup_ms);
z_clock_t warmup_start = z_clock_now();
Expand All @@ -111,7 +111,7 @@ int main(int argc, char** argv) {
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
z_condvar_wait(&cond, &mutex);
z_condvar_wait(z_loan_mut(cond), z_loan_mut(mutex));
elapsed_us = z_clock_elapsed_us(&warmup_start);
}
}
Expand All @@ -123,13 +123,13 @@ int main(int argc, char** argv) {
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
z_condvar_wait(&cond, &mutex);
z_condvar_wait(z_loan_mut(cond), z_loan_mut(mutex));
results[i] = z_clock_elapsed_us(&measure_start);
}
for (unsigned int i = 0; i < args.number_of_pings; i++) {
printf("%d bytes: seq=%d rtt=%luµs, lat=%luµs\n", args.size, i, results[i], results[i] / 2);
}
z_mutex_unlock(&mutex);
z_mutex_unlock(z_loan_mut(mutex));
z_free(results);
z_free(data);
z_drop(z_move(pub));
Expand Down
14 changes: 7 additions & 7 deletions examples/unix/c99/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
#include <zenoh-pico.h>

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1
z_condvar_t cond;
z_mutex_t mutex;
z_owned_condvar_t cond;
z_owned_mutex_t mutex;

void reply_dropper(void *ctx) {
(void)(ctx);
printf(">> Received query final notification\n");
z_condvar_signal(&cond);
z_condvar_free(&cond);
z_condvar_signal(z_condvar_loan_mut(&cond));
z_condvar_drop(z_condvar_move(&cond));
}

void reply_handler(const z_loaned_reply_t *reply, void *ctx) {
Expand Down Expand Up @@ -117,7 +117,7 @@ int main(int argc, char **argv) {
return -1;
}

z_mutex_lock(&mutex);
z_mutex_lock(z_mutex_loan_mut(&mutex));
printf("Sending Query '%s'...\n", keyexpr);
z_get_options_t opts;
z_get_options_default(&opts);
Expand All @@ -133,8 +133,8 @@ int main(int argc, char **argv) {
printf("Unable to send query.\n");
return -1;
}
z_condvar_wait(&cond, &mutex);
z_mutex_unlock(&mutex);
z_condvar_wait(z_condvar_loan_mut(&cond), z_mutex_loan_mut(&mutex));
z_mutex_unlock(z_mutex_loan_mut(&mutex));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_session_loan_mut(&s));
Expand Down
16 changes: 8 additions & 8 deletions examples/unix/c99/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@
#define DEFAULT_PING_NB 100
#define DEFAULT_WARMUP_MS 1000

static z_condvar_t cond;
static z_mutex_t mutex;
static z_owned_condvar_t cond;
static z_owned_mutex_t mutex;

void callback(const z_loaned_sample_t* sample, void* context) {
(void)sample;
(void)context;
z_condvar_signal(&cond);
z_condvar_signal(z_condvar_loan_mut(&cond));
}
void drop(void* context) {
(void)context;
z_condvar_free(&cond);
z_condvar_drop(z_condvar_move(&cond));
}

struct args_t {
Expand Down Expand Up @@ -103,7 +103,7 @@ int main(int argc, char** argv) {
for (unsigned int i = 0; i < args.size; i++) {
data[i] = (uint8_t)(i % 10);
}
z_mutex_lock(&mutex);
z_mutex_lock(z_mutex_loan_mut(&mutex));
if (args.warmup_ms) {
printf("Warming up for %dms...\n", args.warmup_ms);
z_clock_t warmup_start = z_clock_now();
Expand All @@ -114,7 +114,7 @@ int main(int argc, char** argv) {
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL);
z_condvar_wait(&cond, &mutex);
z_condvar_wait(z_condvar_loan_mut(&cond), z_mutex_loan_mut(&mutex));
elapsed_us = z_clock_elapsed_us(&warmup_start);
}
}
Expand All @@ -127,13 +127,13 @@ int main(int argc, char** argv) {
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL);
z_condvar_wait(&cond, &mutex);
z_condvar_wait(z_condvar_loan_mut(&cond), z_mutex_loan_mut(&mutex));
results[i] = z_clock_elapsed_us(&measure_start);
}
for (unsigned int i = 0; i < args.number_of_pings; i++) {
printf("%d bytes: seq=%d rtt=%luµs, lat=%luµs\n", args.size, i, results[i], results[i] / 2);
}
z_mutex_unlock(&mutex);
z_mutex_unlock(z_mutex_loan_mut(&mutex));
z_free(results);
z_free(data);
z_undeclare_subscriber(z_subscriber_move(&sub));
Expand Down
14 changes: 7 additions & 7 deletions examples/windows/z_get.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
#include <zenoh-pico.h>

#if Z_FEATURE_QUERY == 1 && Z_FEATURE_MULTI_THREAD == 1
z_condvar_t cond;
z_mutex_t mutex;
z_owned_condvar_t cond;
z_owned_mutex_t mutex;

void reply_dropper(void *ctx) {
(void)(ctx);
printf(">> Received query final notification\n");
z_condvar_signal(&cond);
z_condvar_free(&cond);
z_condvar_signal(z_loan_mut(cond));
z_drop(z_move(cond));
}

void reply_handler(const z_loaned_reply_t *reply, void *ctx) {
Expand Down Expand Up @@ -81,7 +81,7 @@ int main(int argc, char **argv) {
return -1;
}

z_mutex_lock(&mutex);
z_mutex_lock(z_loan_mut(mutex));
printf("Sending Query '%s'...\n", keyexpr);
z_get_options_t opts;
z_get_options_default(&opts);
Expand All @@ -97,8 +97,8 @@ int main(int argc, char **argv) {
printf("Unable to send query.\n");
return -1;
}
z_condvar_wait(&cond, &mutex);
z_mutex_unlock(&mutex);
z_condvar_wait(z_loan_mut(cond), z_loan_mut(mutex));
z_mutex_unlock(z_loan_mut(mutex));

// Stop read and lease tasks for zenoh-pico
zp_stop_read_task(z_loan_mut(s));
Expand Down
16 changes: 8 additions & 8 deletions examples/windows/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@
#define DEFAULT_PING_NB 100
#define DEFAULT_WARMUP_MS 1000

static z_condvar_t cond;
static z_mutex_t mutex;
static z_owned_condvar_t cond;
static z_owned_mutex_t mutex;

void callback(const z_loaned_sample_t* sample, void* context) {
(void)sample;
(void)context;
z_condvar_signal(&cond);
z_condvar_signal(z_loan_mut(cond));
}
void drop(void* context) {
(void)context;
z_condvar_free(&cond);
z_drop(z_move(cond));
}

struct args_t {
Expand Down Expand Up @@ -99,7 +99,7 @@ int main(int argc, char** argv) {
for (unsigned int i = 0; i < args.size; i++) {
data[i] = i % 10;
}
z_mutex_lock(&mutex);
z_mutex_lock(z_loan_mut(mutex));
if (args.warmup_ms) {
printf("Warming up for %dms...\n", args.warmup_ms);
z_clock_t warmup_start = z_clock_now();
Expand All @@ -110,7 +110,7 @@ int main(int argc, char** argv) {
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
z_condvar_wait(&cond, &mutex);
z_condvar_wait(z_loan_mut(cond), z_loan_mut(mutex));
elapsed_us = z_clock_elapsed_us(&warmup_start);
}
}
Expand All @@ -123,13 +123,13 @@ int main(int argc, char** argv) {
z_bytes_serialize_from_slice(&payload, data, args.size);

z_publisher_put(z_loan(pub), z_move(payload), NULL);
z_condvar_wait(&cond, &mutex);
z_condvar_wait(z_loan_mut(cond), z_loan_mut(mutex));
results[i] = z_clock_elapsed_us(&measure_start);
}
for (unsigned int i = 0; i < args.number_of_pings; i++) {
printf("%d bytes: seq=%d rtt=%luus, lat=%luus\n", args.size, i, results[i], results[i] / 2);
}
z_mutex_unlock(&mutex);
z_mutex_unlock(z_loan_mut(mutex));
z_free(results);
z_free(data);
z_drop(z_move(pub));
Expand Down
Loading

0 comments on commit bf5cdd4

Please sign in to comment.