Skip to content

Commit

Permalink
Add kafka configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed Jan 18, 2024
1 parent fcb34d8 commit 57252aa
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,36 @@

import io.apicurio.registry.probe.persistence.CustomerEntity;
import io.apicurio.registry.probe.smoke.ProbeMonitoring;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.annotations.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.transaction.Transactional;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class ConsumedCustomersResource {

private static final Logger log = LoggerFactory.getLogger(ProbeMonitoring.class);

@Channel("customers-from-kafka")
Multi<CustomerEntity> customers;

public void startMonitoring(@Observes StartupEvent startupEvent) {
int concurrentTasks = 2;

@Incoming("customers-from-kafka")
@Transactional
@Blocking
public CompletionStage<Void> consume(Message<CustomerEntity> customerMessage) {
final CustomerEntity customer = customerMessage.getPayload();
try {
concurrentTasks = Integer.parseInt(System.getenv("CONCURRENT_TASKS"));
log.info("Deleting customer with email: {}", customer.getEmail());
customer.delete();
} catch (Exception e) {
log.warn("Cannot load concurrent tasks environment variable", e);
log.error("Exception detected in the Probe application: {}", e.getCause(), e);
return customerMessage.nack(e);
}

ExecutorService e = Executors.newFixedThreadPool(concurrentTasks);

for (int i = 0; i < concurrentTasks; i++) {
e.submit(() -> {
log.info("Removing customers...");
removeCustomers();
});
}
}

public void removeCustomers() {
customers.map(customer -> {
final String stringToReturn = String.format("'%s' from %s", customer.getFirstName(), customer.getEmail());
try {
log.info("Deleting customer with email: {}", customer.getEmail());
customer.delete();
} catch (Exception e) {
log.error("Exception detected in the Probe application: {}", e.getCause(), e);
}
return stringToReturn;
});
return customerMessage.ack();
}
}

5 changes: 5 additions & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@
%prod.quarkus.datasource.jdbc.max-size=8
%prod.quarkus.datasource.jdbc.min-size=2


mp.messaging.incoming.customers-from-kafka.topic=server1.inventory.customers
mp.messaging.incoming.customers-from-kafka.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
mp.messaging.incoming.customers-from-kafka.connector=smallrye-kafka

0 comments on commit 57252aa

Please sign in to comment.