diff --git a/doc-examples/example-groovy/src/test/groovy/example/ConsumerProducer.groovy b/doc-examples/example-groovy/src/test/groovy/example/ConsumerProducer.groovy index 012f0585..7f1dc47c 100644 --- a/doc-examples/example-groovy/src/test/groovy/example/ConsumerProducer.groovy +++ b/doc-examples/example-groovy/src/test/groovy/example/ConsumerProducer.groovy @@ -10,7 +10,7 @@ import java.util.concurrent.ExecutionException; @PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // <1> class ConsumerProducer { // <2> - @PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // <3> + @PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // <3> void messagePrinter(String message) { // <4> try { String changed = report(message).get(); @@ -21,7 +21,7 @@ class ConsumerProducer { // <2> } - @PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // <5> + @PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // <5> CompletableFuture report(String message) { // <6> return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // <7> } diff --git a/doc-examples/example-groovy/src/test/groovy/example/ReaderExample.groovy b/doc-examples/example-groovy/src/test/groovy/example/ReaderExample.groovy new file mode 100644 index 00000000..07278483 --- /dev/null +++ b/doc-examples/example-groovy/src/test/groovy/example/ReaderExample.groovy @@ -0,0 +1,20 @@ +package example; + +import io.micronaut.pulsar.annotation.PulsarReader; +import jakarta.inject.Singleton; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Reader; + +import java.util.concurrent.CompletableFuture; + + +@Singleton +class ReaderExample { + + @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-g-reader") // <1> + private Reader reader // <2> + + CompletableFuture> readNext() { // <3> + return reader.readNextAsync() // <4> + } +} diff --git a/doc-examples/example-java/src/test/java/example/ConsumerProducer.java b/doc-examples/example-java/src/test/java/example/ConsumerProducer.java index 23a16fe7..ad76745d 100644 --- a/doc-examples/example-java/src/test/java/example/ConsumerProducer.java +++ b/doc-examples/example-java/src/test/java/example/ConsumerProducer.java @@ -10,7 +10,7 @@ @PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // <1> public class ConsumerProducer { // <2> - @PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // <3> + @PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // <3> public void messagePrinter(String message) { // <4> try { String changed = report(message).get(); @@ -21,7 +21,7 @@ public void messagePrinter(String message) { // <4> } - @PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // <5> + @PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // <5> public CompletableFuture report(String message) { // <6> return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // <7> } diff --git a/doc-examples/example-java/src/test/java/example/ReaderExample.java b/doc-examples/example-java/src/test/java/example/ReaderExample.java new file mode 100644 index 00000000..d77cda19 --- /dev/null +++ b/doc-examples/example-java/src/test/java/example/ReaderExample.java @@ -0,0 +1,20 @@ +package example; + +import io.micronaut.pulsar.annotation.PulsarReader; +import jakarta.inject.Singleton; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Reader; + +import java.util.concurrent.CompletableFuture; + + +@Singleton +public class ReaderExample { + + @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-j-reader") // <1> + private Reader reader; // <2> + + public CompletableFuture> readNext() { // <3> + return reader.readNextAsync(); // <4> + } +} diff --git a/doc-examples/example-kotlin/build.gradle b/doc-examples/example-kotlin/build.gradle index 0ecb4002..2fee7663 100644 --- a/doc-examples/example-kotlin/build.gradle +++ b/doc-examples/example-kotlin/build.gradle @@ -10,8 +10,9 @@ dependencies { compileOnly "io.micronaut:micronaut-inject-groovy" implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.1" implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.5.1" + implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.5.2' implementation project(":pulsar") implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.5.30" - implementation "org.jetbrains.kotlin:kotlin-reflect:1.5.21" + implementation "org.jetbrains.kotlin:kotlin-reflect:1.5.30" testImplementation 'jakarta.inject:jakarta.inject-api:2.0.0' } \ No newline at end of file diff --git a/doc-examples/example-kotlin/src/test/kotlin/example/ReaderExample.kt b/doc-examples/example-kotlin/src/test/kotlin/example/ReaderExample.kt new file mode 100644 index 00000000..4ea0703d --- /dev/null +++ b/doc-examples/example-kotlin/src/test/kotlin/example/ReaderExample.kt @@ -0,0 +1,17 @@ +package example + +import io.micronaut.pulsar.annotation.PulsarReader +import jakarta.inject.Singleton +import kotlinx.coroutines.future.await +import org.apache.pulsar.client.api.Message +import org.apache.pulsar.client.api.Reader + +@Singleton +class ReaderExample { + @PulsarReader(value = "persistent://public/default/messages", readerName = "simple-k-reader") // <1> + private lateinit var reader: Reader // <2> + + suspend fun readNext(): Message { // <3> + return reader.readNextAsync().await() // <4> + } +} \ No newline at end of file diff --git a/doc-examples/gradle.properties b/doc-examples/gradle.properties new file mode 100644 index 00000000..53493a23 --- /dev/null +++ b/doc-examples/gradle.properties @@ -0,0 +1 @@ +skipDocumentation=true \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 482ef5b0..795bc5dc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,11 +1,11 @@ -projectVersion=1.0.0 -micronautDocsVersion=2.0.0.RC1 +projectVersion=1.0.0-SNAPSHOT +micronautDocsVersion=2.0.0 micronautVersion=3.0.3 micronautTestVersion=3.0.0 groovyVersion=3.0.8 spockVersion=2.0-groovy-3.0 kotlinVersion=1.5.21 -pulsarVersion=2.8.0 +pulsarVersion=2.8.1 junitVersion=5.7.0 kotlinCoroutinesVersion=1.5.1 diff --git a/pulsar/build.gradle b/pulsar/build.gradle index 561a8014..7bbeef14 100644 --- a/pulsar/build.gradle +++ b/pulsar/build.gradle @@ -10,6 +10,7 @@ dependencies { compileOnly "jakarta.inject:jakarta.inject-api:2.0.0" compileOnly "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion" compileOnly "com.google.code.findbugs:jsr305" + compileOnly 'com.google.protobuf:protobuf-java:3.18.1' testImplementation "io.micronaut:micronaut-inject-groovy" testImplementation("org.spockframework:spock-core:${spockVersion}") { diff --git a/pulsar/src/main/java/io/micronaut/pulsar/PulsarProducerRegistry.java b/pulsar/src/main/java/io/micronaut/pulsar/PulsarProducerRegistry.java index 22dec6e6..d6d26749 100644 --- a/pulsar/src/main/java/io/micronaut/pulsar/PulsarProducerRegistry.java +++ b/pulsar/src/main/java/io/micronaut/pulsar/PulsarProducerRegistry.java @@ -36,8 +36,8 @@ public interface PulsarProducerRegistry { Map> getProducers(); /** - * Get single managed producer by it's name. - * @param id + * Get single managed producer by its name. + * @param id unique identifier * @return Pulsar producer by given name */ Producer getProducer(@NonNull String id); diff --git a/pulsar/src/main/java/io/micronaut/pulsar/PulsarReaderRegistry.java b/pulsar/src/main/java/io/micronaut/pulsar/PulsarReaderRegistry.java index 5072a684..0ab391a1 100644 --- a/pulsar/src/main/java/io/micronaut/pulsar/PulsarReaderRegistry.java +++ b/pulsar/src/main/java/io/micronaut/pulsar/PulsarReaderRegistry.java @@ -30,8 +30,8 @@ public interface PulsarReaderRegistry { /** * If not specified explicitly, reader name will default to property/field name. * - * @param identifier - * @return Pulsar Reader by it's name + * @param identifier unique identifier for a reader + * @return Pulsar Reader if found for a given name */ Reader getReader(String identifier); diff --git a/pulsar/src/main/java/io/micronaut/pulsar/annotation/PulsarConsumer.java b/pulsar/src/main/java/io/micronaut/pulsar/annotation/PulsarConsumer.java index fe61d81a..af5c091a 100644 --- a/pulsar/src/main/java/io/micronaut/pulsar/annotation/PulsarConsumer.java +++ b/pulsar/src/main/java/io/micronaut/pulsar/annotation/PulsarConsumer.java @@ -118,7 +118,7 @@ RegexSubscriptionMode subscriptionTopicsMode() default AllTopics; /** - * Used in combination with {@link this#topicsPattern()}. Ignored using {@link this#topics()}. Must be greater than + * Used in combination with {@link #topicsPattern()}. Ignored using {@link #topics()}. Must be greater than * 1. Low values should be avoided. Pulsar default value is 1 minute * * @return Amount of delay between checks, in seconds, for new topic matching given pattern. diff --git a/pulsar/src/main/java/io/micronaut/pulsar/annotation/PulsarReader.java b/pulsar/src/main/java/io/micronaut/pulsar/annotation/PulsarReader.java index 7d930786..ebb86835 100644 --- a/pulsar/src/main/java/io/micronaut/pulsar/annotation/PulsarReader.java +++ b/pulsar/src/main/java/io/micronaut/pulsar/annotation/PulsarReader.java @@ -67,21 +67,22 @@ MessageSchema schema() default BYTES; /** - * @return Consumer name. + * @return Reader name. */ - String readerName(); + String readerName() default ""; /** - * By default reader should subscribe in non-blocking manner using default {@link java.util.concurrent.CompletableFuture} of {@link org.apache.pulsar.client.api.ConsumerBuilder#subscribeAsync()}. + * By default, reader should subscribe in non-blocking manner using default {@link java.util.concurrent.CompletableFuture} + * of {@link org.apache.pulsar.client.api.ConsumerBuilder#subscribeAsync()}. *

