diff --git a/examples/build.gradle.kts b/examples/build.gradle.kts index 9de1441..aea28cb 100644 --- a/examples/build.gradle.kts +++ b/examples/build.gradle.kts @@ -40,6 +40,7 @@ tasks { "ZPub", "ZPubThr", "ZPut", + "ZQuerier", "ZQueryable", "ZScout", "ZSub", diff --git a/examples/src/main/java/io/zenoh/ZQuerier.java b/examples/src/main/java/io/zenoh/ZQuerier.java new file mode 100644 index 0000000..2786ea1 --- /dev/null +++ b/examples/src/main/java/io/zenoh/ZQuerier.java @@ -0,0 +1,162 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh; + +import io.zenoh.bytes.ZBytes; +import io.zenoh.exceptions.ZError; +import io.zenoh.query.*; +import picocli.CommandLine; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.Callable; + +import static io.zenoh.ConfigKt.loadConfig; + +@CommandLine.Command( + name = "ZQuerier", + mixinStandardHelpOptions = true, + description = "Zenoh Querier example" +) +public class ZQuerier implements Callable { + + @Override + public Integer call() throws Exception { + Zenoh.initLogFromEnvOr("error"); + + Config config = loadConfig(emptyArgs, configFile, connect, listen, noMulticastScouting, mode); + Selector selector = Selector.tryFrom(this.selectorOpt); + + QueryTarget queryTarget = target != null ? QueryTarget.valueOf(target.toUpperCase()) : QueryTarget.BEST_MATCHING; + Duration queryTimeout = Duration.ofMillis(timeout); + + Session session = Zenoh.open(config); + QuerierOptions options = new QuerierOptions(); + options.setTarget(queryTarget); + options.setTimeout(queryTimeout); + Querier querier = session.declareQuerier(selector.getKeyExpr(), options); + + performQueries(querier, selector); + return 0; + } + + /** + * Performs queries in an infinite loop, printing responses. + */ + private void performQueries(Querier querier, Selector selector) throws ZError, InterruptedException { + for (int idx = 0; idx < Integer.MAX_VALUE; idx++) { + Thread.sleep(1000); + + String queryPayload = String.format("[%04d] %s", idx, payload != null ? payload : ""); + System.out.println("Querying '" + selector + "' with payload: '" + queryPayload + "'..."); + + Querier.GetOptions options = new Querier.GetOptions(); + options.setPayload(ZBytes.from(queryPayload)); + options.setParameters(selector.getParameters()); + + querier.get(this::handleReply, options); + } + } + + /** + * Handles replies received from the query. + */ + private void handleReply(Reply reply) { + if (reply instanceof Reply.Success) { + Reply.Success successReply = (Reply.Success) reply; + System.out.println(">> Received ('" + successReply.getSample().getKeyExpr() + + "': '" + successReply.getSample().getPayload() + "')"); + } else if (reply instanceof Reply.Error) { + Reply.Error errorReply = (Reply.Error) reply; + System.out.println(">> Received (ERROR: '" + errorReply.getError() + "')"); + } + } + + /** + * ----- Example arguments and private fields ----- + */ + + private final Boolean emptyArgs; + + ZQuerier(Boolean emptyArgs) { + this.emptyArgs = emptyArgs; + } + + @CommandLine.Option( + names = {"-s", "--selector"}, + description = "The selection of resources to query [default: demo/example/**].", + defaultValue = "demo/example/**" + ) + private String selectorOpt; + + @CommandLine.Option( + names = {"-p", "--payload"}, + description = "An optional payload to put in the query." + ) + private String payload; + + @CommandLine.Option( + names = {"-t", "--target"}, + description = "The target queryables of the query. Default: BEST_MATCHING. " + + "[possible values: BEST_MATCHING, ALL, ALL_COMPLETE]" + ) + private String target; + + @CommandLine.Option( + names = {"-o", "--timeout"}, + description = "The query timeout in milliseconds [default: 10000].", + defaultValue = "10000" + ) + private long timeout; + + @CommandLine.Option( + names = {"-c", "--config"}, + description = "A configuration file." + ) + private String configFile; + + @CommandLine.Option( + names = {"-m", "--mode"}, + description = "The session mode. Default: peer. Possible values: [peer, client, router].", + defaultValue = "peer" + ) + private String mode; + + @CommandLine.Option( + names = {"-e", "--connect"}, + description = "Endpoints to connect to.", + split = "," + ) + private List connect; + + @CommandLine.Option( + names = {"-l", "--listen"}, + description = "Endpoints to listen on.", + split = "," + ) + private List listen; + + @CommandLine.Option( + names = {"--no-multicast-scouting"}, + description = "Disable the multicast-based scouting mechanism.", + defaultValue = "false" + ) + private boolean noMulticastScouting; + + public static void main(String[] args) { + int exitCode = new CommandLine(new ZQuerier(args.length == 0)).execute(args); + System.exit(exitCode); + } +} diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt index c944e83..48b6399 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/Session.kt @@ -14,6 +14,7 @@ package io.zenoh +import io.zenoh.annotations.Unstable import io.zenoh.bytes.IntoZBytes import io.zenoh.config.ZenohId import io.zenoh.exceptions.ZError @@ -321,6 +322,42 @@ class Session private constructor(private val config: Config) : AutoCloseable { return resolveQueryableWithCallback(keyExpr, callback, options) } + + /** + * Declare a [Querier]. + * + * A querier allows to send queries to a queryable. + * + * Queriers are automatically undeclared when dropped. + * + * Example: + * ```java + * try (Session session = Zenoh.open(config)) { + * QuerierOptions options = new QuerierOptions(); + * options.setTarget(QueryTarget.BEST_MATCHING); + * Querier querier = session.declareQuerier(selector.getKeyExpr(), options); + * //... + * Querier.GetOptions options = new Querier.GetOptions(); + * options.setPayload(ZBytes.from("Example payload")); + * querier.get(reply -> {...}, options); + * } + * ``` + * + * @param keyExpr The [KeyExpr] for the querier. + * @param options Optional [QuerierOptions] to configure the querier. + * @return A [Querier] that will be undeclared on drop. + * @throws ZError + */ + @Unstable + @JvmOverloads + @Throws(ZError::class) + fun declareQuerier( + keyExpr: KeyExpr, + options: QuerierOptions = QuerierOptions() + ): Querier { + return resolveQuerier(keyExpr, options) + } + /** * Declare a [KeyExpr]. * @@ -560,6 +597,16 @@ class Session private constructor(private val config: Config) : AutoCloseable { } ?: throw (sessionClosedException) } + @OptIn(Unstable::class) + private fun resolveQuerier( + keyExpr: KeyExpr, + options: QuerierOptions + ): Querier { + return jniSession?.run { + declareQuerier(keyExpr, options) + } ?: throw sessionClosedException + } + @Throws(ZError::class) internal fun resolveGetWithHandler( selector: IntoSelector, diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/annotations/Annotations.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/annotations/Annotations.kt new file mode 100644 index 0000000..b296e0b --- /dev/null +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/annotations/Annotations.kt @@ -0,0 +1,23 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.annotations + +@RequiresOptIn( + level = RequiresOptIn.Level.WARNING, + message = "This feature is unstable and may change in future releases." +) +@Retention(AnnotationRetention.BINARY) +@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION) +annotation class Unstable diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuerier.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuerier.kt new file mode 100644 index 0000000..ee8f74b --- /dev/null +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNIQuerier.kt @@ -0,0 +1,135 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.jni + +import io.zenoh.annotations.Unstable +import io.zenoh.bytes.Encoding +import io.zenoh.bytes.IntoZBytes +import io.zenoh.bytes.into +import io.zenoh.config.ZenohId +import io.zenoh.exceptions.ZError +import io.zenoh.handlers.Callback +import io.zenoh.handlers.Handler +import io.zenoh.jni.callbacks.JNIGetCallback +import io.zenoh.jni.callbacks.JNIOnCloseCallback +import io.zenoh.keyexpr.KeyExpr +import io.zenoh.qos.CongestionControl +import io.zenoh.qos.Priority +import io.zenoh.qos.QoS +import io.zenoh.query.Parameters +import io.zenoh.query.Querier +import io.zenoh.query.Reply +import io.zenoh.sample.Sample +import io.zenoh.sample.SampleKind +import org.apache.commons.net.ntp.TimeStamp + +internal class JNIQuerier(val ptr: Long) { + + @OptIn(Unstable::class) + @Throws(ZError::class) + fun performGetWithCallback(keyExpr: KeyExpr, callback: Callback, options: Querier.GetOptions) { + performGet(keyExpr, options.parameters, callback, fun() {}, Unit, options.attachment, options.payload, options.encoding) + } + + @OptIn(Unstable::class) + @Throws(ZError::class) + fun performGetWithHandler(keyExpr: KeyExpr, handler: Handler, options: Querier.GetOptions): R { + return performGet(keyExpr, options.parameters, handler::handle, handler::onClose, handler.receiver(), options.attachment, options.payload, options.encoding) + } + + @Throws(ZError::class) + private fun performGet( + keyExpr: KeyExpr, + parameters: Parameters?, + callback: Callback, + onClose: () -> Unit, + receiver: R, + attachment: IntoZBytes?, + payload: IntoZBytes?, + encoding: Encoding? + ): R { + val getCallback = JNIGetCallback { + replierId: ByteArray?, + success: Boolean, + keyExpr2: String?, + payload2: ByteArray, + encodingId: Int, + encodingSchema: String?, + kind: Int, + timestampNTP64: Long, + timestampIsValid: Boolean, + attachmentBytes: ByteArray?, + express: Boolean, + priority: Int, + congestionControl: Int, + -> + val reply: Reply + if (success) { + val timestamp = if (timestampIsValid) TimeStamp(timestampNTP64) else null + val sample = Sample( + KeyExpr(keyExpr2!!, null), + payload2.into(), + Encoding(encodingId, schema = encodingSchema), + SampleKind.fromInt(kind), + timestamp, + QoS(CongestionControl.fromInt(congestionControl), Priority.fromInt(priority), express), + attachmentBytes?.into() + ) + reply = Reply.Success(replierId?.let { ZenohId(it) }, sample) + } else { + reply = Reply.Error( + replierId?.let { ZenohId(it) }, + payload2.into(), + Encoding(encodingId, schema = encodingSchema) + ) + } + callback.run(reply) + } + + getViaJNI(this.ptr, + keyExpr.jniKeyExpr?.ptr ?: 0, + keyExpr.keyExpr, + parameters?.toString(), + getCallback, + onClose, + attachment?.into()?.bytes, + payload?.into()?.bytes, + encoding?.id ?: Encoding.defaultEncoding().id, + encoding?.schema + ) + return receiver + } + + fun close() { + freePtrViaJNI(ptr) + } + + @Throws(ZError::class) + private external fun getViaJNI( + querierPtr: Long, + keyExprPtr: Long, + keyExprString: String, + parameters: String?, + callback: JNIGetCallback, + onClose: JNIOnCloseCallback, + attachmentBytes: ByteArray?, + payload: ByteArray?, + encodingId: Int, + encodingSchema: String?, + ) + + private external fun freePtrViaJNI(ptr: Long) + +} diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt index 8604bc2..6349356 100644 --- a/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/jni/JNISession.kt @@ -27,6 +27,7 @@ import io.zenoh.bytes.IntoZBytes import io.zenoh.config.ZenohId import io.zenoh.bytes.into import io.zenoh.Config +import io.zenoh.annotations.Unstable import io.zenoh.handlers.Handler import io.zenoh.pubsub.* import io.zenoh.qos.CongestionControl @@ -129,7 +130,7 @@ internal class JNISession { subCallback, fun() {} ) - return CallbackSubscriber(keyExpr, JNISubscriber(subscriberRawPtr)) + return CallbackSubscriber(keyExpr, JNISubscriber(subscriberRawPtr)) } @Throws(ZError::class) @@ -200,6 +201,33 @@ internal class JNISession { return HandlerQueryable(keyExpr, JNIQueryable(queryableRawPtr), handler.receiver()) } + @OptIn(Unstable::class) + fun declareQuerier( + keyExpr: KeyExpr, + options: QuerierOptions + ): Querier { + val querierRawPtr = declareQuerierViaJNI( + keyExpr.jniKeyExpr?.ptr ?: 0, + keyExpr.keyExpr, + sessionPtr.get(), + options.target.ordinal, + options.consolidationMode.ordinal, + options.congestionControl.ordinal, + options.priority.ordinal, + options.express, + options.timeout.toMillis() + ) + return Querier( + keyExpr, + QoS( + congestionControl = options.congestionControl, + priority = options.priority, + express = options.express + ), + JNIQuerier(querierRawPtr) + ) + } + @Throws(ZError::class) fun performGetWithCallback( intoSelector: IntoSelector, @@ -439,6 +467,19 @@ internal class JNISession { complete: Boolean ): Long + @Throws(ZError::class) + private external fun declareQuerierViaJNI( + keyExprPtr: Long, + keyExprString: String, + sessionPtr: Long, + target: Int, + consolidation: Int, + congestionControl: Int, + priority: Int, + express: Boolean, + timeoutMs: Long + ): Long + @Throws(ZError::class) private external fun declareKeyExprViaJNI(sessionPtr: Long, keyExpr: String): Long diff --git a/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Querier.kt b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Querier.kt new file mode 100644 index 0000000..ef4c591 --- /dev/null +++ b/zenoh-java/src/commonMain/kotlin/io/zenoh/query/Querier.kt @@ -0,0 +1,161 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh.query + +import io.zenoh.annotations.Unstable +import io.zenoh.bytes.Encoding +import io.zenoh.bytes.IntoZBytes +import io.zenoh.exceptions.ZError +import io.zenoh.handlers.BlockingQueueHandler +import io.zenoh.handlers.Callback +import io.zenoh.handlers.Handler +import io.zenoh.jni.JNIQuerier +import io.zenoh.keyexpr.KeyExpr +import io.zenoh.qos.CongestionControl +import io.zenoh.qos.Priority +import io.zenoh.qos.QoS +import io.zenoh.session.SessionDeclaration +import java.time.Duration +import java.util.Optional +import java.util.concurrent.BlockingQueue +import java.util.concurrent.LinkedBlockingDeque + +/** + * A querier that allows to send queries to a [Queryable]. + * + * Queriers are automatically undeclared when dropped. + * + * Example: + * ```java + * try (Session session = Zenoh.open(config)) { + * QuerierOptions options = new QuerierOptions(); + * options.setTarget(QueryTarget.BEST_MATCHING); + * Querier querier = session.declareQuerier(selector.getKeyExpr(), options); + * //... + * Querier.GetOptions options = new Querier.GetOptions(); + * options.setPayload(ZBytes.from("Example payload")); + * querier.get(reply -> {...}, options); + * } + * ``` + * + * @param keyExpr The [KeyExpr] of the querier. + * @param qos The [QoS] configuration of the querier. + */ +@Unstable +class Querier internal constructor(val keyExpr: KeyExpr, val qos: QoS, private var jniQuerier: JNIQuerier?) : + SessionDeclaration, AutoCloseable { + + /** + * Get options for the [Querier]. + */ + data class GetOptions( + var parameters: Parameters? = null, + var payload: IntoZBytes? = null, + var encoding: Encoding? = null, + var attachment: IntoZBytes? = null + ) + + /** + * Perform a get operation to the [keyExpr] from the Querier and pipe them into a blocking queue. + */ + @Throws(ZError::class) + fun get( + options: GetOptions + ): BlockingQueue> { + val handler = BlockingQueueHandler(LinkedBlockingDeque()) + return resolveGetWithHandler(keyExpr, handler, options) + } + + /** + * Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies + * with the [callback] provided. + * + * @param callback [Callback] to handle the incoming replies. + * @param options [GetOptions] to configure the get operation. + */ + @Throws(ZError::class) + fun get( + callback: Callback, + options: GetOptions + ) { + resolveGetWithCallback(keyExpr, callback, options) + } + + /** + * Perform a get operation to the [keyExpr] from the Querier and handle the incoming replies + * with the [handler] provided. + * + * @param handler [Handler] to handle the receiving replies to the query. + * @param options [GetOptions] to configure the get operation. + */ + @Throws(ZError::class) + fun get( + handler: Handler, + options: GetOptions + ): R { + return resolveGetWithHandler(keyExpr, handler, options) + } + + /** + * Get the [QoS.congestionControl] of the querier. + */ + fun congestionControl() = qos.congestionControl + + /** + * Get the [QoS.priority] of the querier. + */ + fun priority() = qos.priority + + /** + * Undeclares the querier. After calling this function, the querier won't be valid anymore and get operations + * performed on it will fail. + */ + override fun undeclare() { + jniQuerier?.close() + jniQuerier = null + } + + /** + * Closes the querier. Equivalent to [undeclare], this function is automatically called when using + * try-with-resources. + */ + override fun close() { + undeclare() + } + + protected fun finalize() { + undeclare() + } + + private fun resolveGetWithCallback(keyExpr: KeyExpr, callback: Callback, options: GetOptions) { + jniQuerier?.performGetWithCallback(keyExpr, callback, options) ?: throw ZError("Querier is not valid.") + } + + private fun resolveGetWithHandler(keyExpr: KeyExpr, handler: Handler, options: GetOptions): R { + return jniQuerier?.performGetWithHandler(keyExpr, handler, options) ?: throw ZError("Querier is not valid.") + } +} + +/** + * Options for the [Querier] configuration. + */ +data class QuerierOptions( + var target: QueryTarget = QueryTarget.BEST_MATCHING, + var consolidationMode: ConsolidationMode = ConsolidationMode.AUTO, + var timeout: Duration = Duration.ofMillis(10000), + var express: Boolean = QoS.defaultQoS.express, + var congestionControl: CongestionControl = QoS.defaultQoS.congestionControl, + var priority: Priority = QoS.defaultQoS.priority +) diff --git a/zenoh-java/src/jvmTest/java/io/zenoh/QuerierTest.java b/zenoh-java/src/jvmTest/java/io/zenoh/QuerierTest.java new file mode 100644 index 0000000..00cb52b --- /dev/null +++ b/zenoh-java/src/jvmTest/java/io/zenoh/QuerierTest.java @@ -0,0 +1,99 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +package io.zenoh; + +import io.zenoh.bytes.Encoding; +import io.zenoh.bytes.ZBytes; +import io.zenoh.exceptions.ZError; +import io.zenoh.keyexpr.KeyExpr; +import io.zenoh.qos.QoS; +import io.zenoh.query.Querier; +import io.zenoh.query.Reply; +import io.zenoh.query.ReplyOptions; +import io.zenoh.sample.Sample; +import io.zenoh.sample.SampleKind; +import org.apache.commons.net.ntp.TimeStamp; +import org.junit.Test; + +import java.time.Instant; +import java.util.*; + +import static org.junit.Assert.*; + +public class QuerierTest { + + static final ZBytes testPayload = ZBytes.from("Hello queryable"); + static final KeyExpr testKeyExpr; + + static { + try { + testKeyExpr = KeyExpr.tryFrom("example/testing/keyexpr"); + } catch (ZError e) { + throw new RuntimeException(e); + } + } + + /** + * Test validating both Queryable and get operations. + */ + @Test + public void querier_runsWithCallbackTest() throws ZError { + var sample = new Sample( + testKeyExpr, + testPayload, + Encoding.defaultEncoding(), + SampleKind.PUT, + new TimeStamp(Date.from(Instant.now())), + new QoS(), + null + ); + var examplePayload = ZBytes.from("Example payload"); + var exampleAttachment = ZBytes.from("Example attachment"); + var session = Zenoh.open(Config.loadDefault()); + + var queryable = session.declareQueryable(testKeyExpr, query -> { + assertEquals(exampleAttachment, query.getAttachment()); + assertEquals(examplePayload, query.getPayload()); + + var replyOptions = new ReplyOptions(); + replyOptions.setTimeStamp(sample.getTimestamp()); + try { + query.reply(testKeyExpr, sample.getPayload(), replyOptions); + } catch (ZError e) { + throw new RuntimeException(e); + } + }); + + var querier = session.declareQuerier(testKeyExpr); + + Reply[] receivedReply = new Reply[1]; + var options = new Querier.GetOptions(); + options.setPayload(examplePayload); + options.setAttachment(exampleAttachment); + querier.get( + reply -> { + receivedReply[0] = reply; + }, + options + ); + + assertNotNull(receivedReply[0]); + assertEquals(sample, ((Reply.Success) receivedReply[0]).getSample()); + + queryable.close(); + querier.close(); + session.close(); + } +} diff --git a/zenoh-jni/src/lib.rs b/zenoh-jni/src/lib.rs index bf012f1..7720bb2 100644 --- a/zenoh-jni/src/lib.rs +++ b/zenoh-jni/src/lib.rs @@ -18,6 +18,7 @@ mod key_expr; mod liveliness; mod logger; mod publisher; +mod querier; mod query; mod queryable; mod scouting; diff --git a/zenoh-jni/src/querier.rs b/zenoh-jni/src/querier.rs new file mode 100644 index 0000000..f8af486 --- /dev/null +++ b/zenoh-jni/src/querier.rs @@ -0,0 +1,137 @@ +// +// Copyright (c) 2023 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +use std::sync::Arc; + +use jni::{ + objects::{JByteArray, JClass, JObject, JString}, + sys::jint, + JNIEnv, +}; +use zenoh::{key_expr::KeyExpr, query::Querier, Wait}; + +use crate::{ + errors::ZResult, + key_expr::process_kotlin_key_expr, + session::{on_reply_error, on_reply_success}, + throw_exception, + utils::{ + decode_byte_array, decode_encoding, decode_string, get_callback_global_ref, get_java_vm, + load_on_close, + }, + zerror, +}; + +/// Perform a Zenoh GET through a querier. +/// +/// This function is meant to be called from Java/Kotlin code through JNI. +/// +/// Parameters: +/// - `env`: The JNI environment. +/// - `_class`: The JNI class. +/// - `querier_ptr`: The raw pointer to the querier. +/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] provided to the kotlin querier. May be null in case of using an +/// undeclared key expression. +/// - `key_expr_str`: String representation of the key expression used during the querier declaration. +/// It won't be considered in case a key_expr_ptr to a declared key expression is provided. +/// - `selector_params`: Optional selector parameters for the query. +/// - `callback`: Reference to the Kotlin callback to be run upon receiving a reply. +/// - `on_close`: Reference to a kotlin callback to be run upon finishing the get operation, mostly used for closing a provided channel. +/// - `attachment`: Optional attachment. +/// - `payload`: Optional payload for the query. +/// - `encoding_id`: Encoding id of the payload provided. +/// - `encoding_schema`: Encoding schema of the payload provided. +/// +#[no_mangle] +#[allow(non_snake_case)] +pub unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_getViaJNI( + mut env: JNIEnv, + _class: JClass, + querier_ptr: *const Querier, + key_expr_ptr: /*nullable*/ *const KeyExpr<'static>, + key_expr_str: JString, + selector_params: /*nullable*/ JString, + callback: JObject, + on_close: JObject, + attachment: /*nullable*/ JByteArray, + payload: /*nullable*/ JByteArray, + encoding_id: jint, + encoding_schema: /*nullable*/ JString, +) { + let querier = Arc::from_raw(querier_ptr); + let _ = || -> ZResult<()> { + let key_expr = process_kotlin_key_expr(&mut env, &key_expr_str, key_expr_ptr)?; + let java_vm = Arc::new(get_java_vm(&mut env)?); + let callback_global_ref = get_callback_global_ref(&mut env, callback)?; + let on_close_global_ref = get_callback_global_ref(&mut env, on_close)?; + let on_close = load_on_close(&java_vm, on_close_global_ref); + let mut get_builder = querier.get().callback(move |reply| { + || -> ZResult<()> { + on_close.noop(); // Does nothing, but moves `on_close` inside the closure so it gets destroyed with the closure + tracing::debug!("Receiving reply through JNI: {:?}", reply); + let mut env = java_vm.attach_current_thread_as_daemon().map_err(|err| { + zerror!("Unable to attach thread for GET query callback: {}.", err) + })?; + + match reply.result() { + Ok(sample) => { + on_reply_success(&mut env, reply.replier_id(), sample, &callback_global_ref) + } + Err(error) => { + on_reply_error(&mut env, reply.replier_id(), error, &callback_global_ref) + } + } + }() + .unwrap_or_else(|err| tracing::error!("Error on get callback: {err}")); + }); + + if !selector_params.is_null() { + let params = decode_string(&mut env, &selector_params)?; + get_builder = get_builder.parameters(params) + }; + + if !payload.is_null() { + let encoding = decode_encoding(&mut env, encoding_id, &encoding_schema)?; + get_builder = get_builder.encoding(encoding); + get_builder = get_builder.payload(decode_byte_array(&env, payload)?); + } + + if !attachment.is_null() { + let attachment = decode_byte_array(&env, attachment)?; + get_builder = get_builder.attachment::>(attachment); + } + + get_builder + .wait() + .map(|_| tracing::trace!("Performing get on '{key_expr}'.",)) + .map_err(|err| zerror!(err)) + }() + .map_err(|err| throw_exception!(env, err)); + std::mem::forget(querier); +} + +/// +/// Frees the pointer of the querier. +/// +/// After a call to this function, no further jni operations should be performed using the querier associated to the raw pointer provided. +/// +#[no_mangle] +#[allow(non_snake_case)] +pub(crate) unsafe extern "C" fn Java_io_zenoh_jni_JNIQuerier_freePtrViaJNI( + _env: JNIEnv, + _: JClass, + querier_ptr: *const Querier<'static>, +) { + Arc::from_raw(querier_ptr); +} diff --git a/zenoh-jni/src/session.rs b/zenoh-jni/src/session.rs index 43017c0..8318102 100644 --- a/zenoh-jni/src/session.rs +++ b/zenoh-jni/src/session.rs @@ -23,7 +23,7 @@ use zenoh::{ config::Config, key_expr::KeyExpr, pubsub::{Publisher, Subscriber}, - query::{Query, Queryable, ReplyError, Selector}, + query::{Querier, Query, Queryable, ReplyError, Selector}, sample::Sample, session::{Session, ZenohId}, Wait, @@ -514,6 +514,69 @@ pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareSubscriberViaJNI( }) } +/// Declare a Zenoh querier via JNI. +/// +/// This function is meant to be called from Java/Kotlin code through JNI. +/// +/// Parameters: +/// - `env`: The JNI environment. +/// - `_class`: The JNI class. +/// - `key_expr_ptr`: A raw pointer to the [KeyExpr] to be used for the querier. May be null in case of using an +/// undeclared key expression. +/// - `key_expr_str`: String representation of the key expression to be used to declare the querier. +/// It won't be considered in case a key_expr_ptr to a declared key expression is provided. +/// - `target`: The ordinal value of the query target enum value. +/// - `consolidation`: The ordinal value of the consolidation enum value. +/// - `congestion_control`: The ordinal value of the congestion control enum value. +/// - `priority`: The ordinal value of the priority enum value. +/// - `is_express`: The boolean express value of the QoS provided. +/// - `timeout_ms`: The timeout in milliseconds. +#[no_mangle] +#[allow(non_snake_case)] +pub unsafe extern "C" fn Java_io_zenoh_jni_JNISession_declareQuerierViaJNI( + mut env: JNIEnv, + _class: JClass, + key_expr_ptr: /*nullable*/ *const KeyExpr<'static>, + key_expr_str: JString, + session_ptr: *const Session, + target: jint, + consolidation: jint, + congestion_control: jint, + priority: jint, + is_express: jboolean, + timeout_ms: jlong, +) -> *const Querier<'static> { + let session = Arc::from_raw(session_ptr); + || -> ZResult<*const Querier<'static>> { + let key_expr = process_kotlin_key_expr(&mut env, &key_expr_str, key_expr_ptr)?; + let query_target = decode_query_target(target)?; + let consolidation = decode_consolidation(consolidation)?; + let congestion_control = decode_congestion_control(congestion_control)?; + let timeout = Duration::from_millis(timeout_ms as u64); + let priority = decode_priority(priority)?; + tracing::debug!("Declaring querier on '{}'...", key_expr); + + let querier = session + .declare_querier(key_expr.to_owned()) + .congestion_control(congestion_control) + .consolidation(consolidation) + .express(is_express != 0) + .target(query_target) + .priority(priority) + .timeout(timeout) + .wait() + .map_err(|err| zerror!(err))?; + + tracing::debug!("Querier declared on '{}'.", key_expr); + std::mem::forget(session); + Ok(Arc::into_raw(Arc::new(querier))) + }() + .unwrap_or_else(|err| { + throw_exception!(env, err); + null() + }) +} + /// Declare a Zenoh queryable via JNI. /// /// This function is meant to be called from Java/Kotlin code through JNI.