Skip to content

Commit

Permalink
Add delivery semantics examples
Browse files Browse the repository at this point in the history
  • Loading branch information
sauljabin committed Jul 28, 2024
1 parent 2c14eb4 commit 50f79e1
Show file tree
Hide file tree
Showing 14 changed files with 289 additions and 42 deletions.
12 changes: 5 additions & 7 deletions kafka-avro-clients/src/main/java/kafka/sandbox/cli/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,10 @@ public Integer call() throws Exception {
Runtime
.getRuntime()
.addShutdownHook(
new Thread("consumer-shutdown-hook") {
@Override
public void run() {
consumer.wakeup();
latch.countDown();
}
}
new Thread(() -> {
consumer.wakeup();
latch.countDown();
}, "consumer-shutdown-hook")
);

// infinite loop
Expand All @@ -74,6 +71,7 @@ public void run() {
log.info("Shutdown gracefully");
} finally {
consumer.close();
latch.countDown();
}
},
"consumer-thread"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,10 @@ public Integer call() throws Exception {
Runtime
.getRuntime()
.addShutdownHook(
new Thread("consumer-shutdown-hook") {
@Override
public void run() {
consumer.wakeup();
latch.countDown();
}
}
new Thread(() -> {
consumer.wakeup();
latch.countDown();
}, "consumer-shutdown-hook")
);

// infinite loop
Expand Down Expand Up @@ -75,6 +72,7 @@ public void run() {
log.info("Shutdown gracefully");
} finally {
consumer.close();
latch.countDown();
}
},
"consumer-thread"
Expand Down
36 changes: 36 additions & 0 deletions kafka-delivery-guarantees-clients/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Build script for the Kafka delivery-guarantees example client.
plugins {
    id 'java'
    id 'application'
}

repositories {
    mavenCentral()
    // Confluent artifacts (kafka-json-serializer) are not published to Maven Central
    maven {
        url = 'https://packages.confluent.io/maven/'
    }
}

dependencies {
    testImplementation "org.junit.jupiter:junit-jupiter:${junitVersion}"

    // Kafka client plus Confluent's JSON (de)serializer used by the consumer
    implementation "org.apache.kafka:kafka-clients:${kafkaVersion}"
    implementation "io.confluent:kafka-json-serializer:${confluentVersion}"

    implementation 'info.picocli:picocli:4.6.1'
    implementation 'net.datafaker:datafaker:2.0.2'
    implementation 'org.slf4j:slf4j-simple:1.7.30'

    // Lombok is compile-time only (annotation processing)
    compileOnly "org.projectlombok:lombok:${lombokVersion}"
    annotationProcessor "org.projectlombok:lombok:${lombokVersion}"

    testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
    testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
}

application {
    mainClass = 'kafka.sandbox.App'
}