- * If blocking set to false application will block until consumer is successfully subscribed + * If blocking is set to false, application thread initializing it will block until consumer is successfully subscribed. * * @return Should the consumer subscribe in async manner or blocking */ boolean subscribeAsync() default true; /** - * @return Whether to position reader to newest available message in queue or not. + * @return Whether to position reader to the newest available message in queue or not. */ boolean startMessageLatest() default true; } diff --git a/src/main/docs/guide/consumer.adoc b/src/main/docs/guide/consumer.adoc index 3c4eb416..0f731c35 100644 --- a/src/main/docs/guide/consumer.adoc +++ b/src/main/docs/guide/consumer.adoc @@ -4,7 +4,6 @@ beans annotated with a `@PulsarSubscription` will try to fetch subscription name `subscriptionName` property value in the `@PulsarConsumer` annotation. Subscription beans are singletons. snippet::example.ConsumerProducer[project-base="doc-examples/example", indent="0"] - <1> The class holding consumers can be annotated with api:pulsar.annotation.PulsarSubscription. It's also allowed to set subscription from the `@PulsarConsumer`. By that, consumer can be located in other beans like `@Singleton`. <3> Methods that will process the message, in other words consumers, must be annotated with `@PulsarConsumer` and one diff --git a/src/main/docs/guide/producer.adoc b/src/main/docs/guide/producer.adoc index 77bde732..6e6a3450 100644 --- a/src/main/docs/guide/producer.adoc +++ b/src/main/docs/guide/producer.adoc @@ -6,7 +6,6 @@ in which case it's important to ensure that the first method parameter is what s ignored. snippet::example.Producer[project-base="doc-examples/example", indent="0"] - <1> Annotate interface with @PulsarProducerClient to notify Micronaut for processing it without implementation <2> Methods are the actual producers so annotate them with @PulsarProducer <3> Return decides whether to send message in a blocking or non-blocking manner. @@ -33,7 +32,6 @@ before sending the message an if execution throws exception, sending will be ski still used as a message body*. snippet::example.ConsumerProducer[project-base="doc-examples/example", indent="0"] - <1> For Kotlin, open is required in non-interface (abstract) classes because of AOT <5> Annotating method as a producer <6> A non-abstract async method. In Kotlin case we need open to allow Micronaut AOT interception. diff --git a/src/main/docs/guide/reader.adoc b/src/main/docs/guide/reader.adoc index 07f48229..6edd92b4 100644 --- a/src/main/docs/guide/reader.adoc +++ b/src/main/docs/guide/reader.adoc @@ -1,48 +1,16 @@ -== Readers vs Consumers +== Readers -Readers can be used to gain more control over message flow. However, they are limited to a single topic in contrast to consumers. On the other hand, you request each message "manually" by explicitly calling next message read. A more useful feature in the case of Readers might be `seek`, where developers get the ability to position the reader where they want in the message log, thus being able to manually replay messages when required. Simply put, it's more similar to reading a file than reacting to an event, but the file gets modified by an external actor. +Pulsar supports both "Consumers" and "Readers". More can be read in their +https://pulsar.apache.org/docs/en/concepts-clients/#reader-interface[documentation] == Creating readers To initialize a reader, declare a field annotated with `@PulsarReader` inside any bean or as a constructor argument. -[source,java] ----- -import io.micronaut.pulsar.annotation.PulsarProducerClient; -import io.micronaut.pulsar.annotation.PulsarProducer; +snippet::example.ReaderExample[project-base="doc-examples/example", indent="0"] +<1> Reader annotation with the topic and the reader name +<2> Reader must be of type api:org.apache.pulsar.client.api.Reader +<3> Using readAsync requires `CompletableFeature` or in Kotlin awaiting is possible +<4> Calling the read will move the cursor to the next message or give null in case there are no more messages -@Singleton -public class MyReader { - @PulsarReader(...) - private Reader reader; -} ----- - -Only *topic* is a required parameter. However, it's important to note that if the producer name is not set, it will default to the method name which may cause collisions in case of non-unique method names. - -=== Producer return values - -If you need the `MessageId` from Pulsar, you can -specify *MessageId* as the return type instead of *void* as given in examples below. If reactive or async return types are used, -the method can only return `MessageId` as their return type like `Maybe`. If you have blocking methods, -`MessageId` or the type of the parameter passed to function can be used as a return type of the method. - -Examples: -[source,java] ----- -import io.micronaut.pulsar.annotation.PulsarProducerClient; -import io.micronaut.pulsar.annotation.PulsarProducer; - -@PulsarProducerClient -public interface ProducerTester { - - @PulsarProducer(topic = "public/default/test", producerName = "test-producer-1") - String returnParameter(String message); - - @PulsarProducer(topic = "public/default/test", producerName = "test-producer-2") - MessageId returnMessageId(String message); - - @PulsarProducer(topic = "public/default/test", producerName = "test-producer-3") - Single returnReactive(String message); -} ----- +Reader `name` can be autogenerated but `topic` argument must be set.