Upgrade to streams-bootstrap v3 (#18)
philipp94831 authored Aug 21, 2024
1 parent 9dadafa commit 1556ca4
Showing 4 changed files with 107 additions and 111 deletions.
build.gradle.kts (13 changes: 10 additions & 3 deletions)
@@ -4,7 +4,7 @@ plugins {
`java-library`
id("com.bakdata.release") version "1.4.0"
id("com.bakdata.sonar") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.1"
id("io.freefair.lombok") version "8.4"
id("com.google.cloud.tools.jib") version "3.4.3"
id("com.bakdata.avro") version "1.4.0"
@@ -47,12 +47,19 @@ dependencies {
implementation(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion) {
exclude(group = "org.slf4j", module = "slf4j-log4j12")
}
val streamsBootstrapVersion = "2.23.0"
api(group = "com.bakdata.kafka", name = "streams-bootstrap-large-messages", version = streamsBootstrapVersion)
val streamsBootstrapVersion = "3.0.1"
api(
group = "com.bakdata.kafka",
name = "streams-bootstrap-large-messages",
version = streamsBootstrapVersion
)
implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = streamsBootstrapVersion)
implementation(group = "com.bakdata.kafka", name = "brute-force-serde", version = "1.2.1")
implementation(group = "com.bakdata.kafka", name = "large-message-serde", version = "2.7.0")
implementation(group = "org.jooq", name = "jool", version = "0.9.14")
avroApi(group = "com.bakdata.kafka", name = "error-handling-avro", version = "1.5.0")
val log4jVersion = "2.23.1"
implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)

val junitVersion = "5.10.1"
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
src/main/java/com/bakdata/kafka/DeadLetterAnalyzerApplication.java (49 changes: 13 additions & 36 deletions)
@@ -25,63 +25,40 @@
package com.bakdata.kafka;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.Properties;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

/**
* A Kafka Streams application that analyzes dead letters in your Kafka cluster
*/
@Slf4j
@ToString(callSuper = true)
@Setter
public final class DeadLetterAnalyzerApplication extends LargeMessageKafkaStreamsApplication {

private static final String EXAMPLES_TOPIC_ROLE = "examples";
private static final String STATS_TOPIC_ROLE = "stats";
public final class DeadLetterAnalyzerApplication implements LargeMessageStreamsApp {

public static void main(final String[] args) {
startApplication(new DeadLetterAnalyzerApplication(), args);
KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(DeadLetterAnalyzerApplication::new), args);
}

@Override
public void buildTopology(final StreamsBuilder builder) {
DeadLetterAnalyzerTopology.builder()
.inputPattern(this.getInputPattern())
.outputTopic(this.getOutputTopic())
.statsTopic(this.getStatsTopic())
.examplesTopic(this.getExamplesTopic())
.errorTopic(this.getErrorTopic())
.kafkaProperties(this.getKafkaProperties())
.build()
.buildTopology(builder);
public void buildTopology(final TopologyBuilder topologyBuilder) {
new DeadLetterAnalyzerTopology(topologyBuilder).buildTopology();
}

@Override
public Properties createKafkaProperties() {
final Properties kafkaProperties = super.createKafkaProperties();
kafkaProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, LargeMessageSerde.class);
public Map<String, Object> createKafkaProperties() {
final Map<String, Object> kafkaProperties = new HashMap<>();
kafkaProperties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
kafkaProperties.put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, true);
return kafkaProperties;
}

@Override
public String getUniqueAppId() {
return "dead-letter-analyzer-" + this.getOutputTopic();
}

private String getStatsTopic() {
return this.getOutputTopic(STATS_TOPIC_ROLE);
public SerdeConfig defaultSerializationConfig() {
return new SerdeConfig(StringSerde.class, LargeMessageSerde.class);
}

private String getExamplesTopic() {
return this.getOutputTopic(EXAMPLES_TOPIC_ROLE);
@Override
public String getUniqueAppId(final StreamsTopicConfig streamsTopicConfig) {
return "dead-letter-analyzer-" + streamsTopicConfig.getOutputTopic();
}

}
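
For orientation: this copied diff does not mark added versus removed lines, so the following is a reconstructed sketch of roughly what DeadLetterAnalyzerApplication looks like after this commit, assembled only from fragments visible above. It illustrates the streams-bootstrap v3 pattern (implement the streams app interface, wrap it in SimpleKafkaStreamsApplication, start it through KafkaApplication); it is not a verbatim copy of the committed file, and member order or Javadoc may differ.

    package com.bakdata.kafka;

    import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serdes.StringSerde;

    // Reconstructed from the diff fragments above; a sketch, not the authoritative source.
    public final class DeadLetterAnalyzerApplication implements LargeMessageStreamsApp {

        public static void main(final String[] args) {
            // v3 entry point: the app is wrapped in SimpleKafkaStreamsApplication and started via KafkaApplication
            KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(DeadLetterAnalyzerApplication::new), args);
        }

        @Override
        public void buildTopology(final TopologyBuilder topologyBuilder) {
            // topics and Kafka configuration are provided through the TopologyBuilder instead of class fields
            new DeadLetterAnalyzerTopology(topologyBuilder).buildTopology();
        }

        @Override
        public Map<String, Object> createKafkaProperties() {
            // only non-default settings remain here; default serdes moved to defaultSerializationConfig()
            final Map<String, Object> kafkaProperties = new HashMap<>();
            kafkaProperties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
            kafkaProperties.put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, true);
            return kafkaProperties;
        }

        @Override
        public SerdeConfig defaultSerializationConfig() {
            // replaces the v2 StreamsConfig.DEFAULT_KEY/VALUE_SERDE_CLASS_CONFIG properties
            return new SerdeConfig(StringSerde.class, LargeMessageSerde.class);
        }

        @Override
        public String getUniqueAppId(final StreamsTopicConfig streamsTopicConfig) {
            // topic configuration is now passed in rather than read from the application itself
            return "dead-letter-analyzer-" + streamsTopicConfig.getOutputTopic();
        }
    }
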
src/main/java/com/bakdata/kafka/DeadLetterAnalyzerTopology.java (81 changes: 42 additions & 39 deletions)
@@ -29,17 +29,11 @@

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
@@ -51,20 +45,26 @@
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

@Builder
@Getter
@RequiredArgsConstructor
class DeadLetterAnalyzerTopology {
static final String REPARTITION_NAME = "analyzed";

static final String EXAMPLES_TOPIC_LABEL = "examples";
static final String STATS_TOPIC_LABEL = "stats";
private static final String REPARTITION_NAME = "analyzed";
private static final String STATISTICS_STORE_NAME = "statistics";
private final @NonNull Pattern inputPattern;
private final @NonNull String outputTopic;
private final @NonNull String statsTopic;
private final @NonNull String examplesTopic;
private final @NonNull String errorTopic;
private final @NonNull Properties kafkaProperties;
private final @NonNull TopologyBuilder builder;

static <T extends SpecificRecord> Preconfigured<Serde<T>> getSpecificAvroSerde() {
final Serde<T> serde = new SpecificAvroSerde<>();
return Preconfigured.create(serde);
}

private static Map<String, Object> originals(final Properties properties) {
return new StreamsConfig(properties).originals();
static String getExamplesTopic(final StreamsTopicConfig topics) {
return topics.getOutputTopic(EXAMPLES_TOPIC_LABEL);
}

static String getStatsTopic(final StreamsTopicConfig topics) {
return topics.getOutputTopic(STATS_TOPIC_LABEL);
}

private static String toElasticKey(final ErrorKey key) {
Expand Down Expand Up @@ -95,36 +95,44 @@ private static List<DeadLetter> getDeadLetters(final Object object) {
return object instanceof DeadLetter ? List.of((DeadLetter) object) : List.of();
}

void buildTopology(final StreamsBuilder builder) {
final KStream<Object, DeadLetter> allDeadLetters = this.streamDeadLetters(builder);
private static Preconfigured<Serde<Object>> getInputSerde() {
final Serde<Object> serde = new BruteForceSerde();
return Preconfigured.create(serde);
}

void buildTopology() {
final KStream<Object, DeadLetter> allDeadLetters = this.streamDeadLetters();
final KStream<Object, KeyedDeadLetterWithContext> deadLettersWithContext =
this.enrichWithContext(allDeadLetters);
final StreamsTopicConfig topics = this.builder.getTopics();
deadLettersWithContext
.selectKey((k, v) -> v.extractElasticKey())
.mapValues(KeyedDeadLetterWithContext::format)
.to(this.outputTopic);
.to(topics.getOutputTopic());

final KStream<ErrorKey, Result> aggregated = this.aggregate(deadLettersWithContext);
aggregated
.mapValues((errorKey, result) -> result.toFullErrorStatistics(errorKey))
.selectKey((k, v) -> toElasticKey(k))
.to(this.statsTopic, Produced.valueSerde(this.getSpecificAvroSerde(false)));
.to(getStatsTopic(topics), Produced.valueSerde(this.configureForValues(getSpecificAvroSerde())));
aggregated
.flatMapValues(Result::getExamples)
.mapValues(DeadLetterAnalyzerTopology::toErrorExample)
.selectKey((k, v) -> toElasticKey(k))
.to(this.examplesTopic);
.to(getExamplesTopic(topics));
}

<T extends SpecificRecord> Serde<T> getSpecificAvroSerde(final boolean isKey) {
final Serde<T> serde = new SpecificAvroSerde<>();
serde.configure(new StreamsConfig(this.kafkaProperties).originals(), isKey);
return serde;
private <T> T configureForKeys(final Preconfigured<T> preconfigured) {
return this.builder.createConfigurator().configureForKeys(preconfigured);
}

private <T> T configureForValues(final Preconfigured<T> preconfigured) {
return this.builder.createConfigurator().configureForValues(preconfigured);
}

private KStream<Object, DeadLetter> streamDeadLetters(final StreamsBuilder builder) {
final KStream<Object, Object> rawDeadLetters = builder.stream(this.inputPattern,
Consumed.with(this.getInputSerde(true), this.getInputSerde(false)));
private KStream<Object, DeadLetter> streamDeadLetters() {
final KStream<Object, Object> rawDeadLetters = this.builder.streamInputPattern(
Consumed.with(this.configureForKeys(getInputSerde()), this.configureForValues(getInputSerde())));

final KStream<Object, DeadLetter> streamDeadLetters = rawDeadLetters
.flatMapValues(DeadLetterAnalyzerTopology::getDeadLetters);
Expand All @@ -143,20 +151,14 @@ private KStream<Object, DeadLetter> streamDeadLetters(final StreamsBuilder build
.merge(streamHeaderDeadLetters);
}

private Serde<Object> getInputSerde(final boolean isKey) {
final Serde<Object> serde = new BruteForceSerde();
serde.configure(originals(this.kafkaProperties), isKey);
return serde;
}

private <K> void toDeadLetterTopic(final KStream<K, DeadLetter> connectDeadLetters) {
connectDeadLetters
.selectKey((k, v) -> ErrorUtil.toString(k))
.to(this.errorTopic);
.to(this.builder.getTopics().getErrorTopic());
}

private KStream<ErrorKey, Result> aggregate(final KStream<?, KeyedDeadLetterWithContext> withContext) {
final Serde<ErrorKey> errorKeySerde = this.getSpecificAvroSerde(true);
final Serde<ErrorKey> errorKeySerde = this.configureForKeys(getSpecificAvroSerde());
final StoreBuilder<KeyValueStore<ErrorKey, ErrorStatistics>> statisticsStore =
this.createStatisticsStore(errorKeySerde);

@@ -190,7 +192,8 @@ public Set<StoreBuilder<?>> stores() {
private StoreBuilder<KeyValueStore<ErrorKey, ErrorStatistics>> createStatisticsStore(
final Serde<ErrorKey> errorKeySerde) {
final KeyValueBytesStoreSupplier statisticsStoreSupplier = Stores.inMemoryKeyValueStore(STATISTICS_STORE_NAME);
return Stores.keyValueStoreBuilder(statisticsStoreSupplier, errorKeySerde, this.getSpecificAvroSerde(false));
return Stores.keyValueStoreBuilder(statisticsStoreSupplier, errorKeySerde,
this.configureForValues(getSpecificAvroSerde()));
}

private <K> KStream<K, KeyedDeadLetterWithContext> enrichWithContext(
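
Similarly, since the diff above interleaves old and new lines without +/- markers, here is a reconstructed sketch of the new serde wiring in DeadLetterAnalyzerTopology, again assembled only from fragments visible in the diff: serdes are wrapped as Preconfigured values and configured through a Configurator obtained from the TopologyBuilder, replacing the manual serde.configure(new StreamsConfig(properties).originals(), isKey) calls of the v2 code. Only the helper methods and one (simplified) usage site are shown; imports and the rest of the class are omitted.

    // Reconstructed from the diff fragments above; a sketch, not a verbatim copy of the committed file.

    // Unconfigured serdes are created once and wrapped as Preconfigured values...
    static <T extends SpecificRecord> Preconfigured<Serde<T>> getSpecificAvroSerde() {
        final Serde<T> serde = new SpecificAvroSerde<>();
        return Preconfigured.create(serde);
    }

    private static Preconfigured<Serde<Object>> getInputSerde() {
        final Serde<Object> serde = new BruteForceSerde();
        return Preconfigured.create(serde);
    }

    // ...and configured for key or value use via the Configurator exposed by the TopologyBuilder.
    private <T> T configureForKeys(final Preconfigured<T> preconfigured) {
        return this.builder.createConfigurator().configureForKeys(preconfigured);
    }

    private <T> T configureForValues(final Preconfigured<T> preconfigured) {
        return this.builder.createConfigurator().configureForValues(preconfigured);
    }

    // Simplified usage when consuming the input topic pattern; the actual method in the diff
    // additionally merges dead letters extracted from Connect headers.
    private KStream<Object, DeadLetter> streamDeadLetters() {
        final KStream<Object, Object> rawDeadLetters = this.builder.streamInputPattern(
                Consumed.with(this.configureForKeys(getInputSerde()), this.configureForValues(getInputSerde())));
        return rawDeadLetters.flatMapValues(DeadLetterAnalyzerTopology::getDeadLetters);
    }
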
