Swiftly build and enhance your Kafka Streams applications.
Kstreamplify adds extra features to Kafka Streams, simplifying development so you can write applications with minimal effort and stay focused on business implementation.
- Overview
- Dependencies
- Bootstrapping
- Avro Serializer and Deserializer
- Error Handling
- Web Services
- TopicWithSerde API
- Interactive Queries
- Hooks
- Deduplication
- Open Telemetry
- Swagger
- Testing
- Motivation
- Contribution
Wondering what makes Kstreamplify stand out? Here are some of the key features that make it a must-have for Kafka Streams:
- 🚀 Bootstrapping: Automatic startup, configuration, and initialization of Kafka Streams is handled for you. Focus on business implementation rather than on setup.
- 📝 Avro Serializer and Deserializer: Common serializers and deserializers for Avro.
- ⛑️ Error Handling: Catch and route errors to a dead-letter queue (DLQ) topic.
- ☸️ Kubernetes: Accurate readiness and liveness probes for Kubernetes deployment.
- 🤿 Interactive Queries: Dive into Kafka Streams state stores.
- 🫧 Deduplication: Remove duplicate events from a stream.
- 🧪 Testing: Automatic Topology Test Driver setup. Start writing your tests with minimal effort.
Kstreamplify offers three dependencies, all compatible with Java 17 and 21.
To include the core Kstreamplify library in your project, add the following dependency:
<dependency>
    <groupId>com.michelin</groupId>
    <artifactId>kstreamplify-core</artifactId>
    <version>${kstreamplify.version}</version>
</dependency>
For Spring Boot applications, use the following dependency:
<dependency>
    <groupId>com.michelin</groupId>
    <artifactId>kstreamplify-spring-boot</artifactId>
    <version>${kstreamplify.version}</version>
</dependency>
The dependency is compatible with Spring Boot 3.
For both Java and Spring Boot dependencies, a testing dependency is available to facilitate testing:
<dependency>
    <groupId>com.michelin</groupId>
    <artifactId>kstreamplify-core-test</artifactId>
    <version>${kstreamplify.version}</version>
    <scope>test</scope>
</dependency>
Kstreamplify simplifies the bootstrapping of Kafka Streams applications by handling the startup, configuration, and initialization of Kafka Streams for you.
Define a KafkaStreamsStarter bean within your Spring Boot context and override the KafkaStreamsStarter#topology() method:
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        // Your topology
    }

    @Override
    public String dlqTopic() {
        return "DLQ_TOPIC";
    }
}
You can define all your Kafka Streams properties directly in the application.yml file as follows:
kafka:
  properties:
    acks: 'all'
    application.id: 'myKafkaStreams'
    auto.offset.reset: 'earliest'
    avro.remove.java.properties: true
    bootstrap.servers: 'localhost:9092'
    client.id: 'myKafkaStreams'
    schema.registry.url: 'http://localhost:8081'
    state.dir: '/tmp/my-kafka-streams'
Note that all the Kafka Streams properties are prefixed with kafka.properties.
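To illustrate the prefix convention, here is a minimal plain-Java sketch, assuming the YAML has already been flattened into dotted keys: it keeps only the entries under kafka.properties and strips that prefix, so that kafka.properties.application.id becomes application.id. The class KafkaPropertiesSketch and its extract method are hypothetical; this is not Kstreamplify's own property loader.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of the "kafka.properties" prefix convention.
// Not Kstreamplify's implementation: it only shows how prefixed keys map to Kafka Streams keys.
final class KafkaPropertiesSketch {
    static final String PREFIX = "kafka.properties.";

    private KafkaPropertiesSketch() {
    }

    // Keeps entries whose key starts with "kafka.properties." and strips that prefix,
    // e.g. "kafka.properties.application.id" becomes "application.id".
    // Entries outside the prefix (such as "server.port") are ignored.
    static Properties extract(Map<String, String> flattenedYaml) {
        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : flattenedYaml.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(PREFIX)) {
                properties.setProperty(key.substring(PREFIX.length()), entry.getValue());
            }
        }
        return properties;
    }
}
```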
Whenever you need to serialize or deserialize records with Avro schemas, you can use the SerdesUtils class as follows:
SerdesUtils.<MyAvroValue>getValueSerde()
or
SerdesUtils.<MyAvroValue>getKeySerde()
Here is an example of using these methods in your topology:
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        streamsBuilder
            .stream("INPUT_TOPIC", Consumed.with(Serdes.String(), SerdesUtils.<KafkaPerson>getValueSerde()))
            .to("OUTPUT_TOPIC", Produced.with(Serdes.String(), SerdesUtils.<KafkaPerson>getValueSerde()));
    }
}
Kstreamplify provides the ability to handle errors and route them to a dead-letter queue (DLQ) topic.
Override the dlqTopic method and return the name of your DLQ topic:
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        // Your topology
    }

    @Override
    public String dlqTopic() {
        return "DLQ_TOPIC";
    }
}
To handle processing errors and route them to the DLQ topic, you can use the ProcessingResult class.
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaPerson> stream = streamsBuilder
            .stream("INPUT_TOPIC", Consumed.with(Serdes.String(), SerdesUtils.getValueSerde()));

        // Failed records are routed to the DLQ topic; successful ones continue to OUTPUT_TOPIC
        TopologyErrorHandler
            .catchErrors(stream.mapValues(MyKafkaStreams::toUpperCase))
            .to("OUTPUT_TOPIC", Produced.with(Serdes.String(), SerdesUtils.getValueSerde()));
    }

    @Override
    public String dlqTopic() {
        return "DLQ_TOPIC";
    }

    // Wraps the transformation outcome: success with the new value, or failure with the input value
    private static ProcessingResult<KafkaPerson, KafkaPerson> toUpperCase(KafkaPerson value) {
        try {
            value.setLastName(value.getLastName().toUpperCase());
            return ProcessingResult.success(value);
        } catch (Exception e) {
            return ProcessingResult.fail(e, value, "Something bad happened...");
        }
    }
}
The function passed to mapValues returns a ProcessingResult<V, V2>, where:
- The first type parameter is the type of the new value after a successful transformation.
- The second type parameter is the type of the current value for which the transformation failed.
Use the following to mark the result as successful:
ProcessingResult.success(value);
Or the following in a catch clause to mark the result as failed:
ProcessingResult.fail(e, value, "Something bad happened...");
Pass the resulting stream to TopologyErrorHandler#catchErrors() to route the failed records to the DLQ topic.
It returns a healthy stream that can be processed further.
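The success/fail pattern above can be pictured without Kafka at all. The following sketch, under the assumption that an in-memory list stands in for a KStream, uses a hypothetical SketchResult class (not Kstreamplify's ProcessingResult) and a hypothetical CatchErrorsSketch#catchErrors that forwards successes and collects failures with their context message, mirroring what catchErrors() does with the DLQ topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified stand-in for ProcessingResult<V, V2>: either a new value (V)
// or the failed input (V2) with its exception and context message. Not the library class.
final class SketchResult<V, V2> {
    final V value;
    final V2 failedValue;
    final Exception exception;
    final String contextMessage;

    private SketchResult(V value, V2 failedValue, Exception exception, String contextMessage) {
        this.value = value;
        this.failedValue = failedValue;
        this.exception = exception;
        this.contextMessage = contextMessage;
    }

    static <V, V2> SketchResult<V, V2> success(V value) {
        return new SketchResult<>(value, null, null, null);
    }

    static <V, V2> SketchResult<V, V2> fail(Exception exception, V2 failedValue, String contextMessage) {
        return new SketchResult<>(null, failedValue, exception, contextMessage);
    }

    boolean isValid() {
        return exception == null;
    }
}

// Mimics the idea behind catchErrors() on an in-memory list instead of a KStream:
// successes continue downstream, failures are collected into a DLQ list.
final class CatchErrorsSketch {
    private CatchErrorsSketch() {
    }

    static <V, V2> List<V> catchErrors(List<V2> inputs, Function<V2, SketchResult<V, V2>> mapper,
                                       List<String> dlq) {
        List<V> healthy = new ArrayList<>();
        for (V2 input : inputs) {
            SketchResult<V, V2> result = mapper.apply(input);
            if (result.isValid()) {
                healthy.add(result.value);
            } else {
                dlq.add(result.contextMessage + " (input: " + result.failedValue + ")");
            }
        }
        return healthy;
    }

    // Same shape as toUpperCase() above, applied to a plain String last name.
    static SketchResult<String, String> toUpperCase(String lastName) {
        try {
            return SketchResult.success(lastName.toUpperCase());
        } catch (Exception e) {
            return SketchResult.fail(e, lastName, "Something bad happened...");
        }
    }
}
```

A null input makes toUpperCase() throw, so it ends up in the DLQ list while the other inputs flow through.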
Production and deserialization handlers are provided to send errors to the DLQ topic.
Add the following properties to your application.yml file:
kafka:
  properties:
    default.deserialization.exception.handler: 'com.michelin.kstreamplify.error.DlqDeserializationExceptionHandler'
    default.production.exception.handler: 'com.michelin.kstreamplify.error.DlqProductionExceptionHandler'
An Avro schema needs to be deployed in a Schema Registry on top of the DLQ topic. It is available here.
The default uncaught exception handler catches all uncaught exceptions and shuts down the client.
To change this behaviour, override the KafkaStreamsStarter#uncaughtExceptionHandler() method and return your own uncaught exception handler.
@Override
public StreamsUncaughtExceptionHandler uncaughtExceptionHandler() {
    // Example: shut down all Kafka Streams clients of the application on an uncaught exception
    return throwable -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
}
Kstreamplify provides web services on top of your Kafka Streams application. They are available through the Swagger UI.
The topology endpoint returns the Kafka Streams topology description. It is available at /topology by default.
The path can be customized by setting the following property:
topology:
  path: 'custom-topology'
A list of endpoints to query the state stores of your Kafka Streams application is available. It uses interactive queries and handles state stores located on different Kafka Streams instances by providing an RPC layer.
Here is the list of supported state store types:
- Key-Value store
- Window store
Only state stores with String keys are supported.
Readiness and liveness endpoints provide probes for Kubernetes deployment based on the Kafka Streams state.
They are available at /ready and /liveness by default.
The path can be customized by setting the following properties:
kubernetes:
  liveness:
    path: 'custom-liveness'
  readiness:
    path: 'custom-readiness'
Kstreamplify provides an API called TopicWithSerde that unifies all the consumption and production points and deals with topics being owned by different teams across different environments.
Declare your consumption and production points in a separate class. It requires a topic name, a key SerDe, and a value SerDe.
public static TopicWithSerde<String, KafkaPerson> inputTopic() {
    return new TopicWithSerde<>(
        "INPUT_TOPIC",
        Serdes.String(),
        SerdesUtils.getValueSerdes()
    );
}

public static TopicWithSerde<String, KafkaPerson> outputTopic() {
    return new TopicWithSerde<>(
        "OUTPUT_TOPIC",
        Serdes.String(),
        SerdesUtils.getValueSerdes()
    );
}
Use it in your topology:
@Slf4j
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaPerson> stream = inputTopic().stream(streamsBuilder);
        outputTopic().produce(stream);
    }
}
The strength of the TopicWithSerde API is its ability to handle topics owned by different teams across different environments without altering the topology.
It uses prefixes to differentiate teams and topic ownership.
In the application.yml file, declare the prefixes in a key: value format:
kafka:
  properties:
    prefix:
      self: 'staging.team1.'
      team2: 'staging.team2.'
      team3: 'staging.team3.'
Include the prefix in the TopicWithSerde declaration:
public static TopicWithSerde<String, KafkaPerson> inputTopic() {
    return new TopicWithSerde<>(
        "INPUT_TOPIC",
        "team2",
        Serdes.String(),
        SerdesUtils.getValueSerdes()
    );
}
The topic staging.team2.INPUT_TOPIC will be consumed when running the application with the staging application.yml file.
When no prefix is specified, self is used by default.
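To make the resolution rule concrete, the following plain-Java sketch prepends the prefix declared under a given key and falls back to self when no key is passed. TopicPrefixSketch and resolve are hypothetical names, and returning the bare topic name for an unknown key is an assumption of this sketch, not documented Kstreamplify behaviour.

```java
import java.util.Map;

// Hypothetical sketch of prefix-based topic naming, not the TopicWithSerde implementation.
final class TopicPrefixSketch {
    static final String SELF = "self";

    private TopicPrefixSketch() {
    }

    // Prepends the prefix registered under prefixKey; "self" is used when no key is given.
    // Assumption: an unknown key leaves the topic name unprefixed.
    static String resolve(String topic, String prefixKey, Map<String, String> prefixes) {
        String key = (prefixKey == null) ? SELF : prefixKey;
        return prefixes.getOrDefault(key, "") + topic;
    }
}
```

With the staging prefixes shown above, resolve("INPUT_TOPIC", null, prefixes) yields staging.team1.INPUT_TOPIC because self maps to staging.team1.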
Kstreamplify encourages the use of fixed topic names in the topology and uses the prefix feature to manage namespacing for virtual clusters and permissions. However, there are many situations where you might want to reuse the same topology but with a different set of input or output topics.
In the application.yml file, declare dynamic remappings in a key: value format:
kafka:
  properties:
    topic:
      remap:
        oldTopicName: newTopicName
        foo: bar
The topic oldTopicName in the topology will be mapped to newTopicName.
This feature is compatible with both input and output topics.
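The remapping described above amounts to a lookup with a fallback to the original name. The sketch below shows that rule in plain Java; TopicRemapSketch is a hypothetical name, and how Kstreamplify combines remapping with prefixes is not covered here.

```java
import java.util.Map;

// Hypothetical sketch of topic remapping, not Kstreamplify's implementation.
final class TopicRemapSketch {
    private TopicRemapSketch() {
    }

    // Returns the remapped name when one is declared, otherwise the original name.
    static String remap(String topic, Map<String, String> remappings) {
        return remappings.getOrDefault(topic, topic);
    }
}
```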
Kstreamplify aims to ease the use of interactive queries in Kafka Streams applications.
The "application.server" property value is determined from the following sources, in order of priority:
- The value of an environment variable whose name is defined by the application.server.var.name property:
  kafka:
    properties:
      application.server.var.name: 'MY_APPLICATION_SERVER'
- The value of a default environment variable named APPLICATION_SERVER.
- The default value, localhost.
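The priority order above can be expressed as a small resolution function. This is a minimal sketch, assuming the Kafka properties and the environment are passed in as maps (for example System.getenv()) so the rule can be tested; ApplicationServerSketch is a hypothetical name and not Kstreamplify's code.

```java
import java.util.Map;

// Hypothetical sketch of the priority order for "application.server"; not the library code.
final class ApplicationServerSketch {
    static final String DEFAULT_VAR_NAME = "APPLICATION_SERVER";
    static final String FALLBACK = "localhost";

    private ApplicationServerSketch() {
    }

    // 1. environment variable named by "application.server.var.name" (if configured and set)
    // 2. the APPLICATION_SERVER environment variable
    // 3. "localhost"
    static String resolve(Map<String, String> kafkaProperties, Map<String, String> env) {
        String varName = kafkaProperties.get("application.server.var.name");
        if (varName != null && env.get(varName) != null) {
            return env.get(varName);
        }
        String defaultValue = env.get(DEFAULT_VAR_NAME);
        return defaultValue != null ? defaultValue : FALLBACK;
    }
}
```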
You can use the interactive query services that back the web services layer for your own needs.
@Component
public class MyService {
    @Autowired
    KeyValueStoreService keyValueStoreService;

    @Autowired
    WindowStoreService windowStoreService;
}
Kstreamplify offers the flexibility to execute custom code through hooks.
The On Start hook allows you to execute code before the Kafka Streams instance starts.
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void onStart(KafkaStreams kafkaStreams) {
        // Do something before starting the Kafka Streams instance
    }
}
Kstreamplify facilitates deduplication of a stream through the DeduplicationUtils class, based on various criteria and within a specified time frame.
All deduplication methods return a KStream<String, ProcessingResult<V, V2>>, so you can redirect the result to TopologyErrorHandler#catchErrors().
Note: Only streams with String keys and Avro values are supported.
To deduplicate records by key:
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaPerson> myStream = streamsBuilder
            .stream("INPUT_TOPIC");

        // The deduplicated stream carries ProcessingResult values, so catch errors before producing
        TopologyErrorHandler
            .catchErrors(DeduplicationUtils.deduplicateKeys(streamsBuilder, myStream, Duration.ofDays(60)))
            .to("OUTPUT_TOPIC");
    }
}
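To deduplicate records by key and value:
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaPerson> myStream = streamsBuilder
            .stream("INPUT_TOPIC");

        // The deduplicated stream carries ProcessingResult values, so catch errors before producing
        TopologyErrorHandler
            .catchErrors(DeduplicationUtils.deduplicateKeyValues(streamsBuilder, myStream, Duration.ofDays(60)))
            .to("OUTPUT_TOPIC");
    }
}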
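To deduplicate records based on a predicate computed from the value:
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
    @Override
    public void topology(StreamsBuilder streamsBuilder) {
        KStream<String, KafkaPerson> myStream = streamsBuilder
            .stream("INPUT_TOPIC");

        // The deduplicated stream carries ProcessingResult values, so catch errors before producing
        TopologyErrorHandler
            .catchErrors(DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, myStream, Duration.ofDays(60),
                value -> value.getFirstName() + "#" + value.getLastName()))
            .to("OUTPUT_TOPIC");
    }
}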
The value returned by the given predicate is used as the key in the window store, so the stream is deduplicated based on that value.
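To picture what the window store is used for, here is an in-memory sketch of time-windowed deduplication. It assumes a HashMap in place of the Kafka Streams window store that Kstreamplify relies on, and DeduplicationSketch is a hypothetical class, not part of DeduplicationUtils. A record is dropped when its deduplication key was already forwarded less than the window duration ago.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory illustration of time-windowed deduplication.
// Kstreamplify uses a Kafka Streams window store instead; this class is not its API.
final class DeduplicationSketch<T> {
    private final Duration window;
    private final Function<T, String> deduplicationKey;
    // Deduplication key -> timestamp of the last record forwarded for that key
    private final Map<String, Instant> lastForwarded = new HashMap<>();

    DeduplicationSketch(Duration window, Function<T, String> deduplicationKey) {
        this.window = window;
        this.deduplicationKey = deduplicationKey;
    }

    // Returns true (forward) for the first record of a key, or once the window since the
    // last forwarded record has elapsed; returns false (drop) for duplicates inside the window.
    boolean accept(T record, Instant timestamp) {
        String key = deduplicationKey.apply(record);
        Instant previous = lastForwarded.get(key);
        if (previous != null && timestamp.isBefore(previous.plus(window))) {
            return false; // duplicate within the window
        }
        lastForwarded.put(key, timestamp);
        return true;
    }
}
```

What the extractor returns (the record key, the key and value together, or a value-derived string such as first name + "#" + last name) corresponds roughly to the three variants above.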
The Kstreamplify Spring Boot module simplifies the integration of Open Telemetry and its Java agent in Kafka Streams applications by binding all Kafka Streams metrics to the Spring Boot registry.
You can run your application with the Open Telemetry Java agent by including the following JVM options:
-javaagent:/opentelemetry-javaagent.jar -Dotel.traces.exporter=otlp -Dotel.logs.exporter=otlp -Dotel.metrics.exporter=otlp
It also facilitates the addition of custom tags to the metrics, allowing you to use them to organize your metrics in your Grafana dashboard.
-Dotel.resource.attributes=environment=production,service.namespace=myNamespace,service.name=myKafkaStreams,category=orders
All the tags specified in the otel.resource.attributes property are included in the metrics and can be observed in the logs during application startup.
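The otel.resource.attributes value is a comma-separated list of key=value pairs, each pair becoming one tag. The hypothetical helper below splits such a string into tag names and values to show the format; it does not call the OpenTelemetry SDK and ignores percent-encoded values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: splits an otel.resource.attributes value ("k1=v1,k2=v2")
// into tag names and values, preserving order. Not part of Kstreamplify or OpenTelemetry.
final class ResourceAttributesSketch {
    private ResourceAttributesSketch() {
    }

    static Map<String, String> parse(String attributes) {
        Map<String, String> tags = new LinkedHashMap<>();
        for (String pair : attributes.split(",")) {
            int separator = pair.indexOf('=');
            if (separator > 0) { // skip malformed entries without a key
                tags.put(pair.substring(0, separator).trim(), pair.substring(separator + 1).trim());
            }
        }
        return tags;
    }
}
```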
The Kstreamplify Spring Boot dependency uses Springdoc to generate API documentation for Kafka Streams.
By default:
- The Swagger UI page is available at http://host:port/swagger-ui/index.html.
- The OpenAPI description is available at http://host:port/v3/api-docs.
Both can be customized by using the Springdoc properties.
Kstreamplify eases the use of the Topology Test Driver for testing Kafka Streams applications.
You can create a test class that extends KafkaStreamsStarterTest, override KafkaStreamsStarterTest#getKafkaStreamsStarter() to provide your KafkaStreamsStarter implementation, and start writing your tests.
public class MyKafkaStreamsTest extends KafkaStreamsStarterTest {
    private TestInputTopic<String, KafkaPerson> inputTopic;
    private TestOutputTopic<String, KafkaPerson> outputTopic;

    @Override
    protected KafkaStreamsStarter getKafkaStreamsStarter() {
        return new MyKafkaStreams();
    }

    @BeforeEach
    void setUp() {
        inputTopic = testDriver.createInputTopic("INPUT_TOPIC", new StringSerializer(),
            SerdesUtils.<KafkaPerson>getValueSerde().serializer());
        outputTopic = testDriver.createOutputTopic("OUTPUT_TOPIC", new StringDeserializer(),
            SerdesUtils.<KafkaPerson>getValueSerde().deserializer());
    }

    @Test
    void shouldUpperCase() {
        // "person" is a KafkaPerson test fixture (not shown) named "First name" / "Last name";
        // these assertions assume a topology that upper-cases both the first and last names
        inputTopic.pipeInput("1", person);
        List<KeyValue<String, KafkaPerson>> results = outputTopic.readKeyValuesToList();
        assertThat(results.get(0).value.getFirstName()).isEqualTo("FIRST NAME");
        assertThat(results.get(0).value.getLastName()).isEqualTo("LAST NAME");
    }

    @Test
    void shouldFailAndRouteToDlqTopic() {
        // Assumes a "person" fixture (not shown) that makes the transformation fail;
        // dlqTopic reads the records routed to the DLQ topic
        inputTopic.pipeInput("1", person);
        List<KeyValue<String, KafkaError>> errors = dlqTopic.readKeyValuesToList();
        assertThat(errors.get(0).key).isEqualTo("1");
        assertThat(errors.get(0).value.getContextMessage()).isEqualTo("Something bad happened...");
        assertThat(errors.get(0).value.getOffset()).isZero();
    }
}
Kstreamplify runs the tests with default properties. It is possible to provide additional properties or override the default ones:
public class MyKafkaStreamsTest extends KafkaStreamsStarterTest {
    @Override
    protected Map<String, String> getSpecificProperties() {
        // STATE_DIR_CONFIG is statically imported from org.apache.kafka.streams.StreamsConfig
        return Map.of(
            STATE_DIR_CONFIG, "/tmp/kafka-streams"
        );
    }
}
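The override rule can be pictured as a map merge in which the entries returned by getSpecificProperties() take precedence over the defaults. This sketch only illustrates that precedence, as suggested by the sentence above; TestPropertiesSketch is a hypothetical name and the default values used with it are made up, not Kstreamplify's actual test defaults.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "defaults, then specific overrides" precedence;
// not KafkaStreamsStarterTest code.
final class TestPropertiesSketch {
    private TestPropertiesSketch() {
    }

    // Starts from the defaults, then applies specific properties:
    // a key present in both maps ends up with the specific value.
    static Map<String, String> merge(Map<String, String> defaults, Map<String, String> specific) {
        Map<String, String> merged = new HashMap<>(defaults);
        merged.putAll(specific);
        return merged;
    }
}
```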
Developing applications with Kafka Streams can be challenging and often raises many questions for developers. It involves considerations such as efficient bootstrapping of Kafka Streams applications, handling unexpected business issues, and integrating Kubernetes probes, among others.
To assist developers in overcoming these challenges, we have built this library. Our aim is to provide a comprehensive solution that simplifies the development process and addresses common pain points encountered while working with Kafka Streams.
We welcome contributions from the community! Before you get started, please take a look at our contribution guide to learn about our guidelines and best practices. We appreciate your help in making Kstreamplify a better library for everyone.