diff --git a/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java b/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java index a7c3ea5..553c8c0 100644 --- a/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java +++ b/src/main/java/io/apicurio/registry/probe/kafka/ConsumedCustomersResource.java @@ -2,55 +2,36 @@ import io.apicurio.registry.probe.persistence.CustomerEntity; import io.apicurio.registry.probe.smoke.ProbeMonitoring; -import io.quarkus.runtime.StartupEvent; import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.annotations.Blocking; import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.event.Observes; +import jakarta.transaction.Transactional; import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CompletionStage; @ApplicationScoped public class ConsumedCustomersResource { private static final Logger log = LoggerFactory.getLogger(ProbeMonitoring.class); - @Channel("customers-from-kafka") - Multi customers; - - public void startMonitoring(@Observes StartupEvent startupEvent) { - int concurrentTasks = 2; - + @Incoming("customers-from-kafka") + @Transactional + @Blocking + public CompletionStage consume(Message customerMessage) { + final CustomerEntity customer = customerMessage.getPayload(); try { - concurrentTasks = Integer.parseInt(System.getenv("CONCURRENT_TASKS")); + log.info("Deleting customer with email: {}", customer.getEmail()); + customer.delete(); } catch (Exception e) { - log.warn("Cannot load concurrent tasks environment variable", e); + log.error("Exception detected in the Probe application: {}", e.getCause(), e); + return customerMessage.nack(e); } - - ExecutorService e = Executors.newFixedThreadPool(concurrentTasks); - - for (int i = 0; i < concurrentTasks; i++) { - e.submit(() -> { - log.info("Removing customers..."); - removeCustomers(); - }); - } - } - - public void removeCustomers() { - customers.map(customer -> { - final String stringToReturn = String.format("'%s' from %s", customer.getFirstName(), customer.getEmail()); - try { - log.info("Deleting customer with email: {}", customer.getEmail()); - customer.delete(); - } catch (Exception e) { - log.error("Exception detected in the Probe application: {}", e.getCause(), e); - } - return stringToReturn; - }); + return customerMessage.ack(); } } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 4ed5ccc..fc4eb92 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -6,3 +6,8 @@ %prod.quarkus.datasource.jdbc.max-size=8 %prod.quarkus.datasource.jdbc.min-size=2 + +mp.messaging.incoming.customers-from-kafka.topic=server1.inventory.customers +mp.messaging.incoming.customers-from-kafka.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer +mp.messaging.incoming.customers-from-kafka.connector=smallrye-kafka +