From dd8b94363cefa5688762e13bf5d063e3cc25474a Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Fri, 13 Oct 2023 15:58:00 +0200 Subject: [PATCH] Add query & queryable token config (#260) --- examples/arduino/z_get.ino | 7 + examples/arduino/z_queryable.ino | 8 + examples/espidf/z_get.c | 4 + examples/espidf/z_queryable.c | 6 + examples/mbed/z_get.cpp | 7 + examples/mbed/z_queryable.cpp | 7 + examples/unix/c11/z_get.c | 7 + examples/unix/c11/z_queryable.c | 7 + examples/unix/c99/z_get.c | 7 + examples/unix/c99/z_queryable.c | 7 + examples/windows/z_get.c | 7 + examples/windows/z_queryable.c | 7 + examples/zephyr/z_get.c | 7 + include/zenoh-pico/api/primitives.h | 247 +++++++++++++------------ include/zenoh-pico/config.h | 14 ++ include/zenoh-pico/net/primitives.h | 74 ++++---- include/zenoh-pico/net/query.h | 2 + include/zenoh-pico/net/session.h | 4 + include/zenoh-pico/session/push.h | 26 +++ include/zenoh-pico/session/query.h | 2 + include/zenoh-pico/session/queryable.h | 2 + include/zenoh-pico/session/reply.h | 29 +++ src/api/api.c | 167 +++++++++-------- src/net/primitives.c | 80 ++++---- src/net/query.c | 2 + src/session/push.c | 35 ++++ src/session/query.c | 2 + src/session/queryable.c | 3 + src/session/reply.c | 44 +++++ src/session/rx.c | 22 +-- src/session/utils.c | 9 + tests/z_api_null_drop_test.c | 38 ++-- 32 files changed, 590 insertions(+), 300 deletions(-) create mode 100644 include/zenoh-pico/session/push.h create mode 100644 include/zenoh-pico/session/reply.h create mode 100644 src/session/push.c create mode 100644 src/session/reply.c diff --git a/examples/arduino/z_get.ino b/examples/arduino/z_get.ino index 1f92eb3c2..7ae75175f 100644 --- a/examples/arduino/z_get.ino +++ b/examples/arduino/z_get.ino @@ -16,6 +16,7 @@ #include #include +#if Z_FEATURE_QUERY == 1 // WiFi-specific parameters #define SSID "SSID" #define PASS "PASS" @@ -118,3 +119,9 @@ void loop() { Serial.println("Unable to send query."); } } +#else +void setup() { + Serial.println("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it."); +} +void loop() {} +#endif diff --git a/examples/arduino/z_queryable.ino b/examples/arduino/z_queryable.ino index fe1f87b65..edde751ae 100644 --- a/examples/arduino/z_queryable.ino +++ b/examples/arduino/z_queryable.ino @@ -16,6 +16,7 @@ #include #include +#if Z_FEATURE_QUERYABLE == 1 // WiFi-specific parameters #define SSID "SSID" #define PASS "PASS" @@ -106,3 +107,10 @@ void setup() { } void loop() { delay(5000); } + +#else +void setup() { + Serial.println("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERYABLE but this example requires it."); +} +void loop() {} +#endif diff --git a/examples/espidf/z_get.c b/examples/espidf/z_get.c index 83f321351..884dfc2a4 100644 --- a/examples/espidf/z_get.c +++ b/examples/espidf/z_get.c @@ -25,6 +25,7 @@ #include #include +#if Z_FEATURE_QUERY == 1 #define ESP_WIFI_SSID "SSID" #define ESP_WIFI_PASS "PASS" #define ESP_MAXIMUM_RETRY 5 @@ -172,3 +173,6 @@ void app_main() { z_close(z_move(s)); printf("OK!\n"); } +#else +void app_main() { printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it.\n"); } +#endif \ No newline at end of file diff --git a/examples/espidf/z_queryable.c b/examples/espidf/z_queryable.c index 9f5eb9abe..4448b3b2b 100644 --- a/examples/espidf/z_queryable.c +++ b/examples/espidf/z_queryable.c @@ -30,6 +30,7 @@ #define ESP_MAXIMUM_RETRY 5 #define WIFI_CONNECTED_BIT BIT0 +#if Z_FEATURE_QUERYABLE == 1 static bool s_is_wifi_connected = false; static EventGroupHandle_t s_event_group_handler; static int s_retry_count = 0; @@ -171,3 +172,8 @@ void app_main() { z_close(z_move(s)); printf("OK!\n"); } +#else +void app_main() { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERYABLE but this example requires it.\n"); +} +#endif diff --git a/examples/mbed/z_get.cpp b/examples/mbed/z_get.cpp index a7acba4fd..39ea2c5fd 100644 --- a/examples/mbed/z_get.cpp +++ b/examples/mbed/z_get.cpp @@ -16,6 +16,7 @@ #include #include +#if Z_FEATURE_QUERY == 1 #define CLIENT_OR_PEER 0 // 0: Client mode; 1: Peer mode #if CLIENT_OR_PEER == 0 #define MODE "client" @@ -94,3 +95,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/mbed/z_queryable.cpp b/examples/mbed/z_queryable.cpp index 5f621c74e..5a0dd4901 100644 --- a/examples/mbed/z_queryable.cpp +++ b/examples/mbed/z_queryable.cpp @@ -16,6 +16,7 @@ #include #include +#if Z_FEATURE_QUERYABLE == 1 #define CLIENT_OR_PEER 0 // 0: Client mode; 1: Peer mode #if CLIENT_OR_PEER == 0 #define MODE "client" @@ -94,3 +95,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERYABLE but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c11/z_get.c b/examples/unix/c11/z_get.c index 000993ab9..6c47a550b 100644 --- a/examples/unix/c11/z_get.c +++ b/examples/unix/c11/z_get.c @@ -18,6 +18,7 @@ #include #include +#if Z_FEATURE_QUERY == 1 void reply_dropper(void *ctx) { (void)(ctx); printf(">> Received query final notification\n"); @@ -129,3 +130,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c11/z_queryable.c b/examples/unix/c11/z_queryable.c index 31a7d10ba..01a04410b 100644 --- a/examples/unix/c11/z_queryable.c +++ b/examples/unix/c11/z_queryable.c @@ -18,6 +18,7 @@ #include #include +#if Z_FEATURE_QUERYABLE == 1 const char *keyexpr = "demo/example/zenoh-pico-queryable"; const char *value = "Queryable from Pico!"; @@ -124,3 +125,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERYABLE but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c99/z_get.c b/examples/unix/c99/z_get.c index 65e22f91b..7b45b13f7 100644 --- a/examples/unix/c99/z_get.c +++ b/examples/unix/c99/z_get.c @@ -18,6 +18,7 @@ #include #include +#if Z_FEATURE_QUERY == 1 void reply_dropper(void *ctx) { (void)(ctx); printf(">> Received query final notification\n"); @@ -129,3 +130,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/unix/c99/z_queryable.c b/examples/unix/c99/z_queryable.c index 8ddcf74e6..8c8539bfe 100644 --- a/examples/unix/c99/z_queryable.c +++ b/examples/unix/c99/z_queryable.c @@ -18,6 +18,7 @@ #include #include +#if Z_FEATURE_QUERYABLE == 1 const char *keyexpr = "demo/example/zenoh-pico-queryable"; const char *value = "Queryable from Pico!"; @@ -120,3 +121,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERYABLE but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/windows/z_get.c b/examples/windows/z_get.c index b2b6aafda..0d082a63e 100644 --- a/examples/windows/z_get.c +++ b/examples/windows/z_get.c @@ -17,6 +17,7 @@ #include #include +#if Z_FEATURE_QUERY == 1 void reply_dropper(void *ctx) { (void)(ctx); printf(">> Received query final notification\n"); @@ -94,3 +95,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/windows/z_queryable.c b/examples/windows/z_queryable.c index 60b03ab5d..8fafa11d9 100644 --- a/examples/windows/z_queryable.c +++ b/examples/windows/z_queryable.c @@ -17,6 +17,7 @@ #include #include +#if Z_FEATURE_QUERYABLE == 1 const char *keyexpr = "demo/example/zenoh-pico-queryable"; const char *value = "Queryable from Pico!"; @@ -89,3 +90,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERYABLE but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/examples/zephyr/z_get.c b/examples/zephyr/z_get.c index 4fcc47649..279bd69a5 100644 --- a/examples/zephyr/z_get.c +++ b/examples/zephyr/z_get.c @@ -17,6 +17,7 @@ #include #include +#if Z_FEATURE_QUERY == 1 #define CLIENT_OR_PEER 0 // 0: Client mode; 1: Peer mode #if CLIENT_OR_PEER == 0 #define MODE "client" @@ -88,3 +89,9 @@ int main(int argc, char **argv) { return 0; } +#else +int main(void) { + printf("ERROR: Zenoh pico was compiled without Z_FEATURE_QUERY but this example requires it.\n"); + return -1; +} +#endif \ No newline at end of file diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 68861d6f9..14189c48d 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -829,6 +829,7 @@ int8_t z_put(z_session_t zs, z_keyexpr_t keyexpr, const uint8_t *payload, z_zint */ int8_t z_delete(z_session_t zs, z_keyexpr_t keyexpr, const z_delete_options_t *options); +#if Z_FEATURE_QUERY == 1 /** * Constructs the default values for the get operation. * @@ -852,6 +853,130 @@ z_get_options_t z_get_options_default(void); */ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owned_closure_reply_t *callback, const z_get_options_t *options); +/** + * Checks if the queryable answered with an OK, which allows this value to be treated as a sample. + * + * If this returns ``false``, you should use ``z_check`` before trying to use :c:func:`z_reply_err` if you want to + * process the error that may be here. + * + * Parameters: + * reply: Pointer to the received query reply. + * + * Returns: + * Returns ``true`` if the queryable answered with an OK, which allows this value to be treated as a sample, or + * ``false`` otherwise. + */ +_Bool z_reply_is_ok(const z_owned_reply_t *reply); + +/** + * Yields the contents of the reply by asserting it indicates a success. + * + * You should always make sure that :c:func:`z_reply_is_ok` returns ``true`` before calling this function. + * + * Parameters: + * reply: Pointer to the received query reply. + * + * Returns: + * Returns the :c:type:`z_sample_t` wrapped in the query reply. + */ +z_sample_t z_reply_ok(const z_owned_reply_t *reply); + +/** + * Yields the contents of the reply by asserting it indicates a failure. + * + * You should always make sure that :c:func:`z_reply_is_ok` returns ``false`` before calling this function. + * + * Parameters: + * reply: Pointer to the received query reply. + * + * Returns: + * Returns the :c:type:`z_value_t` wrapped in the query reply. + */ +z_value_t z_reply_err(const z_owned_reply_t *reply); +#endif + +#if Z_FEATURE_QUERYABLE == 1 +/** + * Constructs the default values for the queryable entity. + * + * Returns: + * Returns the constructed :c:type:`z_queryable_options_t`. + */ +z_queryable_options_t z_queryable_options_default(void); + +/** + * Declares a queryable for the given keyexpr. + * + * Received queries are processed by means of callbacks. + * + * Like most ``z_owned_X_t`` types, you may obtain an instance of :c:type:`z_owned_queryable_t` by loaning it using + * ``z_queryable_loan(&val)``. The ``z_loan(val)`` macro, available if your compiler supports C11's ``_Generic``, is + * equivalent to writing ``z_queryable_loan(&val)``. + * + * Like all ``z_owned_X_t``, an instance will be destroyed by any function which takes a mutable pointer to said + * instance, as this implies the instance's inners were moved. To make this fact more obvious when reading your code, + * consider using ``z_move(val)`` instead of ``&val`` as the argument. After a ``z_move``, ``val`` will still exist, but + * will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your ``val`` + * is valid. + * + * To check if ``val`` is still valid, you may use ``z_queryable_check(&val)`` or ``z_check(val)`` if your compiler + * supports ``_Generic``, which will return ``true`` if ``val`` is valid, or ``false`` otherwise. + * + * Parameters: + * zs: A loaned instance of the the :c:type:`z_session_t` where to declare the subscriber. + * keyexpr: A loaned instance of :c:type:`z_keyexpr_t` to associate with the subscriber. + * callback: A moved instance of :c:type:`z_owned_closure_query_t` containing the callbacks to be called and the + * context to pass to them. options: The options to apply to the queryable. If ``NULL`` is passed, the default options + * will be applied. + * + * Returns: + * A :c:type:`z_owned_queryable_t` with either a valid queryable or a failing queryable. + * Should the queryable be invalid, ``z_check(val)`` ing the returned value will return ``false``. + */ +z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_owned_closure_query_t *callback, + const z_queryable_options_t *options); + +/** + * Undeclares the queryable generated by a call to :c:func:`z_declare_queryable`. + * + * Parameters: + * queryable: A moved instance of :c:type:`z_owned_queryable_t` to undeclare. + * + * Returns: + * Returns ``0`` if the undeclare queryable operation is successful, or a ``negative value`` otherwise. + */ +int8_t z_undeclare_queryable(z_owned_queryable_t *queryable); + +/** + * Constructs the default values for the query reply operation. + * + * Returns: + * Returns the constructed :c:type:`z_query_reply_options_t`. + */ +z_query_reply_options_t z_query_reply_options_default(void); + +/** + * Sends a reply to a query. + * + * This function must be called inside of a :c:type:`z_owned_closure_query_t` callback associated to the + * :c:type:`z_owned_queryable_t`, passing the received query as parameters of the callback function. This function can + * be called multiple times to send multiple replies to a query. The reply will be considered complete when the callback + * returns. + * + * Parameters: + * query: Pointer to the received query. + * keyexpr: A loaned instance of :c:type:`z_keyexpr_t` to associate with the subscriber. + * payload: Pointer to the data to put. + * payload_len: The length of the ``payload``. + * options: The options to apply to the send query reply operation. If ``NULL`` is passed, the default options will be + * applied. + * + * Returns: + * Returns ``0`` if the send query reply operation is successful, or a ``negative value`` otherwise. + */ +int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const uint8_t *payload, size_t payload_len, + const z_query_reply_options_t *options); +#endif /** * Creates keyexpr owning string passed to it @@ -1113,128 +1238,6 @@ int8_t z_undeclare_pull_subscriber(z_owned_pull_subscriber_t *sub); */ int8_t z_subscriber_pull(const z_pull_subscriber_t sub); -/** - * Constructs the default values for the queryable entity. - * - * Returns: - * Returns the constructed :c:type:`z_queryable_options_t`. - */ -z_queryable_options_t z_queryable_options_default(void); - -/** - * Declares a queryable for the given keyexpr. - * - * Received queries are processed by means of callbacks. - * - * Like most ``z_owned_X_t`` types, you may obtain an instance of :c:type:`z_owned_queryable_t` by loaning it using - * ``z_queryable_loan(&val)``. The ``z_loan(val)`` macro, available if your compiler supports C11's ``_Generic``, is - * equivalent to writing ``z_queryable_loan(&val)``. - * - * Like all ``z_owned_X_t``, an instance will be destroyed by any function which takes a mutable pointer to said - * instance, as this implies the instance's inners were moved. To make this fact more obvious when reading your code, - * consider using ``z_move(val)`` instead of ``&val`` as the argument. After a ``z_move``, ``val`` will still exist, but - * will no longer be valid. The destructors are double-drop-safe, but other functions will still trust that your ``val`` - * is valid. - * - * To check if ``val`` is still valid, you may use ``z_queryable_check(&val)`` or ``z_check(val)`` if your compiler - * supports ``_Generic``, which will return ``true`` if ``val`` is valid, or ``false`` otherwise. - * - * Parameters: - * zs: A loaned instance of the the :c:type:`z_session_t` where to declare the subscriber. - * keyexpr: A loaned instance of :c:type:`z_keyexpr_t` to associate with the subscriber. - * callback: A moved instance of :c:type:`z_owned_closure_query_t` containing the callbacks to be called and the - * context to pass to them. options: The options to apply to the queryable. If ``NULL`` is passed, the default options - * will be applied. - * - * Returns: - * A :c:type:`z_owned_queryable_t` with either a valid queryable or a failing queryable. - * Should the queryable be invalid, ``z_check(val)`` ing the returned value will return ``false``. - */ -z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_owned_closure_query_t *callback, - const z_queryable_options_t *options); - -/** - * Undeclares the queryable generated by a call to :c:func:`z_declare_queryable`. - * - * Parameters: - * queryable: A moved instance of :c:type:`z_owned_queryable_t` to undeclare. - * - * Returns: - * Returns ``0`` if the undeclare queryable operation is successful, or a ``negative value`` otherwise. - */ -int8_t z_undeclare_queryable(z_owned_queryable_t *queryable); - -/** - * Sends a reply to a query. - * - * This function must be called inside of a :c:type:`z_owned_closure_query_t` callback associated to the - * :c:type:`z_owned_queryable_t`, passing the received query as parameters of the callback function. This function can - * be called multiple times to send multiple replies to a query. The reply will be considered complete when the callback - * returns. - * - * Parameters: - * query: Pointer to the received query. - * keyexpr: A loaned instance of :c:type:`z_keyexpr_t` to associate with the subscriber. - * payload: Pointer to the data to put. - * payload_len: The length of the ``payload``. - * options: The options to apply to the send query reply operation. If ``NULL`` is passed, the default options will be - * applied. - * - * Returns: - * Returns ``0`` if the send query reply operation is successful, or a ``negative value`` otherwise. - */ -int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const uint8_t *payload, size_t payload_len, - const z_query_reply_options_t *options); - -/** - * Constructs the default values for the query reply operation. - * - * Returns: - * Returns the constructed :c:type:`z_query_reply_options_t`. - */ -z_query_reply_options_t z_query_reply_options_default(void); - -/** - * Checks if the queryable answered with an OK, which allows this value to be treated as a sample. - * - * If this returns ``false``, you should use ``z_check`` before trying to use :c:func:`z_reply_err` if you want to - * process the error that may be here. - * - * Parameters: - * reply: Pointer to the received query reply. - * - * Returns: - * Returns ``true`` if the queryable answered with an OK, which allows this value to be treated as a sample, or - * ``false`` otherwise. - */ -_Bool z_reply_is_ok(const z_owned_reply_t *reply); - -/** - * Yields the contents of the reply by asserting it indicates a success. - * - * You should always make sure that :c:func:`z_reply_is_ok` returns ``true`` before calling this function. - * - * Parameters: - * reply: Pointer to the received query reply. - * - * Returns: - * Returns the :c:type:`z_sample_t` wrapped in the query reply. - */ -z_sample_t z_reply_ok(const z_owned_reply_t *reply); - -/** - * Yields the contents of the reply by asserting it indicates a failure. - * - * You should always make sure that :c:func:`z_reply_is_ok` returns ``false`` before calling this function. - * - * Parameters: - * reply: Pointer to the received query reply. - * - * Returns: - * Returns the :c:type:`z_value_t` wrapped in the query reply. - */ -z_value_t z_reply_err(const z_owned_reply_t *reply); - /** * Checks if a given value is valid. * diff --git a/include/zenoh-pico/config.h b/include/zenoh-pico/config.h index fa2a3ff2a..f19a94b08 100644 --- a/include/zenoh-pico/config.h +++ b/include/zenoh-pico/config.h @@ -118,6 +118,20 @@ #define Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION 0 #endif +/** + * Enable queryables + */ +#ifndef Z_FEATURE_QUERYABLE +#define Z_FEATURE_QUERYABLE 1 +#endif + +/** + * Enable queries + */ +#ifndef Z_FEATURE_QUERY +#define Z_FEATURE_QUERY 1 +#endif + /** * Enable TCP links. */ diff --git a/include/zenoh-pico/net/primitives.h b/include/zenoh-pico/net/primitives.h index 38a2affb4..bfb66ea66 100644 --- a/include/zenoh-pico/net/primitives.h +++ b/include/zenoh-pico/net/primitives.h @@ -128,6 +128,7 @@ _z_subscriber_t *_z_declare_subscriber(_z_session_t *zn, _z_keyexpr_t keyexpr, _ */ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub); +#if Z_FEATURE_QUERYABLE == 1 /** * Declare a :c:type:`_z_queryable_t` for the given resource key. * @@ -156,6 +157,44 @@ _z_queryable_t *_z_declare_queryable(_z_session_t *zn, _z_keyexpr_t keyexpr, _Bo */ int8_t _z_undeclare_queryable(_z_queryable_t *qle); +/** + * Send a reply to a query. + * + * This function must be called inside of a Queryable callback passing the + * query received as parameters of the callback function. This function can + * be called multiple times to send multiple replies to a query. The reply + * will be considered complete when the Queryable callback returns. + * + * Parameters: + * query: The query to reply to. The caller keeps its ownership. + * key: The resource key of this reply. The caller keeps the ownership. + * payload: The value of this reply, the caller keeps ownership. + */ +int8_t _z_send_reply(const z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload); +#endif + +#if Z_FEATURE_QUERY == 1 +/** + * Query data from the matching queryables in the system. + * + * Parameters: + * zn: The zenoh-net session. The caller keeps its ownership. + * keyexpr: The resource key to query. The callee gets the ownership of any + * allocated value. + * parameters: An indication to matching queryables about the queried data. + * target: The kind of queryables that should be target of this query. + * consolidation: The kind of consolidation that should be applied on replies. + * value: The payload of the query. + * callback: The callback function that will be called on reception of replies for this query. + * arg_call: A pointer that will be passed to the **callback** on each call. + * dropper: The callback function that will be called on upon completion of the callback. + * arg_drop: A pointer that will be passed to the **dropper** on each call. + */ +int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target, + const z_consolidation_mode_t consolidation, const _z_value_t value, _z_reply_handler_t callback, + void *arg_call, _z_drop_handler_t dropper, void *arg_drop); +#endif + /*------------------ Operations ------------------*/ /** @@ -190,39 +229,4 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay */ int8_t _z_subscriber_pull(const _z_subscriber_t *sub); -/** - * Query data from the matching queryables in the system. - * - * Parameters: - * zn: The zenoh-net session. The caller keeps its ownership. - * keyexpr: The resource key to query. The callee gets the ownership of any - * allocated value. - * parameters: An indication to matching queryables about the queried data. - * target: The kind of queryables that should be target of this query. - * consolidation: The kind of consolidation that should be applied on replies. - * value: The payload of the query. - * callback: The callback function that will be called on reception of replies for this query. - * arg_call: A pointer that will be passed to the **callback** on each call. - * dropper: The callback function that will be called on upon completion of the callback. - * arg_drop: A pointer that will be passed to the **dropper** on each call. - */ -int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target, - const z_consolidation_mode_t consolidation, const _z_value_t value, _z_reply_handler_t callback, - void *arg_call, _z_drop_handler_t dropper, void *arg_drop); - -/** - * Send a reply to a query. - * - * This function must be called inside of a Queryable callback passing the - * query received as parameters of the callback function. This function can - * be called multiple times to send multiple replies to a query. The reply - * will be considered complete when the Queryable callback returns. - * - * Parameters: - * query: The query to reply to. The caller keeps its ownership. - * key: The resource key of this reply. The caller keeps the ownership. - * payload: The value of this reply, the caller keeps ownership. - */ -int8_t _z_send_reply(const z_query_t *query, const _z_keyexpr_t keyexpr, const _z_value_t payload); - #endif /* ZENOH_PICO_PRIMITIVES_NETAPI_H */ diff --git a/include/zenoh-pico/net/query.h b/include/zenoh-pico/net/query.h index cb59f6dd5..0854907b7 100644 --- a/include/zenoh-pico/net/query.h +++ b/include/zenoh-pico/net/query.h @@ -39,7 +39,9 @@ typedef struct { void *_zn; // FIXME: _z_session_t *zn; } _z_queryable_t; +#if Z_FEATURE_QUERYABLE == 1 void _z_queryable_clear(_z_queryable_t *qbl); void _z_queryable_free(_z_queryable_t **qbl); +#endif #endif /* ZENOH_PICO_QUERY_NETAPI_H */ diff --git a/include/zenoh-pico/net/session.h b/include/zenoh-pico/net/session.h index a23fb84dc..8985eba02 100644 --- a/include/zenoh-pico/net/session.h +++ b/include/zenoh-pico/net/session.h @@ -54,8 +54,12 @@ typedef struct { _z_subscription_sptr_list_t *_remote_subscriptions; // Session queryables +#if Z_FEATURE_QUERYABLE == 1 _z_questionable_sptr_list_t *_local_questionable; +#endif +#if Z_FEATURE_QUERY == 1 _z_pending_query_list_t *_pending_queries; +#endif } _z_session_t; /** diff --git a/include/zenoh-pico/session/push.h b/include/zenoh-pico/session/push.h new file mode 100644 index 000000000..583746c15 --- /dev/null +++ b/include/zenoh-pico/session/push.h @@ -0,0 +1,26 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include + +#include "zenoh-pico/net/session.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/message.h" + +#ifndef ZENOH_PICO_SESSION_PUSH_H +#define ZENOH_PICO_SESSION_PUSH_H + +int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push); + +#endif /* ZENOH_PICO_SESSION_PUSH_H */ \ No newline at end of file diff --git a/include/zenoh-pico/session/query.h b/include/zenoh-pico/session/query.h index c5597c3c8..86d03bb6f 100644 --- a/include/zenoh-pico/session/query.h +++ b/include/zenoh-pico/session/query.h @@ -18,6 +18,7 @@ #include "zenoh-pico/net/session.h" #include "zenoh-pico/protocol/core.h" +#if Z_FEATURE_QUERY == 1 /*------------------ Query ------------------*/ _z_zint_t _z_get_query_id(_z_session_t *zn); @@ -30,5 +31,6 @@ int8_t _z_trigger_query_reply_partial(_z_session_t *zn, _z_zint_t reply_context, int8_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id); void _z_unregister_pending_query(_z_session_t *zn, _z_pending_query_t *pq); void _z_flush_pending_queries(_z_session_t *zn); +#endif #endif /* ZENOH_PICO_SESSION_QUERY_H */ diff --git a/include/zenoh-pico/session/queryable.h b/include/zenoh-pico/session/queryable.h index d6218b262..95e8cb08a 100644 --- a/include/zenoh-pico/session/queryable.h +++ b/include/zenoh-pico/session/queryable.h @@ -19,6 +19,7 @@ #include "zenoh-pico/net/session.h" +#if Z_FEATURE_QUERYABLE == 1 #define _Z_QUERYABLE_COMPLETE_DEFAULT false #define _Z_QUERYABLE_DISTANCE_DEFAULT 0 @@ -30,5 +31,6 @@ _z_questionable_sptr_t *_z_register_questionable(_z_session_t *zn, _z_questionab int8_t _z_trigger_queryables(_z_session_t *zn, const _z_msg_query_t *query, const _z_keyexpr_t q_key, uint32_t qid); void _z_unregister_questionable(_z_session_t *zn, _z_questionable_sptr_t *q); void _z_flush_questionables(_z_session_t *zn); +#endif #endif /* ZENOH_PICO_SESSION_QUERYABLE_H */ diff --git a/include/zenoh-pico/session/reply.h b/include/zenoh-pico/session/reply.h new file mode 100644 index 000000000..e84fa5c7e --- /dev/null +++ b/include/zenoh-pico/session/reply.h @@ -0,0 +1,29 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include + +#include "zenoh-pico/net/session.h" +#include "zenoh-pico/protocol/core.h" +#include "zenoh-pico/protocol/definitions/message.h" +#include "zenoh-pico/protocol/definitions/network.h" + +#ifndef ZENOH_PICO_SESSION_REPLY_H +#define ZENOH_PICO_SESSION_REPLY_H + +int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key, _z_msg_reply_t *reply); + +int8_t _z_trigger_reply_final(_z_session_t *zn, _z_n_msg_response_final_t *final); + +#endif /* ZENOH_PICO_SESSION_REPLY_H */ \ No newline at end of file diff --git a/src/api/api.c b/src/api/api.c index b673e3659..7f1e0cdc1 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -421,13 +421,8 @@ OWNED_FUNCTIONS_PTR_COMMON(z_publisher_t, z_owned_publisher_t, publisher) OWNED_FUNCTIONS_PTR_CLONE(z_publisher_t, z_owned_publisher_t, publisher, _z_owner_noop_copy) void z_publisher_drop(z_owned_publisher_t *val) { z_undeclare_publisher(val); } -OWNED_FUNCTIONS_PTR_COMMON(z_queryable_t, z_owned_queryable_t, queryable) -OWNED_FUNCTIONS_PTR_CLONE(z_queryable_t, z_owned_queryable_t, queryable, _z_owner_noop_copy) -void z_queryable_drop(z_owned_queryable_t *val) { z_undeclare_queryable(val); } - OWNED_FUNCTIONS_PTR_INTERNAL(z_keyexpr_t, z_owned_keyexpr_t, keyexpr, _z_keyexpr_free, _z_keyexpr_copy) OWNED_FUNCTIONS_PTR_INTERNAL(z_hello_t, z_owned_hello_t, hello, _z_hello_free, _z_owner_noop_copy) -OWNED_FUNCTIONS_PTR_INTERNAL(z_reply_t, z_owned_reply_t, reply, _z_reply_free, _z_owner_noop_copy) OWNED_FUNCTIONS_PTR_INTERNAL(z_str_array_t, z_owned_str_array_t, str_array, _z_str_array_free, _z_owner_noop_copy) #define OWNED_FUNCTIONS_CLOSURE(ownedtype, name) \ @@ -642,6 +637,9 @@ int8_t z_delete(z_session_t zs, z_keyexpr_t keyexpr, const z_delete_options_t *o return ret; } +#if Z_FEATURE_QUERY == 1 +OWNED_FUNCTIONS_PTR_INTERNAL(z_reply_t, z_owned_reply_t, reply, _z_reply_free, _z_owner_noop_copy) + z_get_options_t z_get_options_default(void) { return (z_get_options_t){.target = z_query_target_default(), .consolidation = z_query_consolidation_default(), @@ -697,6 +695,88 @@ int8_t z_get(z_session_t zs, z_keyexpr_t keyexpr, const char *parameters, z_owne return ret; } +_Bool z_reply_is_ok(const z_owned_reply_t *reply) { + (void)(reply); + // For the moment always return TRUE. + // The support for reply errors will come in the next release. + return true; +} + +z_sample_t z_reply_ok(const z_owned_reply_t *reply) { return reply->_value->data.sample; } + +z_value_t z_reply_err(const z_owned_reply_t *reply) { + (void)(reply); + return (z_value_t){.payload = _z_bytes_empty(), .encoding = z_encoding_default()}; +} +#endif + +#if Z_FEATURE_QUERYABLE == 1 +OWNED_FUNCTIONS_PTR_COMMON(z_queryable_t, z_owned_queryable_t, queryable) +OWNED_FUNCTIONS_PTR_CLONE(z_queryable_t, z_owned_queryable_t, queryable, _z_owner_noop_copy) +void z_queryable_drop(z_owned_queryable_t *val) { z_undeclare_queryable(val); } + +z_queryable_options_t z_queryable_options_default(void) { + return (z_queryable_options_t){.complete = _Z_QUERYABLE_COMPLETE_DEFAULT}; +} + +z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_owned_closure_query_t *callback, + const z_queryable_options_t *options) { + void *ctx = callback->context; + callback->context = NULL; + + z_keyexpr_t key = keyexpr; + + // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition + // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic + // resource declarations are only performed on unicast transports. +#if Z_FEATURE_MULTICAST_TRANSPORT == 1 + if (zs._val->_tp._type != _Z_TRANSPORT_MULTICAST_TYPE) { +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 + _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); + if (r == NULL) { + uint16_t id = _z_declare_resource(zs._val, keyexpr); + key = _z_rid_with_suffix(id, NULL); + } +#if Z_FEATURE_MULTICAST_TRANSPORT == 1 + } +#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 + + z_queryable_options_t opt = z_queryable_options_default(); + if (options != NULL) { + opt.complete = options->complete; + } + + return (z_owned_queryable_t){ + ._value = _z_declare_queryable(zs._val, key, opt.complete, callback->call, callback->drop, ctx)}; +} + +int8_t z_undeclare_queryable(z_owned_queryable_t *queryable) { + int8_t ret = _Z_RES_OK; + + ret = _z_undeclare_queryable(queryable->_value); + _z_queryable_free(&queryable->_value); + return ret; +} + +z_query_reply_options_t z_query_reply_options_default(void) { + return (z_query_reply_options_t){.encoding = z_encoding_default()}; +} + +int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const uint8_t *payload, size_t payload_len, + const z_query_reply_options_t *options) { + z_query_reply_options_t opts = options == NULL ? z_query_reply_options_default() : *options; + _z_value_t value = {.payload = + { + .start = payload, + ._is_alloc = false, + .len = payload_len, + }, + .encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}}; + return _z_send_reply(query, keyexpr, value); + return _Z_ERR_GENERIC; +} +#endif + z_owned_keyexpr_t z_keyexpr_new(const char *name) { z_owned_keyexpr_t key; @@ -890,81 +970,6 @@ int8_t z_undeclare_pull_subscriber(z_owned_pull_subscriber_t *sub) { int8_t z_subscriber_pull(const z_pull_subscriber_t sub) { return _z_subscriber_pull(sub._val); } -z_queryable_options_t z_queryable_options_default(void) { - return (z_queryable_options_t){.complete = _Z_QUERYABLE_COMPLETE_DEFAULT}; -} - -z_owned_queryable_t z_declare_queryable(z_session_t zs, z_keyexpr_t keyexpr, z_owned_closure_query_t *callback, - const z_queryable_options_t *options) { - void *ctx = callback->context; - callback->context = NULL; - - z_keyexpr_t key = keyexpr; - - // TODO: Currently, if resource declarations are done over multicast transports, the current protocol definition - // lacks a way to convey them to later-joining nodes. Thus, in the current version automatic - // resource declarations are only performed on unicast transports. -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - if (zs._val->_tp._type != _Z_TRANSPORT_MULTICAST_TYPE) { -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - _z_resource_t *r = _z_get_resource_by_key(zs._val, &keyexpr); - if (r == NULL) { - uint16_t id = _z_declare_resource(zs._val, keyexpr); - key = _z_rid_with_suffix(id, NULL); - } -#if Z_FEATURE_MULTICAST_TRANSPORT == 1 - } -#endif // Z_FEATURE_MULTICAST_TRANSPORT == 1 - - z_queryable_options_t opt = z_queryable_options_default(); - if (options != NULL) { - opt.complete = options->complete; - } - - return (z_owned_queryable_t){ - ._value = _z_declare_queryable(zs._val, key, opt.complete, callback->call, callback->drop, ctx)}; -} - -int8_t z_undeclare_queryable(z_owned_queryable_t *queryable) { - int8_t ret = _Z_RES_OK; - - ret = _z_undeclare_queryable(queryable->_value); - _z_queryable_free(&queryable->_value); - - return ret; -} - -z_query_reply_options_t z_query_reply_options_default(void) { - return (z_query_reply_options_t){.encoding = z_encoding_default()}; -} - -int8_t z_query_reply(const z_query_t *query, const z_keyexpr_t keyexpr, const uint8_t *payload, size_t payload_len, - const z_query_reply_options_t *options) { - z_query_reply_options_t opts = options == NULL ? z_query_reply_options_default() : *options; - _z_value_t value = {.payload = - { - .start = payload, - ._is_alloc = false, - .len = payload_len, - }, - .encoding = {.prefix = opts.encoding.prefix, .suffix = opts.encoding.suffix}}; - return _z_send_reply(query, keyexpr, value); -} - -_Bool z_reply_is_ok(const z_owned_reply_t *reply) { - (void)(reply); - // For the moment always return TRUE. - // The support for reply errors will come in the next release. - return true; -} - -z_value_t z_reply_err(const z_owned_reply_t *reply) { - (void)(reply); - return (z_value_t){.payload = _z_bytes_empty(), .encoding = z_encoding_default()}; -} - -z_sample_t z_reply_ok(const z_owned_reply_t *reply) { return reply->_value->data.sample; } - /**************** Tasks ****************/ zp_task_read_options_t zp_task_read_options_default(void) { return (zp_task_read_options_t){.__dummy = 0}; } @@ -1030,6 +1035,7 @@ int8_t zp_send_join(z_session_t zs, const zp_send_join_options_t *options) { (void)(options); return _zp_send_join(zs._val); } + z_owned_keyexpr_t z_publisher_keyexpr(z_publisher_t publisher) { z_owned_keyexpr_t ret = {._value = z_malloc(sizeof(_z_keyexpr_t))}; if (ret._value != NULL && publisher._val != NULL) { @@ -1037,6 +1043,7 @@ z_owned_keyexpr_t z_publisher_keyexpr(z_publisher_t publisher) { } return ret; } + z_owned_keyexpr_t z_subscriber_keyexpr(z_subscriber_t sub) { z_owned_keyexpr_t ret = z_keyexpr_null(); uint32_t lookup = sub._val->_entity_id; diff --git a/src/net/primitives.c b/src/net/primitives.c index d536f2e96..65538ae62 100644 --- a/src/net/primitives.c +++ b/src/net/primitives.c @@ -190,6 +190,7 @@ int8_t _z_undeclare_subscriber(_z_subscriber_t *sub) { return ret; } +#if Z_FEATURE_QUERYABLE == 1 /*------------------ Queryable Declaration ------------------*/ _z_queryable_t *_z_declare_queryable(_z_session_t *zn, _z_keyexpr_t keyexpr, _Bool complete, _z_questionable_handler_t callback, _z_drop_handler_t dropper, void *arg) { @@ -302,6 +303,47 @@ int8_t _z_send_reply(const z_query_t *query, _z_keyexpr_t keyexpr, const _z_valu return ret; } +#endif + +#if Z_FEATURE_QUERY == 1 +/*------------------ Query ------------------*/ +int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target, + const z_consolidation_mode_t consolidation, _z_value_t value, _z_reply_handler_t callback, + void *arg_call, _z_drop_handler_t dropper, void *arg_drop) { + int8_t ret = _Z_RES_OK; + + // Create the pending query object + _z_pending_query_t *pq = (_z_pending_query_t *)z_malloc(sizeof(_z_pending_query_t)); + if (pq != NULL) { + pq->_id = _z_get_query_id(zn); + pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr); + pq->_parameters = _z_str_clone(parameters); + pq->_target = target; + pq->_consolidation = consolidation; + pq->_anykey = (strstr(pq->_parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true; + pq->_callback = callback; + pq->_dropper = dropper; + pq->_pending_replies = NULL; + pq->_call_arg = arg_call; + pq->_drop_arg = arg_drop; + + ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session + if (ret == _Z_RES_OK) { + _z_bytes_t params = _z_bytes_wrap((uint8_t *)pq->_parameters, strlen(pq->_parameters)); + _z_zenoh_message_t z_msg = _z_msg_make_query(&keyexpr, ¶ms, pq->_id, pq->_consolidation, &value); + + if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { + _z_unregister_pending_query(zn, pq); + ret = _Z_ERR_TRANSPORT_TX_FAILED; + } + } else { + _z_pending_query_clear(pq); + } + } + + return ret; +} +#endif /*------------------ Write ------------------*/ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *payload, const size_t len, @@ -353,44 +395,6 @@ int8_t _z_write(_z_session_t *zn, const _z_keyexpr_t keyexpr, const uint8_t *pay return ret; } -/*------------------ Query ------------------*/ -int8_t _z_query(_z_session_t *zn, _z_keyexpr_t keyexpr, const char *parameters, const z_query_target_t target, - const z_consolidation_mode_t consolidation, _z_value_t value, _z_reply_handler_t callback, - void *arg_call, _z_drop_handler_t dropper, void *arg_drop) { - int8_t ret = _Z_RES_OK; - - // Create the pending query object - _z_pending_query_t *pq = (_z_pending_query_t *)z_malloc(sizeof(_z_pending_query_t)); - if (pq != NULL) { - pq->_id = _z_get_query_id(zn); - pq->_key = _z_get_expanded_key_from_key(zn, &keyexpr); - pq->_parameters = _z_str_clone(parameters); - pq->_target = target; - pq->_consolidation = consolidation; - pq->_anykey = (strstr(pq->_parameters, Z_SELECTOR_QUERY_MATCH) == NULL) ? false : true; - pq->_callback = callback; - pq->_dropper = dropper; - pq->_pending_replies = NULL; - pq->_call_arg = arg_call; - pq->_drop_arg = arg_drop; - - ret = _z_register_pending_query(zn, pq); // Add the pending query to the current session - if (ret == _Z_RES_OK) { - _z_bytes_t params = _z_bytes_wrap((uint8_t *)pq->_parameters, strlen(pq->_parameters)); - _z_zenoh_message_t z_msg = _z_msg_make_query(&keyexpr, ¶ms, pq->_id, pq->_consolidation, &value); - - if (_z_send_n_msg(zn, &z_msg, Z_RELIABILITY_RELIABLE, Z_CONGESTION_CONTROL_BLOCK) != _Z_RES_OK) { - _z_unregister_pending_query(zn, pq); - ret = _Z_ERR_TRANSPORT_TX_FAILED; - } - } else { - _z_pending_query_clear(pq); - } - } - - return ret; -} - /*------------------ Pull ------------------*/ int8_t _z_subscriber_pull(const _z_subscriber_t *sub) { int8_t ret = _Z_RES_OK; diff --git a/src/net/query.c b/src/net/query.c index b9302c2fe..8b6d71b97 100644 --- a/src/net/query.c +++ b/src/net/query.c @@ -13,6 +13,7 @@ #include "zenoh-pico/session/query.h" +#if Z_FEATURE_QUERYABLE == 1 void _z_queryable_clear(_z_queryable_t *qbl) { // Nothing to clear (void)(qbl); @@ -28,3 +29,4 @@ void _z_queryable_free(_z_queryable_t **qbl) { *qbl = NULL; } } +#endif \ No newline at end of file diff --git a/src/session/push.c b/src/session/push.c new file mode 100644 index 000000000..5e0649983 --- /dev/null +++ b/src/session/push.c @@ -0,0 +1,35 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/session/push.h" + +#include "zenoh-pico/api/constants.h" +#include "zenoh-pico/api/primitives.h" +#include "zenoh-pico/collections/bytes.h" +#include "zenoh-pico/config.h" +#include "zenoh-pico/session/subscription.h" +#include "zenoh-pico/utils/logging.h" + +int8_t _z_trigger_push(_z_session_t *zn, _z_n_msg_push_t *push) { + int8_t ret = _Z_RES_OK; + + // TODO check body to know where to dispatch + _z_bytes_t payload = push->_body._is_put ? push->_body._body._put._payload : _z_bytes_empty(); + _z_encoding_t encoding = push->_body._is_put ? push->_body._body._put._encoding : z_encoding_default(); + int kind = push->_body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE; + + ret = _z_trigger_subscriptions(zn, push->_key, payload, encoding, kind, push->_timestamp); + + return ret; +} \ No newline at end of file diff --git a/src/session/query.c b/src/session/query.c index ec976c7e3..d6a7a7910 100644 --- a/src/session/query.c +++ b/src/session/query.c @@ -22,6 +22,7 @@ #include "zenoh-pico/session/resource.h" #include "zenoh-pico/utils/logging.h" +#if Z_FEATURE_QUERY == 1 _z_reply_t *_z_reply_alloc_and_move(_z_reply_t *_reply) { _z_reply_t *reply = (_z_reply_t *)z_malloc(sizeof(_z_reply_t)); if (reply != NULL) { @@ -290,3 +291,4 @@ void _z_flush_pending_queries(_z_session_t *zn) { _z_mutex_unlock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 } +#endif \ No newline at end of file diff --git a/src/session/queryable.c b/src/session/queryable.c index 7431712b4..243519b69 100644 --- a/src/session/queryable.c +++ b/src/session/queryable.c @@ -24,6 +24,7 @@ #include "zenoh-pico/session/utils.h" #include "zenoh-pico/utils/logging.h" +#if Z_FEATURE_QUERYABLE == 1 _Bool _z_questionable_eq(const _z_questionable_t *one, const _z_questionable_t *two) { return one->_id == two->_id; } void _z_questionable_clear(_z_questionable_t *qle) { @@ -224,3 +225,5 @@ void _z_flush_questionables(_z_session_t *zn) { _z_mutex_unlock(&zn->_mutex_inner); #endif // Z_FEATURE_MULTI_THREAD == 1 } + +#endif diff --git a/src/session/reply.c b/src/session/reply.c new file mode 100644 index 000000000..0ca562936 --- /dev/null +++ b/src/session/reply.c @@ -0,0 +1,44 @@ +// +// Copyright (c) 2022 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +#include "zenoh-pico/session/reply.h" + +#include "zenoh-pico/api/constants.h" +#include "zenoh-pico/config.h" +#include "zenoh-pico/session/query.h" +#include "zenoh-pico/utils/logging.h" + +int8_t _z_trigger_reply_partial(_z_session_t *zn, _z_zint_t id, _z_keyexpr_t key, _z_msg_reply_t *reply) { + int8_t ret = _Z_RES_OK; + + // TODO check id to know where to dispatch + +#if Z_FEATURE_QUERY == 1 + ret = _z_trigger_query_reply_partial(zn, id, key, reply->_value.payload, reply->_value.encoding, Z_SAMPLE_KIND_PUT, + reply->_timestamp); +#endif + return ret; +} + +int8_t _z_trigger_reply_final(_z_session_t *zn, _z_n_msg_response_final_t *final) { + int8_t ret = _Z_RES_OK; + + // TODO check id to know where to dispatch + _z_zint_t id = final->_request_id; + +#if Z_FEATURE_QUERY == 1 + _z_trigger_query_reply_final(zn, id); +#endif + return ret; +} \ No newline at end of file diff --git a/src/session/rx.c b/src/session/rx.c index 082fe21ee..d9e17587b 100644 --- a/src/session/rx.c +++ b/src/session/rx.c @@ -24,8 +24,9 @@ #include "zenoh-pico/protocol/definitions/message.h" #include "zenoh-pico/protocol/definitions/network.h" #include "zenoh-pico/protocol/keyexpr.h" -#include "zenoh-pico/session/query.h" +#include "zenoh-pico/session/push.h" #include "zenoh-pico/session/queryable.h" +#include "zenoh-pico/session/reply.h" #include "zenoh-pico/session/resource.h" #include "zenoh-pico/session/session.h" #include "zenoh-pico/session/subscription.h" @@ -81,19 +82,20 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint } break; case _Z_N_PUSH: { _Z_DEBUG("Handling _Z_N_PUSH\n"); - _z_n_msg_push_t push = msg->_body._push; - _z_bytes_t payload = push._body._is_put ? push._body._body._put._payload : _z_bytes_empty(); - _z_encoding_t encoding = push._body._is_put ? push._body._body._put._encoding : z_encoding_default(); - int kind = push._body._is_put ? Z_SAMPLE_KIND_PUT : Z_SAMPLE_KIND_DELETE; - ret = _z_trigger_subscriptions(zn, push._key, payload, encoding, kind, push._timestamp); + _z_n_msg_push_t *push = &msg->_body._push; + ret = _z_trigger_push(zn, push); } break; case _Z_N_REQUEST: { _Z_DEBUG("Handling _Z_N_REQUEST\n"); _z_n_msg_request_t req = msg->_body._request; switch (req._tag) { case _Z_REQUEST_QUERY: { +#if Z_FEATURE_QUERYABLE == 1 _z_msg_query_t *query = &req._body._query; ret = _z_trigger_queryables(zn, query, req._key, req._rid); +#else + _Z_DEBUG("_Z_REQUEST_QUERY dropped, queryables not supported\n"); +#endif } break; case _Z_REQUEST_PUT: { _z_msg_put_t put = req._body._put; @@ -127,9 +129,8 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint _z_n_msg_response_t response = msg->_body._response; switch (response._tag) { case _Z_RESPONSE_BODY_REPLY: { - _z_msg_reply_t reply = response._body._reply; - ret = _z_trigger_query_reply_partial(zn, response._request_id, response._key, reply._value.payload, - reply._value.encoding, Z_SAMPLE_KIND_PUT, reply._timestamp); + _z_msg_reply_t *reply = &response._body._reply; + ret = _z_trigger_reply_partial(zn, response._request_id, response._key, reply); } break; case _Z_RESPONSE_BODY_ERR: { // @TODO: expose errors to the user @@ -155,8 +156,7 @@ int8_t _z_handle_network_message(_z_session_t *zn, _z_zenoh_message_t *msg, uint } break; case _Z_N_RESPONSE_FINAL: { _Z_DEBUG("Handling _Z_N_RESPONSE_FINAL\n"); - _z_zint_t id = msg->_body._response_final._request_id; - _z_trigger_query_reply_final(zn, id); + ret = _z_trigger_reply_final(zn, &msg->_body._response_final); } break; } _z_msg_clear(msg); diff --git a/src/session/utils.c b/src/session/utils.c index 494cb7a5d..8f80c260c 100644 --- a/src/session/utils.c +++ b/src/session/utils.c @@ -63,8 +63,12 @@ int8_t _z_session_init(_z_session_t *zn, _z_id_t *zid) { zn->_remote_resources = NULL; zn->_local_subscriptions = NULL; zn->_remote_subscriptions = NULL; +#if Z_FEATURE_QUERYABLE == 1 zn->_local_questionable = NULL; +#endif +#if Z_FEATURE_QUERY == 1 zn->_pending_queries = NULL; +#endif #if Z_FEATURE_MULTI_THREAD == 1 ret = _z_mutex_init(&zn->_mutex_inner); @@ -100,8 +104,13 @@ void _z_session_clear(_z_session_t *zn) { // Clean up the entities _z_flush_resources(zn); _z_flush_subscriptions(zn); + +#if Z_FEATURE_QUERYABLE == 1 _z_flush_questionables(zn); +#endif +#if Z_FEATURE_QUERY == 1 _z_flush_pending_queries(zn); +#endif #if Z_FEATURE_MULTI_THREAD == 1 _z_mutex_free(&zn->_mutex_inner); diff --git a/tests/z_api_null_drop_test.c b/tests/z_api_null_drop_test.c index f5707167c..6d54fea94 100644 --- a/tests/z_api_null_drop_test.c +++ b/tests/z_api_null_drop_test.c @@ -31,8 +31,6 @@ int main(void) { z_owned_scouting_config_t scouting_config_null_1 = z_scouting_config_null(); z_owned_pull_subscriber_t pull_subscriber_null_1 = z_pull_subscriber_null(); z_owned_subscriber_t subscriber_null_1 = z_subscriber_null(); - z_owned_queryable_t queryable_null_1 = z_queryable_null(); - z_owned_reply_t reply_null_1 = z_reply_null(); z_owned_hello_t hello_null_1 = z_hello_null(); z_owned_closure_sample_t closure_sample_null_1 = z_closure_sample_null(); z_owned_closure_query_t closure_query_null_1 = z_closure_query_null(); @@ -51,8 +49,6 @@ int main(void) { assert(!z_check(scouting_config_null_1)); assert(!z_check(pull_subscriber_null_1)); assert(!z_check(subscriber_null_1)); - assert(!z_check(queryable_null_1)); - assert(!z_check(reply_null_1)); assert(!z_check(hello_null_1)); assert(!z_check(str_null_1)); @@ -66,8 +62,6 @@ int main(void) { z_owned_scouting_config_t scouting_config_null_2; z_owned_pull_subscriber_t pull_subscriber_null_2; z_owned_subscriber_t subscriber_null_2; - z_owned_queryable_t queryable_null_2; - z_owned_reply_t reply_null_2; z_owned_hello_t hello_null_2; z_owned_closure_sample_t closure_sample_null_2; z_owned_closure_query_t closure_query_null_2; @@ -83,8 +77,6 @@ int main(void) { z_null(&scouting_config_null_2); z_null(&pull_subscriber_null_2); z_null(&subscriber_null_2); - z_null(&queryable_null_2); - z_null(&reply_null_2); z_null(&hello_null_2); z_null(&closure_sample_null_2); z_null(&closure_query_null_2); @@ -93,6 +85,21 @@ int main(void) { z_null(&closure_zid_null_2); z_null(&str_null_2); +#if Z_FEATURE_QUERYABLE == 1 + z_owned_queryable_t queryable_null_1 = z_queryable_null(); + assert(!z_check(queryable_null_1)); + z_owned_queryable_t queryable_null_2; + z_null(&queryable_null_2); + assert(!z_check(queryable_null_2)); +#endif +#if Z_FEATURE_QUERY == 1 + z_owned_reply_t reply_null_1 = z_reply_null(); + assert(!z_check(reply_null_1)); + z_owned_reply_t reply_null_2; + z_null(&reply_null_2); + assert(!z_check(reply_null_2)); +#endif + // // Test that null macro works the same as direct call // @@ -103,8 +110,6 @@ int main(void) { assert(!z_check(scouting_config_null_2)); assert(!z_check(pull_subscriber_null_2)); assert(!z_check(subscriber_null_2)); - assert(!z_check(queryable_null_2)); - assert(!z_check(reply_null_2)); assert(!z_check(hello_null_2)); assert(!z_check(str_null_2)); @@ -119,8 +124,6 @@ int main(void) { z_drop(z_move(scouting_config_null_1)); z_drop(z_move(pull_subscriber_null_1)); z_drop(z_move(subscriber_null_1)); - z_drop(z_move(queryable_null_1)); - z_drop(z_move(reply_null_1)); z_drop(z_move(hello_null_1)); z_drop(z_move(closure_sample_null_1)); z_drop(z_move(closure_query_null_1)); @@ -136,8 +139,6 @@ int main(void) { z_drop(z_move(scouting_config_null_2)); z_drop(z_move(pull_subscriber_null_2)); z_drop(z_move(subscriber_null_2)); - z_drop(z_move(queryable_null_2)); - z_drop(z_move(reply_null_2)); z_drop(z_move(hello_null_2)); z_drop(z_move(closure_sample_null_2)); z_drop(z_move(closure_query_null_2)); @@ -145,6 +146,15 @@ int main(void) { z_drop(z_move(closure_hello_null_2)); z_drop(z_move(closure_zid_null_2)); z_drop(z_move(str_null_2)); + +#if Z_FEATURE_QUERYABLE == 1 + z_drop(z_move(queryable_null_1)); + z_drop(z_move(queryable_null_2)); +#endif +#if Z_FEATURE_QUERY == 1 + z_drop(z_move(reply_null_1)); + z_drop(z_move(reply_null_2)); +#endif } return 0;