Import the Bootique BOM, then add bootique-kafka-client (and bootique-kafka-streams if you use streams):
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.bootique.bom</groupId>
            <artifactId>bootique-bom</artifactId>
            <version>3.0-M6</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
...
<!-- If using Producer and/or Consumer -->
<dependency>
    <groupId>io.bootique.kafka</groupId>
    <artifactId>bootique-kafka-client</artifactId>
</dependency>
<!-- If using streams -->
<dependency>
    <groupId>io.bootique.kafka</groupId>
    <artifactId>bootique-kafka-streams</artifactId>
</dependency>
Configure parameters in the YAML. Practically all of these settings can be overridden when creating a
specific Producer or Consumer instance via KafkaProducerFactory or KafkaConsumerFactory, so this is just a
collection of defaults for the most typical Producer or Consumer:
kafkaclient:
  # any number of named clusters, specifying comma-separated bootstrap Kafka servers for each.
  clusters:
    cluster1: 127.0.0.1:9092
    cluster2: host1:9092,host2:9092
  consumer:
    autoCommit: true
    autoCommitInterval: "200ms"
    defaultGroup: myappgroup
    sessionTimeout: "2s"
  producer:
    acks: all # either "all" or a number giving the minimum acks
    retries: 1
    batchSize: 16384
    linger: "1ms"
    bufferMemory: 33554432
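As a rough guide to what these defaults mean, the sketch below spells out the standard Kafka client properties that each YAML key most likely corresponds to. The property names are real Kafka settings. The key-to-property mapping and the `KafkaDefaultsSketch` class are assumptions made for illustration and are not part of Bootique:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, NOT Bootique code: lists the standard Kafka client
// properties that the YAML defaults above are assumed to translate to.
class KafkaDefaultsSketch {

    // "consumer" section expressed as native Kafka consumer properties
    static Map<String, String> consumerDefaults(String bootstrapServers) {
        Map<String, String> p = new LinkedHashMap<>();
        p.put("bootstrap.servers", bootstrapServers); // clusters.<name>
        p.put("enable.auto.commit", "true");          // autoCommit: true
        p.put("auto.commit.interval.ms", "200");      // autoCommitInterval: "200ms"
        p.put("group.id", "myappgroup");              // defaultGroup: myappgroup
        p.put("session.timeout.ms", "2000");          // sessionTimeout: "2s"
        return p;
    }

    // "producer" section expressed as native Kafka producer properties
    static Map<String, String> producerDefaults(String bootstrapServers) {
        Map<String, String> p = new LinkedHashMap<>();
        p.put("bootstrap.servers", bootstrapServers); // clusters.<name>
        p.put("acks", "all");                         // acks: all
        p.put("retries", "1");                        // retries: 1
        p.put("batch.size", "16384");                 // batchSize: 16384
        p.put("linger.ms", "1");                      // linger: "1ms"
        p.put("buffer.memory", "33554432");           // bufferMemory: 33554432
        return p;
    }

    public static void main(String[] args) {
        consumerDefaults("127.0.0.1:9092")
                .forEach((k, v) -> System.out.println("consumer " + k + "=" + v));
        producerDefaults("host1:9092,host2:9092")
                .forEach((k, v) -> System.out.println("producer " + k + "=" + v));
    }
}
```

Note that durations such as "200ms" and "2s" are written with units in the YAML, while the native Kafka properties take plain milliseconds.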
Now you can inject producer and consumer factories and create any number of producers and consumers (for more details see bootique-kafka-examples).
Producer:
@Inject
KafkaProducerFactory factory;

public void runProducer() {
    Producer<byte[], String> producer = factory
            .charValueProducer()
            .cluster("cluster2")
            .create();

    producer.send(new ProducerRecord<>("mytopic", "Hi!"));

    // close if there's nothing else to send
    producer.close();
}
Consumer example (also see this code sample):
@Inject
KafkaConsumerFactory factory;

// a custom function to consume data; generic so it works with any key/value types
public <K, V> void consumeBatch(Consumer<K, V> consumer, ConsumerRecords<K, V> data) {
    data.forEach(r -> System.out.println(r.topic() + "_" + r.offset() + ": " + r.value()));
}

public void runConsumer() {
    KafkaPollingTracker poll = factory
            // configure consumer
            .charValueConsumer()
            .cluster("cluster1")
            .group("somegroup")
            .topic("mytopic")
            // start the consumer in the background
            .consume(this::consumeBatch, Duration.ofSeconds(1));

    // Close when we need to stop consumption. Without an explicit close, Bootique will
    // close the consumer before the app exits.
    // poll.close();
}
TODO