test {
    useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package kafka.sandbox;

import kafka.sandbox.cli.Consumer;
import kafka.sandbox.cli.KafkaClients;
import picocli.CommandLine;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class App {

    /**
     * Entry point: registers the consume subcommand and delegates to picocli.
     *
     * @param args command-line arguments forwarded to picocli
     * @throws IOException if consumer.properties cannot be read
     */
    public static void main(String[] args) throws IOException {
        Properties consumerProps = getProperties();

        CommandLine commandLine = new CommandLine(new KafkaClients())
                .addSubcommand(new Consumer(consumerProps));

        System.exit(commandLine.execute(args));
    }

    /**
     * Loads consumer.properties from the classpath.
     *
     * @return the loaded consumer configuration
     * @throws IOException if the resource is missing or unreadable
     */
    private static Properties getProperties() throws IOException {
        Properties props = new Properties();
        // getResourceAsStream returns null when the resource is absent; fail fast
        // with a clear message instead of an NPE inside Properties.load, and close
        // the stream (the original leaked it)
        try (InputStream stream = App.class.getClassLoader().getResourceAsStream("consumer.properties")) {
            if (stream == null) {
                throw new IOException("consumer.properties not found on classpath");
            }
            props.load(stream);
        }
        return props;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package kafka.sandbox.cli;

/**
 * Thrown when the (simulated) business logic fails to process a consumed record.
 */
public class BusinessLogicException extends RuntimeException {

    public BusinessLogicException(String message) {
        super(message);
    }

    /**
     * Variant that preserves the underlying cause so the original stack trace
     * is not lost when wrapping lower-level failures.
     */
    public BusinessLogicException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package kafka.sandbox.cli;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;

@Slf4j
@Command(name = "consume", description = "Consumes messages from topic")
public class Consumer implements Callable<Integer> {

public static final String AT_MOST_ONCE = "at-most-once";
private static final String AT_LEAST_ONCE = "at-least-once";
private static final int MAX_RETRIES = 5;

private KafkaConsumer<String, User> consumer;
private CountDownLatch latch;
private final Properties props;
private final Random random = new Random();

@Parameters(
index = "0",
description = "Topic name"
)
private String topic;

@Option(names = "-s", description = "Semantic. Allowed values: [" + AT_MOST_ONCE + ", " + AT_LEAST_ONCE + "] (default: " + AT_MOST_ONCE + ")", defaultValue = AT_MOST_ONCE)
private String semantic;

public Consumer(Properties props) {
this.props = props;
}

@Override
public Integer call() throws Exception {
consumer = new KafkaConsumer<>(props);
latch = new CountDownLatch(1);
consumer.subscribe(Collections.singleton(topic));

// attach shutdown handler to catch control-c and creating a latch
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown, "consumer-shutdown-hook"));

// infinite loop
Thread infiniteLoop = new Thread(this::infiniteLoop, "consumer-thread");
infiniteLoop.start();
latch.await();

return CommandLine.ExitCode.OK;
}

private void shutdown() {
consumer.wakeup();
latch.countDown();
}

private void infiniteLoop() {
try {
if (semantic.equals(AT_MOST_ONCE)) {
atMostOnce();
} else {
atLeastOnce();
}
} catch (WakeupException we) {
log.info("Shutdown gracefully");
} finally {
consumer.close();
latch.countDown();
}
}

private void atLeastOnce() {
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(500));

for (ConsumerRecord<String, User> record : records) {
try {
businessLogic(record);

// offsets after the message is processed
consumer.commitSync(Map.of(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
} catch (Exception e) {
log.error("There was an error processing a message: ", e);

// implement recovery and restart (kubernetes), dead-letter queue, etc

// throw the exception up
throw e;
}
}

}
}

private void atMostOnce() {
while (true) {
ConsumerRecords<String, User> records = consumer.poll(Duration.ofMillis(500));

// offsets are committed as soon as the message is received
consumer.commitSync();

for (ConsumerRecord<String, User> record : records) {
try {
businessLogic(record);
} catch (Exception e) {
// the exception is ignored and the message is lost
log.warn("There was an error but it was ignored because this is: " + AT_MOST_ONCE);
}
}
}
}

private void businessLogic(ConsumerRecord<String, User> record) {
log.info("Message was read: partition = {}, offset = {}", record.partition(), record.offset());

if (random.nextBoolean()) {
throw new BusinessLogicException("There was an error processing a message");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package kafka.sandbox.cli;

import picocli.CommandLine.Command;
import picocli.CommandLine.Model.CommandSpec;

import static picocli.CommandLine.ParameterException;
import static picocli.CommandLine.Spec;

/**
 * Root picocli command. It carries no behavior of its own: it only hosts
 * subcommands and fails with a usage error when invoked without one.
 */
@Command(
    name = "kafka-delivery-guarantees-clients",
    description = "Allows you either to produce or consume topic",
    synopsisSubcommandLabel = "COMMAND"
)
public class KafkaClients implements Runnable {

    // injected by picocli; provides access to the CommandLine for error reporting
    @Spec
    private CommandSpec spec;

    @Override
    public void run() {
        // reached only when the user supplied no subcommand
        throw new ParameterException(spec.commandLine(), "Missing required subcommand");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package kafka.sandbox.cli;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Builder;
import lombok.Data;

/**
 * Example user payload (de)serialized as JSON by the Kafka clients.
 *
 * NOTE(review): fields are public even though Lombok's @Data already generates
 * getters/setters — they should likely be private; verify no caller accesses
 * the fields directly before tightening visibility.
 */
@Builder
@Data
public class User {
    @JsonProperty
    public String id;

    @JsonProperty
    public String firstName;

    @JsonProperty
    public String lastName;

    @JsonProperty
    public String address;

    @JsonProperty
    public int age;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Kafka consumer configuration for the delivery-guarantees example.
bootstrap.servers=kafka1:9092
group.id=client.consumer
# manual commits are required to demonstrate at-most-once vs at-least-once
enable.auto.commit=false
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# NOTE(review): Avro-specific setting, but value.deserializer below is JSON —
# presumably a leftover from the avro-clients example; confirm and remove
specific.avro.reader=true
client.id=client.consumer
value.deserializer=io.confluent.kafka.serializers.KafkaJsonDeserializer
12 changes: 5 additions & 7 deletions kafka-json-clients/src/main/java/kafka/sandbox/cli/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,10 @@ public Integer call() throws Exception {
Runtime
.getRuntime()
.addShutdownHook(
new Thread("consumer-shutdown-hook") {
@Override
public void run() {
consumer.wakeup();
latch.countDown();
}
}
new Thread(() -> {
consumer.wakeup();
latch.countDown();
}, "consumer-shutdown-hook")
);

// infinite loop
Expand Down Expand Up @@ -90,6 +87,7 @@ public void run() {
log.info("Shutdown gracefully");
} finally {
consumer.close();
latch.countDown();
}
},
"consumer-thread"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,10 @@ public void consume(String topic) {
Runtime
.getRuntime()
.addShutdownHook(
new Thread("consumer-shutdown-hook") {
@Override
public void run() {
consumer.wakeup();
latch.countDown();
}
}
new Thread(() -> {
consumer.wakeup();
latch.countDown();
}, "consumer-shutdown-hook")
);

Thread infiniteLoop = new Thread(
Expand All @@ -63,6 +60,7 @@ public void run() {
log.info("Shutdown gracefully");
} finally {
consumer.close();
latch.countDown();
}
},
"consumer-thread"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,10 @@ public Integer call() throws Exception {
Runtime
.getRuntime()
.addShutdownHook(
new Thread("consumer-shutdown-hook") {
@Override
public void run() {
consumer.wakeup();
latch.countDown();
}
}
new Thread(() -> {
consumer.wakeup();
latch.countDown();
}, "consumer-shutdown-hook")
);

// infinite loop
Expand Down Expand Up @@ -93,6 +90,7 @@ public void run() {
log.info("Shutdown gracefully");
} finally {
consumer.close();
latch.countDown();
}
},
"consumer-thread"
Expand Down
Loading

0 comments on commit 50f79e1

Please sign in to comment.