Skip to content

Commit

Permalink
Pub/sub rework
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Apr 29, 2024
1 parent c0049a6 commit 56b9815
Show file tree
Hide file tree
Showing 41 changed files with 125 additions and 114 deletions.
3 changes: 1 addition & 2 deletions examples/arduino/z_pub.ino
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ void setup() {
Serial.print("Declaring publisher for ");
Serial.print(KEYEXPR);
Serial.println("...");
pub = z_declare_publisher(z_session_loan(&s), z_keyexpr(KEYEXPR), NULL);
if (!z_publisher_check(&pub)) {
if (z_declare_publisher(&pub, z_session_loan(&s), z_keyexpr(KEYEXPR), NULL) < 0) {
Serial.println("Unable to declare publisher for key expression!");
while (1) {
;
Expand Down
6 changes: 3 additions & 3 deletions examples/arduino/z_sub.ino
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ void setup() {
Serial.print(KEYEXPR);
Serial.println(" ...");
z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL);
z_owned_subscriber_t sub =
z_declare_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL);
if (!z_subscriber_check(&sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL) <
0) {
Serial.println("Unable to declare subscriber.");
while (1) {
;
Expand Down
4 changes: 2 additions & 2 deletions examples/espidf/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ void app_main() {
zp_start_lease_task(z_loan(s), NULL);

printf("Declaring publisher for '%s'...", KEYEXPR);
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(KEYEXPR), NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(s), z_keyexpr(KEYEXPR), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
exit(-1);
}
Expand Down
4 changes: 2 additions & 2 deletions examples/espidf/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ void app_main() {

printf("Declaring Subscriber on '%s'...", KEYEXPR);
z_owned_closure_sample_t callback = z_closure(data_handler);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL) < 0) {
printf("Unable to declare subscriber.\n");
exit(-1);
}
Expand Down
4 changes: 2 additions & 2 deletions examples/freertos_plus_tcp/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ void app_main(void) {
}

printf("Declaring publisher for '%s'...\n", KEYEXPR);
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(KEYEXPR), NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(s), z_keyexpr(KEYEXPR), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/freertos_plus_tcp/z_pub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ void app_main(void) {
}

printf("Declaring publisher for '%s'...\n", KEYEXPR);
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(KEYEXPR), NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(s), z_keyexpr(KEYEXPR), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/freertos_plus_tcp/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ void app_main(void) {

z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring Subscriber on '%s'...\n", KEYEXPR);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL) < 0) {
printf("Unable to declare subscriber.\n");
return;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/freertos_plus_tcp/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ void app_main(void) {

z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring Subscriber on '%s'...\n", KEYEXPR);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(s), z_keyexpr(KEYEXPR), z_move(callback), NULL) < 0) {
printf("Unable to declare subscriber.\n");
return;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/mbed/z_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ int main(int argc, char **argv) {
zp_start_lease_task(z_session_loan(&s), NULL);

printf("Declaring publisher for '%s'...", KEYEXPR);
z_owned_publisher_t pub = z_declare_publisher(z_session_loan(&s), z_keyexpr(KEYEXPR), NULL);
if (!z_publisher_check(&pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_session_loan(&s), z_keyexpr(KEYEXPR), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
exit(-1);
}
Expand Down
6 changes: 3 additions & 3 deletions examples/mbed/z_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ int main(int argc, char **argv) {

printf("Declaring Subscriber on '%s'...", KEYEXPR);
z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL);
z_owned_subscriber_t sub =
z_declare_subscriber(z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL);
if (!z_subscriber_check(&sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_session_loan(&s), z_keyexpr(KEYEXPR), z_closure_sample_move(&callback), NULL) <
0) {
printf("Unable to declare subscriber.\n");
exit(-1);
}
Expand Down
8 changes: 4 additions & 4 deletions examples/unix/c11/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,15 @@ int main(int argc, char** argv) {
z_keyexpr_t ping = z_keyexpr_unchecked("test/ping");
z_keyexpr_t pong = z_keyexpr_unchecked("test/pong");

z_owned_publisher_t pub = z_declare_publisher(z_loan(session), ping, NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(session), ping, NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}

z_owned_closure_sample_t respond = z_closure(callback, drop, NULL);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(session), pong, z_move(respond), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(session), pong, z_move(respond), NULL) < 0) {
printf("Unable to declare subscriber for key expression.\n");
return -1;
}
Expand Down
8 changes: 4 additions & 4 deletions examples/unix/c11/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ int main(int argc, char** argv) {
}

z_keyexpr_t pong = z_keyexpr_unchecked("test/pong");
z_owned_publisher_t pub = z_declare_publisher(z_loan(session), pong, NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(session), pong, NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}

z_keyexpr_t ping = z_keyexpr_unchecked("test/ping");
z_owned_closure_sample_t respond = z_closure(callback, drop, (void*)z_move(pub));
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(session), ping, z_move(respond), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(session), ping, z_move(respond), NULL) < 0) {
printf("Unable to declare subscriber for key expression.\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c11/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ int main(int argc, char **argv) {
}

printf("Declaring publisher for '%s'...\n", keyexpr);
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(s), z_keyexpr(keyexpr), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c11/z_pub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ int main(int argc, char **argv) {
}

printf("Declaring publisher for '%s'...\n", keyexpr);
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(s), z_keyexpr(keyexpr), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c11/z_pub_thr.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ int main(int argc, char **argv) {
exit(-1);
}
// Declare publisher
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(s), z_keyexpr(keyexpr), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
exit(-1);
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c11/z_pull.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ int main(int argc, char **argv) {

printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_sample_ring_channel_t channel = z_sample_ring_channel_new(size);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL) < 0) {
printf("Unable to declare subscriber.\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c11/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ int main(int argc, char **argv) {

z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL) < 0) {
printf("Unable to declare subscriber.\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c11/z_sub_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ int main(int argc, char **argv) {

printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_sample_fifo_channel_t channel = z_sample_fifo_channel_new(3);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(s), z_keyexpr(keyexpr), z_move(channel.send), NULL) < 0) {
printf("Unable to declare subscriber.\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c11/z_sub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ int main(int argc, char **argv) {

z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL) < 0) {
printf("Unable to declare subscriber.\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c11/z_sub_thr.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ int main(int argc, char **argv) {
// Declare Subscriber/resource
z_stats_t *context = z_stats_make();
z_owned_closure_sample_t callback = z_closure(on_sample, drop_stats, (void *)context);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL) < 0) {
printf("Unable to create subscriber.\n");
exit(-1);
}
Expand Down
9 changes: 4 additions & 5 deletions examples/unix/c99/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,16 @@ int main(int argc, char** argv) {
}

z_keyexpr_t ping = z_keyexpr_unchecked("test/ping");
z_owned_publisher_t pub = z_declare_publisher(z_session_loan(&session), ping, NULL);
if (!z_publisher_check(&pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_session_loan(&session), ping, NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}

z_keyexpr_t pong = z_keyexpr_unchecked("test/pong");
z_owned_closure_sample_t respond = z_closure_sample(callback, drop, NULL);
z_owned_subscriber_t sub =
z_declare_subscriber(z_session_loan(&session), pong, z_closure_sample_move(&respond), NULL);
if (!z_subscriber_check(&sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_session_loan(&session), pong, z_closure_sample_move(&respond), NULL) < 0) {
printf("Unable to declare subscriber for key expression.\n");
return -1;
}
Expand Down
9 changes: 4 additions & 5 deletions examples/unix/c99/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,16 @@ int main(int argc, char** argv) {
}

z_keyexpr_t pong = z_keyexpr_unchecked("test/pong");
z_owned_publisher_t pub = z_declare_publisher(z_session_loan(&session), pong, NULL);
if (!z_publisher_check(&pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_session_loan(&session), pong, NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}

z_keyexpr_t ping = z_keyexpr_unchecked("test/ping");
z_owned_closure_sample_t respond = z_closure_sample(callback, drop, (void*)z_publisher_move(&pub));
z_owned_subscriber_t sub =
z_declare_subscriber(z_session_loan(&session), ping, z_closure_sample_move(&respond), NULL);
if (!z_subscriber_check(&sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_session_loan(&session), ping, z_closure_sample_move(&respond), NULL) < 0) {
printf("Unable to declare subscriber for key expression.\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c99/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ int main(int argc, char **argv) {
}

printf("Declaring publisher for '%s'...\n", keyexpr);
z_owned_publisher_t pub = z_declare_publisher(z_session_loan(&s), z_keyexpr(keyexpr), NULL);
if (!z_publisher_check(&pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_session_loan(&s), z_keyexpr(keyexpr), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/unix/c99/z_pub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ int main(int argc, char **argv) {
}

printf("Declaring publisher for '%s'...\n", keyexpr);
z_owned_publisher_t pub = z_declare_publisher(z_session_loan(&s), z_keyexpr(keyexpr), NULL);
if (!z_publisher_check(&pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_session_loan(&s), z_keyexpr(keyexpr), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
Expand Down
6 changes: 3 additions & 3 deletions examples/unix/c99/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ int main(int argc, char **argv) {

z_owned_closure_sample_t callback = z_closure_sample(data_handler, NULL, NULL);
printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_subscriber_t sub =
z_declare_subscriber(z_session_loan(&s), z_keyexpr(keyexpr), z_closure_sample_move(&callback), NULL);
if (!z_subscriber_check(&sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_session_loan(&s), z_keyexpr(keyexpr), z_closure_sample_move(&callback), NULL) <
0) {
printf("Unable to declare subscriber.\n");
return -1;
}
Expand Down
8 changes: 4 additions & 4 deletions examples/windows/z_ping.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,16 @@ int main(int argc, char** argv) {
}

z_keyexpr_t ping = z_keyexpr_unchecked("test/ping");
z_owned_publisher_t pub = z_declare_publisher(z_loan(session), ping, NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(session), ping, NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}

z_keyexpr_t pong = z_keyexpr_unchecked("test/pong");
z_owned_closure_sample_t respond = z_closure(callback, drop, NULL);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(session), pong, z_move(respond), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(session), pong, z_move(respond), NULL) < 0) {
printf("Unable to declare subscriber for key expression.\n");
return -1;
}
Expand Down
8 changes: 4 additions & 4 deletions examples/windows/z_pong.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ int main(int argc, char** argv) {
}

z_keyexpr_t pong = z_keyexpr_unchecked("test/pong");
z_owned_publisher_t pub = z_declare_publisher(z_loan(session), pong, NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(session), pong, NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}

z_keyexpr_t ping = z_keyexpr_unchecked("test/ping");
z_owned_closure_sample_t respond = z_closure(callback, drop, (void*)z_move(pub));
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(session), ping, z_move(respond), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(session), ping, z_move(respond), NULL) < 0) {
printf("Unable to declare subscriber for key expression.\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/windows/z_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ int main(int argc, char **argv) {
}

printf("Declaring publisher for '%s'...\n", keyexpr);
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(s), z_keyexpr(keyexpr), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/windows/z_pub_st.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ int main(int argc, char **argv) {
}

printf("Declaring publisher for '%s'...\n", keyexpr);
z_owned_publisher_t pub = z_declare_publisher(z_loan(s), z_keyexpr(keyexpr), NULL);
if (!z_check(pub)) {
z_owned_publisher_t pub;
if (z_declare_publisher(&pub, z_loan(s), z_keyexpr(keyexpr), NULL) < 0) {
printf("Unable to declare publisher for key expression!\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions examples/windows/z_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ int main(int argc, char **argv) {

z_owned_closure_sample_t callback = z_closure(data_handler);
printf("Declaring Subscriber on '%s'...\n", keyexpr);
z_owned_subscriber_t sub = z_declare_subscriber(z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL);
if (!z_check(sub)) {
z_owned_subscriber_t sub;
if (z_declare_subscriber(&sub, z_loan(s), z_keyexpr(keyexpr), z_move(callback), NULL) < 0) {
printf("Unable to declare subscriber.\n");
return -1;
}
Expand Down
Loading

0 comments on commit 56b9815

Please sign in to comment.