Skip to content

Commit

Permalink
doc clean-up, reader example, bump pulsar dependency (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haris Secic authored Oct 10, 2021
1 parent 7811755 commit c35d5e8
Show file tree
Hide file tree
Showing 16 changed files with 88 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.concurrent.ExecutionException;

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // <1>
class ConsumerProducer { // <2>
@PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // <3>
@PulsarConsumer(topic = "persistent://public/default/messages-groovy-docs", consumerName = "shared-consumer-gtester") // <3>
void messagePrinter(String message) { // <4>
try {
String changed = report(message).get();
Expand All @@ -21,7 +21,7 @@ class ConsumerProducer { // <2>
}


@PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // <5>
@PulsarProducer(topic = "persistent://public/default/reports-groovy-docs", producerName = "report-producer-groovy") // <5>
CompletableFuture<String> report(String message) { // <6>
return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // <7>
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package example;

import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;

import java.util.concurrent.CompletableFuture;


@Singleton
class ReaderExample {

@PulsarReader(value = "persistent://public/default/messages", readerName = "simple-g-reader") // <1>
private Reader<String> reader // <2>

CompletableFuture<Message<String>> readNext() { // <3>
return reader.readNextAsync() // <4>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

@PulsarSubscription(subscriptionName = "pulsar-jtest-subscription", subscriptionType = SubscriptionType.Shared) // <1>
public class ConsumerProducer { // <2>
@PulsarConsumer(topic = "persistent://public/default/messages-kotlin-docs", consumerName = "shared-consumer-ktester") // <3>
@PulsarConsumer(topic = "persistent://public/default/messages-java-docs", consumerName = "shared-consumer-jtester") // <3>
public void messagePrinter(String message) { // <4>
try {
String changed = report(message).get();
Expand All @@ -21,7 +21,7 @@ public void messagePrinter(String message) { // <4>
}


@PulsarProducer(topic = "persistent://public/default/reports-kotlin-docs", producerName = "report-producer-kotlin") // <5>
@PulsarProducer(topic = "persistent://public/default/reports-java-docs", producerName = "report-producer-java") // <5>
public CompletableFuture<String> report(String message) { // <6>
return CompletableFuture.supplyAsync(() -> String.format("Reporting message %s", message)); // <7>
}
Expand Down
20 changes: 20 additions & 0 deletions doc-examples/example-java/src/test/java/example/ReaderExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package example;

import io.micronaut.pulsar.annotation.PulsarReader;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Reader;

import java.util.concurrent.CompletableFuture;


@Singleton
public class ReaderExample {

@PulsarReader(value = "persistent://public/default/messages", readerName = "simple-j-reader") // <1>
private Reader<String> reader; // <2>

public CompletableFuture<Message<String>> readNext() { // <3>
return reader.readNextAsync(); // <4>
}
}
3 changes: 2 additions & 1 deletion doc-examples/example-kotlin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ dependencies {
compileOnly "io.micronaut:micronaut-inject-groovy"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core-jvm:1.5.1"
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:1.5.2'
implementation project(":pulsar")
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.5.30"
implementation "org.jetbrains.kotlin:kotlin-reflect:1.5.21"
implementation "org.jetbrains.kotlin:kotlin-reflect:1.5.30"
testImplementation 'jakarta.inject:jakarta.inject-api:2.0.0'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package example

import io.micronaut.pulsar.annotation.PulsarReader
import jakarta.inject.Singleton
import kotlinx.coroutines.future.await
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.Reader

@Singleton
class ReaderExample {
@PulsarReader(value = "persistent://public/default/messages", readerName = "simple-k-reader") // <1>
private lateinit var reader: Reader<String> // <2>

suspend fun readNext(): Message<String> { // <3>
return reader.readNextAsync().await() // <4>
}
}
1 change: 1 addition & 0 deletions doc-examples/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
skipDocumentation=true
6 changes: 3 additions & 3 deletions gradle.properties
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
projectVersion=1.0.0
micronautDocsVersion=2.0.0.RC1
projectVersion=1.0.0-SNAPSHOT
micronautDocsVersion=2.0.0
micronautVersion=3.0.3
micronautTestVersion=3.0.0
groovyVersion=3.0.8
spockVersion=2.0-groovy-3.0
kotlinVersion=1.5.21
pulsarVersion=2.8.0
pulsarVersion=2.8.1
junitVersion=5.7.0
kotlinCoroutinesVersion=1.5.1

Expand Down
1 change: 1 addition & 0 deletions pulsar/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies {
compileOnly "jakarta.inject:jakarta.inject-api:2.0.0"
compileOnly "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion"
compileOnly "com.google.code.findbugs:jsr305"
compileOnly 'com.google.protobuf:protobuf-java:3.18.1'

testImplementation "io.micronaut:micronaut-inject-groovy"
testImplementation("org.spockframework:spock-core:${spockVersion}") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public interface PulsarProducerRegistry {
Map<String, Producer<?>> getProducers();

/**
* Get single managed producer by it's name.
* @param id
* Get single managed producer by its name.
* @param id unique identifier
* @return Pulsar producer by given name
*/
Producer<?> getProducer(@NonNull String id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public interface PulsarReaderRegistry {
/**
* If not specified explicitly, reader name will default to property/field name.
*
* @param identifier
* @return Pulsar Reader by it's name
* @param identifier unique identifier for a reader
* @return Pulsar Reader if found for a given name
*/
Reader<?> getReader(String identifier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
RegexSubscriptionMode subscriptionTopicsMode() default AllTopics;

/**
* Used in combination with {@link this#topicsPattern()}. Ignored using {@link this#topics()}. Must be greater than
* Used in combination with {@link #topicsPattern()}. Ignored using {@link #topics()}. Must be greater than
* 1. Low values should be avoided. Pulsar default value is 1 minute
*
* @return Amount of delay between checks, in seconds, for new topic matching given pattern.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,21 +67,22 @@
MessageSchema schema() default BYTES;

/**
* @return Consumer name.
* @return Reader name.
*/
String readerName();
String readerName() default "";

/**
* By default reader should subscribe in non-blocking manner using default {@link java.util.concurrent.CompletableFuture} of {@link org.apache.pulsar.client.api.ConsumerBuilder#subscribeAsync()}.
* By default, reader should subscribe in non-blocking manner using default {@link java.util.concurrent.CompletableFuture}
* of {@link org.apache.pulsar.client.api.ConsumerBuilder#subscribeAsync()}.
* <p>
* If blocking set to false application will block until consumer is successfully subscribed
* If blocking is set to false, application thread initializing it will block until consumer is successfully subscribed.
*
* @return Should the consumer subscribe in async manner or blocking
*/
boolean subscribeAsync() default true;

/**
* @return Whether to position reader to newest available message in queue or not.
* @return Whether to position reader to the newest available message in queue or not.
*/
boolean startMessageLatest() default true;
}
1 change: 0 additions & 1 deletion src/main/docs/guide/consumer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ beans annotated with a `@PulsarSubscription` will try to fetch subscription name
`subscriptionName` property value in the `@PulsarConsumer` annotation. Subscription beans are singletons.

snippet::example.ConsumerProducer[project-base="doc-examples/example", indent="0"]

<1> The class holding consumers can be annotated with api:pulsar.annotation.PulsarSubscription. It's also allowed to set
subscription from the `@PulsarConsumer`. By that, consumer can be located in other beans like `@Singleton`.
<3> Methods that will process the message, in other words consumers, must be annotated with `@PulsarConsumer` and one
Expand Down
2 changes: 0 additions & 2 deletions src/main/docs/guide/producer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ in which case it's important to ensure that the first method parameter is what s
ignored.

snippet::example.Producer[project-base="doc-examples/example", indent="0"]

<1> Annotate interface with @PulsarProducerClient to notify Micronaut for processing it without implementation
<2> Methods are the actual producers so annotate them with @PulsarProducer
<3> Return decides whether to send message in a blocking or non-blocking manner.
Expand All @@ -33,7 +32,6 @@ before sending the message an if execution throws exception, sending will be ski
still used as a message body*.

snippet::example.ConsumerProducer[project-base="doc-examples/example", indent="0"]

<1> For Kotlin, open is required in non-interface (abstract) classes because of AOT
<5> Annotating method as a producer
<6> A non-abstract async method. In Kotlin case we need open to allow Micronaut AOT interception.
Expand Down
50 changes: 9 additions & 41 deletions src/main/docs/guide/reader.adoc
Original file line number Diff line number Diff line change
@@ -1,48 +1,16 @@
== Readers vs Consumers
== Readers

Readers can be used to gain more control over message flow. However, they are limited to a single topic in contrast to consumers. On the other hand, you request each message "manually" by explicitly calling next message read. A more useful feature in the case of Readers might be `seek`, where developers get the ability to position the reader where they want in the message log, thus being able to manually replay messages when required. Simply put, it's more similar to reading a file than reacting to an event, but the file gets modified by an external actor.
Pulsar supports both "Consumers" and "Readers". More can be read in their
https://pulsar.apache.org/docs/en/concepts-clients/#reader-interface[documentation]

== Creating readers

To initialize a reader, declare a field annotated with `@PulsarReader` inside any bean or as a constructor argument.

[source,java]
----
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import io.micronaut.pulsar.annotation.PulsarProducer;
snippet::example.ReaderExample[project-base="doc-examples/example", indent="0"]
<1> Reader annotation with the topic and the reader name
<2> Reader must be of type api:org.apache.pulsar.client.api.Reader
<3> Using readAsync requires `CompletableFeature` or in Kotlin awaiting is possible
<4> Calling the read will move the cursor to the next message or give null in case there are no more messages

@Singleton
public class MyReader {
@PulsarReader(...)
private Reader reader;
}
----

Only *topic* is a required parameter. However, it's important to note that if the producer name is not set, it will default to the method name which may cause collisions in case of non-unique method names.

=== Producer return values

If you need the `MessageId` from Pulsar, you can
specify *MessageId* as the return type instead of *void* as given in examples below. If reactive or async return types are used,
the method can only return `MessageId` as their return type like `Maybe<MessageId>`. If you have blocking methods,
`MessageId` or the type of the parameter passed to function can be used as a return type of the method.

Examples:
[source,java]
----
import io.micronaut.pulsar.annotation.PulsarProducerClient;
import io.micronaut.pulsar.annotation.PulsarProducer;
@PulsarProducerClient
public interface ProducerTester {
@PulsarProducer(topic = "public/default/test", producerName = "test-producer-1")
String returnParameter(String message);
@PulsarProducer(topic = "public/default/test", producerName = "test-producer-2")
MessageId returnMessageId(String message);
@PulsarProducer(topic = "public/default/test", producerName = "test-producer-3")
Single<MessageId> returnReactive(String message);
}
----
Reader `name` can be autogenerated but `topic` argument must be set.

0 comments on commit c35d5e8

Please sign in to comment.