Upgrade to streams-bootstrap v3 #18

Merged · 4 commits · Aug 21, 2024
build.gradle.kts (13 changes: 10 additions & 3 deletions)
@@ -4,7 +4,7 @@ plugins {
     `java-library`
     id("com.bakdata.release") version "1.4.0"
     id("com.bakdata.sonar") version "1.4.0"
-    id("com.bakdata.sonatype") version "1.4.0"
+    id("com.bakdata.sonatype") version "1.4.1"
     id("io.freefair.lombok") version "8.4"
     id("com.google.cloud.tools.jib") version "3.4.3"
     id("com.bakdata.avro") version "1.4.0"
@@ -47,12 +47,19 @@ dependencies {
     implementation(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion) {
         exclude(group = "org.slf4j", module = "slf4j-log4j12")
     }
-    val streamsBootstrapVersion = "2.23.0"
-    api(group = "com.bakdata.kafka", name = "streams-bootstrap-large-messages", version = streamsBootstrapVersion)
+    val streamsBootstrapVersion = "3.0.1"
+    api(
+        group = "com.bakdata.kafka",
+        name = "streams-bootstrap-large-messages",
+        version = streamsBootstrapVersion
+    )
     implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = streamsBootstrapVersion)
     implementation(group = "com.bakdata.kafka", name = "brute-force-serde", version = "1.2.1")
     implementation(group = "com.bakdata.kafka", name = "large-message-serde", version = "2.7.0")
     implementation(group = "org.jooq", name = "jool", version = "0.9.14")
     avroApi(group = "com.bakdata.kafka", name = "error-handling-avro", version = "1.5.0")
+    val log4jVersion = "2.23.1"
+    implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)
+
     val junitVersion = "5.10.1"
     testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
src/main/java/com/bakdata/kafka/DeadLetterAnalyzerApplication.java (49 changes: 13 additions & 36 deletions)
@@ -25,63 +25,40 @@
 package com.bakdata.kafka;
 
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
-import java.util.Properties;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 
 /**
  * A Kafka Streams application that analyzes dead letters in your Kafka cluster
  */
-@Slf4j
-@ToString(callSuper = true)
-@Setter
-public final class DeadLetterAnalyzerApplication extends LargeMessageKafkaStreamsApplication {
-
-    private static final String EXAMPLES_TOPIC_ROLE = "examples";
-    private static final String STATS_TOPIC_ROLE = "stats";
+public final class DeadLetterAnalyzerApplication implements LargeMessageStreamsApp {
 
     public static void main(final String[] args) {
-        startApplication(new DeadLetterAnalyzerApplication(), args);
+        KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(DeadLetterAnalyzerApplication::new), args);
     }
 
     @Override
-    public void buildTopology(final StreamsBuilder builder) {
-        DeadLetterAnalyzerTopology.builder()
-                .inputPattern(this.getInputPattern())
-                .outputTopic(this.getOutputTopic())
-                .statsTopic(this.getStatsTopic())
-                .examplesTopic(this.getExamplesTopic())
-                .errorTopic(this.getErrorTopic())
-                .kafkaProperties(this.getKafkaProperties())
-                .build()
-                .buildTopology(builder);
+    public void buildTopology(final TopologyBuilder topologyBuilder) {
+        new DeadLetterAnalyzerTopology(topologyBuilder).buildTopology();
     }
 
     @Override
-    public Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
-        kafkaProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
-        kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, LargeMessageSerde.class);
+    public Map<String, Object> createKafkaProperties() {
+        final Map<String, Object> kafkaProperties = new HashMap<>();
         kafkaProperties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
         kafkaProperties.put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, true);
         return kafkaProperties;
     }
 
     @Override
-    public String getUniqueAppId() {
-        return "dead-letter-analyzer-" + this.getOutputTopic();
-    }
-
-    private String getStatsTopic() {
-        return this.getOutputTopic(STATS_TOPIC_ROLE);
+    public SerdeConfig defaultSerializationConfig() {
+        return new SerdeConfig(StringSerde.class, LargeMessageSerde.class);
     }
 
-    private String getExamplesTopic() {
-        return this.getOutputTopic(EXAMPLES_TOPIC_ROLE);
+    @Override
+    public String getUniqueAppId(final StreamsTopicConfig streamsTopicConfig) {
+        return "dead-letter-analyzer-" + streamsTopicConfig.getOutputTopic();
     }
 
 }
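
Migration note: the diff above shows the general streams-bootstrap v3 shape. The v2 base class LargeMessageKafkaStreamsApplication is replaced by the LargeMessageStreamsApp interface, the default serdes move from createKafkaProperties() into defaultSerializationConfig(), and topic lookups move onto StreamsTopicConfig. As a minimal sketch of that shape, here is a hypothetical second application; it reuses only the v3 types visible in this PR, except for the no-argument streamInput() call, which is assumed by analogy to the streamInputPattern(Consumed) call in DeadLetterAnalyzerTopology below.

package com.bakdata.kafka;

import org.apache.kafka.common.serialization.Serdes.StringSerde;

// Hypothetical example, not part of this PR: a minimal app following the same
// v3 pattern as DeadLetterAnalyzerApplication above.
public final class EchoApplication implements LargeMessageStreamsApp {

    public static void main(final String[] args) {
        // v3 entry point: wrap the app in the CLI runner instead of extending a base class
        KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(EchoApplication::new), args);
    }

    @Override
    public void buildTopology(final TopologyBuilder topologyBuilder) {
        // Copy every record from the configured input topics to the output topic;
        // the no-argument streamInput() is an assumption of this sketch
        topologyBuilder.streamInput()
                .to(topologyBuilder.getTopics().getOutputTopic());
    }

    @Override
    public SerdeConfig defaultSerializationConfig() {
        // Replaces the v2 DEFAULT_KEY/VALUE_SERDE_CLASS_CONFIG entries
        return new SerdeConfig(StringSerde.class, LargeMessageSerde.class);
    }

    @Override
    public String getUniqueAppId(final StreamsTopicConfig topics) {
        return "echo-" + topics.getOutputTopic();
    }
}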
src/main/java/com/bakdata/kafka/DeadLetterAnalyzerTopology.java (81 changes: 42 additions & 39 deletions)
@@ -29,17 +29,11 @@
 
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
-import java.util.regex.Pattern;
-import lombok.Builder;
-import lombok.Getter;
 import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;
@@ -51,20 +45,26 @@
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 
-@Builder
-@Getter
+@RequiredArgsConstructor
 class DeadLetterAnalyzerTopology {
+    static final String REPARTITION_NAME = "analyzed";
+
+    static final String EXAMPLES_TOPIC_LABEL = "examples";
+    static final String STATS_TOPIC_LABEL = "stats";
-    private static final String REPARTITION_NAME = "analyzed";
     private static final String STATISTICS_STORE_NAME = "statistics";
-    private final @NonNull Pattern inputPattern;
-    private final @NonNull String outputTopic;
-    private final @NonNull String statsTopic;
-    private final @NonNull String examplesTopic;
-    private final @NonNull String errorTopic;
-    private final @NonNull Properties kafkaProperties;
+    private final @NonNull TopologyBuilder builder;
 
+    static <T extends SpecificRecord> Preconfigured<Serde<T>> getSpecificAvroSerde() {
+        final Serde<T> serde = new SpecificAvroSerde<>();
+        return Preconfigured.create(serde);
+    }
+
-    private static Map<String, Object> originals(final Properties properties) {
-        return new StreamsConfig(properties).originals();
+    static String getExamplesTopic(final StreamsTopicConfig topics) {
+        return topics.getOutputTopic(EXAMPLES_TOPIC_LABEL);
     }
+
+    static String getStatsTopic(final StreamsTopicConfig topics) {
+        return topics.getOutputTopic(STATS_TOPIC_LABEL);
+    }
 
     private static String toElasticKey(final ErrorKey key) {
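
The two topic labels above replace the v2 topic roles that previously lived in the application class: extra output topics are now resolved by label through StreamsTopicConfig. A small sketch of that lookup, assuming only the calls shown in this hunk; the helper class itself is hypothetical.

package com.bakdata.kafka;

import java.util.List;

// Hypothetical helper, not part of this PR: collects the analyzer's output topics
// as resolved from a StreamsTopicConfig (the concrete topic names depend on how
// the application was started).
final class AnalyzerTopics {

    private AnalyzerTopics() {
    }

    static List<String> all(final StreamsTopicConfig topics) {
        return List.of(
                topics.getOutputTopic(),                              // formatted dead letters
                DeadLetterAnalyzerTopology.getStatsTopic(topics),     // aggregated statistics
                DeadLetterAnalyzerTopology.getExamplesTopic(topics)); // error examples
    }
}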
@@ -95,36 +95,44 @@ private static List<DeadLetter> getDeadLetters(final Object object) {
         return object instanceof DeadLetter ? List.of((DeadLetter) object) : List.of();
     }
 
-    void buildTopology(final StreamsBuilder builder) {
-        final KStream<Object, DeadLetter> allDeadLetters = this.streamDeadLetters(builder);
+    private static Preconfigured<Serde<Object>> getInputSerde() {
+        final Serde<Object> serde = new BruteForceSerde();
+        return Preconfigured.create(serde);
+    }
+
+    void buildTopology() {
+        final KStream<Object, DeadLetter> allDeadLetters = this.streamDeadLetters();
         final KStream<Object, KeyedDeadLetterWithContext> deadLettersWithContext =
                 this.enrichWithContext(allDeadLetters);
+        final StreamsTopicConfig topics = this.builder.getTopics();
         deadLettersWithContext
                 .selectKey((k, v) -> v.extractElasticKey())
                 .mapValues(KeyedDeadLetterWithContext::format)
-                .to(this.outputTopic);
+                .to(topics.getOutputTopic());
 
         final KStream<ErrorKey, Result> aggregated = this.aggregate(deadLettersWithContext);
         aggregated
                 .mapValues((errorKey, result) -> result.toFullErrorStatistics(errorKey))
                 .selectKey((k, v) -> toElasticKey(k))
-                .to(this.statsTopic, Produced.valueSerde(this.getSpecificAvroSerde(false)));
+                .to(getStatsTopic(topics), Produced.valueSerde(this.configureForValues(getSpecificAvroSerde())));
         aggregated
                 .flatMapValues(Result::getExamples)
                 .mapValues(DeadLetterAnalyzerTopology::toErrorExample)
                 .selectKey((k, v) -> toElasticKey(k))
-                .to(this.examplesTopic);
+                .to(getExamplesTopic(topics));
     }
 
-    <T extends SpecificRecord> Serde<T> getSpecificAvroSerde(final boolean isKey) {
-        final Serde<T> serde = new SpecificAvroSerde<>();
-        serde.configure(new StreamsConfig(this.kafkaProperties).originals(), isKey);
-        return serde;
+    private <T> T configureForKeys(final Preconfigured<T> preconfigured) {
+        return this.builder.createConfigurator().configureForKeys(preconfigured);
     }
 
+    private <T> T configureForValues(final Preconfigured<T> preconfigured) {
+        return this.builder.createConfigurator().configureForValues(preconfigured);
+    }
+
-    private KStream<Object, DeadLetter> streamDeadLetters(final StreamsBuilder builder) {
-        final KStream<Object, Object> rawDeadLetters = builder.stream(this.inputPattern,
-                Consumed.with(this.getInputSerde(true), this.getInputSerde(false)));
+    private KStream<Object, DeadLetter> streamDeadLetters() {
+        final KStream<Object, Object> rawDeadLetters = this.builder.streamInputPattern(
+                Consumed.with(this.configureForKeys(getInputSerde()), this.configureForValues(getInputSerde())));
 
         final KStream<Object, DeadLetter> streamDeadLetters = rawDeadLetters
                 .flatMapValues(DeadLetterAnalyzerTopology::getDeadLetters);
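
The serde handling in this hunk is the heart of the migration for this class: instead of eagerly calling serde.configure(...) against a Properties object (the deleted getSpecificAvroSerde(boolean) and getInputSerde(boolean)), serdes are now created unconfigured, wrapped in Preconfigured, and configured on demand by the configurator obtained from the TopologyBuilder. A condensed sketch of that flow, restricted to calls that appear in this diff; the helper class is hypothetical and DeadLetter is this repository's Avro record.

package com.bakdata.kafka;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serde;

// Hypothetical helper, not part of this PR: the v3 two-step serde flow.
final class SerdeFlowSketch {

    private SerdeFlowSketch() {
    }

    static Serde<DeadLetter> valueSerde(final TopologyBuilder builder) {
        // Step 1: create the serde unconfigured and defer configuration
        final Preconfigured<Serde<DeadLetter>> preconfigured =
                Preconfigured.create(new SpecificAvroSerde<>());
        // Step 2: the builder's configurator applies the application's Kafka properties
        // (v2 equivalent: serde.configure(new StreamsConfig(kafkaProperties).originals(), false))
        return builder.createConfigurator().configureForValues(preconfigured);
    }
}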
@@ -143,20 +151,14 @@ private KStream<Object, DeadLetter> streamDeadLetters(final StreamsBuilder builder) {
                 .merge(streamHeaderDeadLetters);
     }
 
-    private Serde<Object> getInputSerde(final boolean isKey) {
-        final Serde<Object> serde = new BruteForceSerde();
-        serde.configure(originals(this.kafkaProperties), isKey);
-        return serde;
-    }
-
     private <K> void toDeadLetterTopic(final KStream<K, DeadLetter> connectDeadLetters) {
         connectDeadLetters
                 .selectKey((k, v) -> ErrorUtil.toString(k))
-                .to(this.errorTopic);
+                .to(this.builder.getTopics().getErrorTopic());
     }
 
     private KStream<ErrorKey, Result> aggregate(final KStream<?, KeyedDeadLetterWithContext> withContext) {
-        final Serde<ErrorKey> errorKeySerde = this.getSpecificAvroSerde(true);
+        final Serde<ErrorKey> errorKeySerde = this.configureForKeys(getSpecificAvroSerde());
         final StoreBuilder<KeyValueStore<ErrorKey, ErrorStatistics>> statisticsStore =
                 this.createStatisticsStore(errorKeySerde);
 
@@ -190,7 +192,8 @@ public Set<StoreBuilder<?>> stores() {
     private StoreBuilder<KeyValueStore<ErrorKey, ErrorStatistics>> createStatisticsStore(
            final Serde<ErrorKey> errorKeySerde) {
         final KeyValueBytesStoreSupplier statisticsStoreSupplier = Stores.inMemoryKeyValueStore(STATISTICS_STORE_NAME);
-        return Stores.keyValueStoreBuilder(statisticsStoreSupplier, errorKeySerde, this.getSpecificAvroSerde(false));
+        return Stores.keyValueStoreBuilder(statisticsStoreSupplier, errorKeySerde,
+                this.configureForValues(getSpecificAvroSerde()));
     }
 
     private <K> KStream<K, KeyedDeadLetterWithContext> enrichWithContext(