diff --git a/build.gradle.kts b/build.gradle.kts
index 42eb5be..6020384 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -4,7 +4,7 @@ plugins {
     `java-library`
     id("com.bakdata.release") version "1.4.0"
     id("com.bakdata.sonar") version "1.4.0"
-    id("com.bakdata.sonatype") version "1.4.0"
+    id("com.bakdata.sonatype") version "1.4.1"
     id("io.freefair.lombok") version "8.4"
     id("com.google.cloud.tools.jib") version "3.4.3"
     id("com.bakdata.avro") version "1.4.0"
@@ -47,12 +47,19 @@ dependencies {
     implementation(group = "org.apache.kafka", name = "connect-runtime", version = kafkaVersion) {
         exclude(group = "org.slf4j", module = "slf4j-log4j12")
     }
-    val streamsBootstrapVersion = "2.23.0"
-    api(group = "com.bakdata.kafka", name = "streams-bootstrap-large-messages", version = streamsBootstrapVersion)
+    val streamsBootstrapVersion = "3.0.1"
+    api(
+        group = "com.bakdata.kafka",
+        name = "streams-bootstrap-large-messages",
+        version = streamsBootstrapVersion
+    )
+    implementation(group = "com.bakdata.kafka", name = "streams-bootstrap-cli", version = streamsBootstrapVersion)
     implementation(group = "com.bakdata.kafka", name = "brute-force-serde", version = "1.2.1")
     implementation(group = "com.bakdata.kafka", name = "large-message-serde", version = "2.7.0")
     implementation(group = "org.jooq", name = "jool", version = "0.9.14")
     avroApi(group = "com.bakdata.kafka", name = "error-handling-avro", version = "1.5.0")
+    val log4jVersion = "2.23.1"
+    implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)

     val junitVersion = "5.10.1"
     testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
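Note on the build changes above: streams-bootstrap moves from 2.23.0 to 3.0.1, and the CLI layer now ships as the separate streams-bootstrap-cli artifact, hence the new implementation dependency. The added log4j-slf4j2-impl is the SLF4J 2 provider for Log4j 2; with slf4j-log4j12 excluded from connect-runtime, it is what actually routes SLF4J calls to a logging backend. A throwaway main is enough to verify the wiring (hypothetical snippet, not part of this change):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class LoggingSmokeTest {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingSmokeTest.class);

        public static void main(final String[] args) {
            // With log4j-slf4j2-impl on the classpath, this line is handled by Log4j 2.
            LOG.info("SLF4J provider is wired");
        }
    }
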
diff --git a/src/main/java/com/bakdata/kafka/DeadLetterAnalyzerApplication.java b/src/main/java/com/bakdata/kafka/DeadLetterAnalyzerApplication.java
index 43511a2..87f3a8c 100644
--- a/src/main/java/com/bakdata/kafka/DeadLetterAnalyzerApplication.java
+++ b/src/main/java/com/bakdata/kafka/DeadLetterAnalyzerApplication.java
@@ -25,63 +25,40 @@
 package com.bakdata.kafka;

 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
-import java.util.Properties;
-import lombok.Setter;
-import lombok.ToString;
-import lombok.extern.slf4j.Slf4j;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.common.serialization.Serdes.StringSerde;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;

 /**
  * A Kafka Streams application that analyzes dead letters in your Kafka cluster
  */
-@Slf4j
-@ToString(callSuper = true)
-@Setter
-public final class DeadLetterAnalyzerApplication extends LargeMessageKafkaStreamsApplication {
-
-    private static final String EXAMPLES_TOPIC_ROLE = "examples";
-    private static final String STATS_TOPIC_ROLE = "stats";
+public final class DeadLetterAnalyzerApplication implements LargeMessageStreamsApp {

     public static void main(final String[] args) {
-        startApplication(new DeadLetterAnalyzerApplication(), args);
+        KafkaApplication.startApplication(new SimpleKafkaStreamsApplication(DeadLetterAnalyzerApplication::new), args);
     }

     @Override
-    public void buildTopology(final StreamsBuilder builder) {
-        DeadLetterAnalyzerTopology.builder()
-                .inputPattern(this.getInputPattern())
-                .outputTopic(this.getOutputTopic())
-                .statsTopic(this.getStatsTopic())
-                .examplesTopic(this.getExamplesTopic())
-                .errorTopic(this.getErrorTopic())
-                .kafkaProperties(this.getKafkaProperties())
-                .build()
-                .buildTopology(builder);
+    public void buildTopology(final TopologyBuilder topologyBuilder) {
+        new DeadLetterAnalyzerTopology(topologyBuilder).buildTopology();
     }

     @Override
-    public Properties createKafkaProperties() {
-        final Properties kafkaProperties = super.createKafkaProperties();
-        kafkaProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
-        kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, LargeMessageSerde.class);
+    public Map<String, Object> createKafkaProperties() {
+        final Map<String, Object> kafkaProperties = new HashMap<>();
         kafkaProperties.put(LargeMessageSerdeConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
         kafkaProperties.put(AbstractLargeMessageConfig.USE_HEADERS_CONFIG, true);
         return kafkaProperties;
     }

     @Override
-    public String getUniqueAppId() {
-        return "dead-letter-analyzer-" + this.getOutputTopic();
-    }
-
-    private String getStatsTopic() {
-        return this.getOutputTopic(STATS_TOPIC_ROLE);
+    public SerdeConfig defaultSerializationConfig() {
+        return new SerdeConfig(StringSerde.class, LargeMessageSerde.class);
     }

-    private String getExamplesTopic() {
-        return this.getOutputTopic(EXAMPLES_TOPIC_ROLE);
+    @Override
+    public String getUniqueAppId(final StreamsTopicConfig streamsTopicConfig) {
+        return "dead-letter-analyzer-" + streamsTopicConfig.getOutputTopic();
     }
 }
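The application no longer extends LargeMessageKafkaStreamsApplication; under streams-bootstrap 3.x it implements the LargeMessageStreamsApp interface, and the CLI runner (SimpleKafkaStreamsApplication from streams-bootstrap-cli) is only instantiated in main(). Default serdes move out of createKafkaProperties() into defaultSerializationConfig(), and topic names are handed in via StreamsTopicConfig instead of getters on the application class. A minimal sketch of that contract, using only the names that appear in this diff (the class and its body are illustrative, not part of the change):

    // Illustrative only: the smallest app under the 3.x contract shown above.
    public final class ExampleApp implements StreamsApp {
        @Override
        public void buildTopology(final TopologyBuilder builder) {
            // define the processing against builder, as DeadLetterAnalyzerTopology does below
        }

        @Override
        public String getUniqueAppId(final StreamsTopicConfig topics) {
            return "example-" + topics.getOutputTopic();
        }

        @Override
        public SerdeConfig defaultSerializationConfig() {
            return new SerdeConfig(StringSerde.class, StringSerde.class);
        }
    }
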
diff --git a/src/main/java/com/bakdata/kafka/DeadLetterAnalyzerTopology.java b/src/main/java/com/bakdata/kafka/DeadLetterAnalyzerTopology.java
index 9ae4772..fcbea9f 100644
--- a/src/main/java/com/bakdata/kafka/DeadLetterAnalyzerTopology.java
+++ b/src/main/java/com/bakdata/kafka/DeadLetterAnalyzerTopology.java
@@ -29,17 +29,11 @@
 import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
-import java.util.regex.Pattern;
-import lombok.Builder;
-import lombok.Getter;
 import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;
@@ -51,20 +45,26 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;

-@Builder
-@Getter
+@RequiredArgsConstructor
 class DeadLetterAnalyzerTopology {
-    static final String REPARTITION_NAME = "analyzed";
+
+    static final String EXAMPLES_TOPIC_LABEL = "examples";
+    static final String STATS_TOPIC_LABEL = "stats";
+    private static final String REPARTITION_NAME = "analyzed";
     private static final String STATISTICS_STORE_NAME = "statistics";
-    private final @NonNull Pattern inputPattern;
-    private final @NonNull String outputTopic;
-    private final @NonNull String statsTopic;
-    private final @NonNull String examplesTopic;
-    private final @NonNull String errorTopic;
-    private final @NonNull Properties kafkaProperties;
+    private final @NonNull TopologyBuilder builder;
+
+    static <T extends SpecificRecord> Preconfigured<Serde<T>> getSpecificAvroSerde() {
+        final Serde<T> serde = new SpecificAvroSerde<>();
+        return Preconfigured.create(serde);
+    }

-    private static Map<String, Object> originals(final Properties properties) {
-        return new StreamsConfig(properties).originals();
+    static String getExamplesTopic(final StreamsTopicConfig topics) {
+        return topics.getOutputTopic(EXAMPLES_TOPIC_LABEL);
+    }
+
+    static String getStatsTopic(final StreamsTopicConfig topics) {
+        return topics.getOutputTopic(STATS_TOPIC_LABEL);
     }

     private static String toElasticKey(final ErrorKey key) {
@@ -95,36 +95,44 @@ private static List<DeadLetter> getDeadLetters(final Object object) {
         return object instanceof DeadLetter ? List.of((DeadLetter) object) : List.of();
     }

-    void buildTopology(final StreamsBuilder builder) {
-        final KStream<Object, DeadLetter> allDeadLetters = this.streamDeadLetters(builder);
+    private static Preconfigured<Serde<Object>> getInputSerde() {
+        final Serde<Object> serde = new BruteForceSerde();
+        return Preconfigured.create(serde);
+    }
+
+    void buildTopology() {
+        final KStream<Object, DeadLetter> allDeadLetters = this.streamDeadLetters();
         final KStream<Object, KeyedDeadLetterWithContext> deadLettersWithContext = this.enrichWithContext(allDeadLetters);
+        final StreamsTopicConfig topics = this.builder.getTopics();
         deadLettersWithContext
                 .selectKey((k, v) -> v.extractElasticKey())
                 .mapValues(KeyedDeadLetterWithContext::format)
-                .to(this.outputTopic);
+                .to(topics.getOutputTopic());

         final KStream<ErrorKey, Result> aggregated = this.aggregate(deadLettersWithContext);
         aggregated
                 .mapValues((errorKey, result) -> result.toFullErrorStatistics(errorKey))
                 .selectKey((k, v) -> toElasticKey(k))
-                .to(this.statsTopic, Produced.valueSerde(this.getSpecificAvroSerde(false)));
+                .to(getStatsTopic(topics), Produced.valueSerde(this.configureForValues(getSpecificAvroSerde())));
         aggregated
                 .flatMapValues(Result::getExamples)
                 .mapValues(DeadLetterAnalyzerTopology::toErrorExample)
                 .selectKey((k, v) -> toElasticKey(k))
-                .to(this.examplesTopic);
+                .to(getExamplesTopic(topics));
     }

-    <T extends SpecificRecord> Serde<T> getSpecificAvroSerde(final boolean isKey) {
-        final Serde<T> serde = new SpecificAvroSerde<>();
-        serde.configure(new StreamsConfig(this.kafkaProperties).originals(), isKey);
-        return serde;
+    private <T> T configureForKeys(final Preconfigured<T> preconfigured) {
+        return this.builder.createConfigurator().configureForKeys(preconfigured);
+    }
+
+    private <T> T configureForValues(final Preconfigured<T> preconfigured) {
+        return this.builder.createConfigurator().configureForValues(preconfigured);
     }

-    private KStream<Object, DeadLetter> streamDeadLetters(final StreamsBuilder builder) {
-        final KStream<Object, Object> rawDeadLetters = builder.stream(this.inputPattern,
-                Consumed.with(this.getInputSerde(true), this.getInputSerde(false)));
+    private KStream<Object, DeadLetter> streamDeadLetters() {
+        final KStream<Object, Object> rawDeadLetters = this.builder.streamInputPattern(
+                Consumed.with(this.configureForKeys(getInputSerde()), this.configureForValues(getInputSerde())));
         final KStream<Object, DeadLetter> streamDeadLetters = rawDeadLetters
                 .flatMapValues(DeadLetterAnalyzerTopology::getDeadLetters);
@@ -143,20 +151,14 @@ private KStream<Object, DeadLetter> streamDeadLetters(final StreamsBuilder build
                 .merge(streamHeaderDeadLetters);
     }

-    private Serde<Object> getInputSerde(final boolean isKey) {
-        final Serde<Object> serde = new BruteForceSerde();
-        serde.configure(originals(this.kafkaProperties), isKey);
-        return serde;
-    }
-
     private void toDeadLetterTopic(final KStream<Object, DeadLetter> connectDeadLetters) {
         connectDeadLetters
                 .selectKey((k, v) -> ErrorUtil.toString(k))
-                .to(this.errorTopic);
+                .to(this.builder.getTopics().getErrorTopic());
     }

     private KStream<ErrorKey, Result> aggregate(final KStream<Object, KeyedDeadLetterWithContext> withContext) {
-        final Serde<ErrorKey> errorKeySerde = this.getSpecificAvroSerde(true);
+        final Serde<ErrorKey> errorKeySerde = this.configureForKeys(getSpecificAvroSerde());
         final StoreBuilder<KeyValueStore<ErrorKey, ErrorStatistics>> statisticsStore =
                 this.createStatisticsStore(errorKeySerde);
@@ -190,7 +192,8 @@ public Set<StoreBuilder<?>> stores() {
     private StoreBuilder<KeyValueStore<ErrorKey, ErrorStatistics>> createStatisticsStore(
             final Serde<ErrorKey> errorKeySerde) {
         final KeyValueBytesStoreSupplier statisticsStoreSupplier = Stores.inMemoryKeyValueStore(STATISTICS_STORE_NAME);
-        return Stores.keyValueStoreBuilder(statisticsStoreSupplier, errorKeySerde, this.getSpecificAvroSerde(false));
+        return Stores.keyValueStoreBuilder(statisticsStoreSupplier, errorKeySerde,
+                this.configureForValues(getSpecificAvroSerde()));
     }

     private KStream<Object, KeyedDeadLetterWithContext> enrichWithContext(
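The serde plumbing is the core of this refactoring: instead of calling serde.configure(new StreamsConfig(properties).originals(), isKey) by hand, a serde is now created unconfigured, wrapped via Preconfigured.create(...), and only configured at its point of use through the Configurator obtained from the TopologyBuilder. A condensed sketch of the pattern (type and method names as in the diff; the DeadLetter type argument is just an example):

    // Create lazily, configure once the Kafka properties are known:
    private Serde<DeadLetter> deadLetterValueSerde(final TopologyBuilder builder) {
        final Preconfigured<Serde<DeadLetter>> preconfigured =
                Preconfigured.create(new SpecificAvroSerde<>());
        // configureForValues applies the Kafka properties with isKey = false
        return builder.createConfigurator().configureForValues(preconfigured);
    }
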
diff --git a/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java b/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
index 7020072..0bf048f 100644
--- a/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
+++ b/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
@@ -25,6 +25,8 @@
 package com.bakdata.kafka;

 import static com.bakdata.kafka.ConnectDeadLetterParserTest.toBytes;
+import static com.bakdata.kafka.DeadLetterAnalyzerTopology.EXAMPLES_TOPIC_LABEL;
+import static com.bakdata.kafka.DeadLetterAnalyzerTopology.STATS_TOPIC_LABEL;
 import static com.bakdata.kafka.ErrorHeaderProcessor.DESCRIPTION;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_CLASS_NAME;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_MESSAGE;
@@ -34,6 +36,7 @@
 import static com.bakdata.kafka.ErrorHeaderProcessor.TOPIC;
 import static com.bakdata.kafka.Formatter.DATE_TIME_FORMATTER;
 import static com.bakdata.kafka.HeaderLargeMessagePayloadProtocol.getHeaderName;
+import static com.bakdata.kafka.TestTopologyFactory.createTopologyExtensionWithSchemaRegistry;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_CONNECTOR_NAME;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
@@ -52,7 +55,7 @@
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.Properties;
+import java.util.Map;
 import java.util.regex.Pattern;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -60,9 +63,6 @@
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.Topology;
 import org.assertj.core.api.SoftAssertions;
 import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
 import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
@@ -73,15 +73,26 @@
 @ExtendWith(SoftAssertionsExtension.class)
 class DeadLetterAnalyzerTopologyTest {
-    private DeadLetterAnalyzerTopology app;
+    private static final StreamsTopicConfig TOPIC_CONFIG = StreamsTopicConfig.builder()
+            .inputPattern(Pattern.compile(".*-dead-letter-topic"))
+            .outputTopic("output")
+            .errorTopic("analyzer-stream-dead-letter-topic")
+            .labeledOutputTopics(
+                    Map.of(
+                            EXAMPLES_TOPIC_LABEL, "examples",
+                            STATS_TOPIC_LABEL, "stats"
+                    )
+            )
+            .build();
     @RegisterExtension
-    TestTopologyExtension topology =
-            new TestTopologyExtension<>(this::createTopology,
-                    StreamsBootstrapTopologyFactory.getKafkaPropertiesWithSchemaRegistryUrl(
-                            new DeadLetterAnalyzerApplication()));
+    TestTopologyExtension topology = createTopologyExtensionWithSchemaRegistry(createApp());
     @InjectSoftAssertions
     private SoftAssertions softly;

+    private static ConfiguredStreamsApp<StreamsApp> createApp() {
+        return new ConfiguredStreamsApp<>(new DeadLetterAnalyzerApplication(), new AppConfiguration<>(TOPIC_CONFIG));
+    }
+
     private static LocalDateTime parseDateTime(final String dateTime) {
         return LocalDateTime.parse(dateTime, DATE_TIME_FORMATTER);
     }
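TOPIC_CONFIG replaces the old createTopology factory: the former examplesTopic/statsTopic builder fields become labeled output topics resolved through StreamsTopicConfig. Given the map above, the resolution works out as follows (a sketch; the assertions only illustrate the mapping established in this diff):

    // getOutputTopic() returns the main output topic;
    // getOutputTopic(label) looks the label up in labeledOutputTopics:
    assert "output".equals(TOPIC_CONFIG.getOutputTopic());
    assert "stats".equals(TOPIC_CONFIG.getOutputTopic(STATS_TOPIC_LABEL));
    assert "examples".equals(TOPIC_CONFIG.getOutputTopic(EXAMPLES_TOPIC_LABEL));
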
@@ -388,7 +399,7 @@ void shouldProcessConnectErrors() {
                         .setStackTrace(StackTraceClassifierTest.STACK_TRACE)
                         .build())
                 .setDescription("Error in stage VALUE_CONVERTER (org.apache.kafka.connect.json.JsonConverter) in "
-                                + "my-connector[2]")
+                        + "my-connector[2]")
                 .setPartition(1)
                 .setTopic("my-topic")
                 .setOffset(10L)
@@ -527,7 +538,7 @@ void shouldProcessStreamsHeaderErrors() {
     @Test
     void shouldReadAvroKey() {
         final TestInput input =
-                this.getStreamsInput(this.app.getSpecificAvroSerde(true));
+                this.getStreamsInput(this.configureForKeys(DeadLetterAnalyzerTopology.getSpecificAvroSerde()));

         final TestOutput processedDeadLetters = this.getProcessedDeadLetters();
         final TestOutput statistics = this.getStatistics();
@@ -566,51 +577,49 @@ void shouldReadAvroKey() {
         this.assertNoDeadLetters();
     }

+    private Configurator getConfigurator() {
+        return new Configurator(this.topology.getProperties());
+    }
+
     private void assertNoDeadLetters() {
         final Iterable<ProducerRecord<String, DeadLetter>> deadLetters = this.getDeadLetters();
         this.softly.assertThat(seq(deadLetters).toList()).isEmpty();
     }

-    private Topology createTopology(final Properties properties) {
-        this.app = DeadLetterAnalyzerTopology.builder()
-                .inputPattern(Pattern.compile(".*-dead-letter-topic"))
-                .outputTopic("output")
-                .errorTopic("analyzer-stream-dead-letter-topic")
-                .examplesTopic("examples")
-                .statsTopic("stats")
-                .kafkaProperties(properties)
-                .build();
-        final StreamsBuilder builder = new StreamsBuilder();
-        this.app.buildTopology(builder);
-        return builder.build();
-    }
-
     private TestOutput getProcessedDeadLetters() {
         final Serde valueSerde = this.getLargeMessageSerde();
-        return this.topology.streamOutput(this.app.getOutputTopic())
+        return this.topology.streamOutput(TOPIC_CONFIG.getOutputTopic())
                 .withValueSerde(valueSerde);
     }

     private <T> Serde<T> getLargeMessageSerde() {
         final Serde<T> valueSerde = new LargeMessageSerde<>();
-        valueSerde.configure(new StreamsConfig(this.app.getKafkaProperties()).originals(), false);
-        return valueSerde;
+        final Preconfigured<Serde<T>> preconfigured = Preconfigured.create(valueSerde);
+        return this.configureForValues(preconfigured);
+    }
+
+    private <T> T configureForKeys(final Preconfigured<T> preconfigured) {
+        return this.getConfigurator().configureForKeys(preconfigured);
+    }
+
+    private <T> T configureForValues(final Preconfigured<T> preconfigured) {
+        return this.getConfigurator().configureForValues(preconfigured);
     }

     private TestOutput getStatistics() {
-        return this.topology.streamOutput(this.app.getStatsTopic())
-                .withValueSerde(this.app.getSpecificAvroSerde(false));
+        return this.topology.streamOutput(DeadLetterAnalyzerTopology.getStatsTopic(TOPIC_CONFIG))
+                .withValueSerde(this.configureForValues(DeadLetterAnalyzerTopology.getSpecificAvroSerde()));
     }

     private TestOutput getExamples() {
         final Serde valueSerde = this.getLargeMessageSerde();
-        return this.topology.streamOutput(this.app.getExamplesTopic())
+        return this.topology.streamOutput(DeadLetterAnalyzerTopology.getExamplesTopic(TOPIC_CONFIG))
                 .withValueSerde(valueSerde);
     }

     private Seq<ProducerRecord<String, DeadLetter>> getDeadLetters() {
         final Serde valueSerde = this.getLargeMessageSerde();
-        final TestOutput output = this.topology.streamOutput(this.app.getErrorTopic())
+        final TestOutput output = this.topology.streamOutput(TOPIC_CONFIG.getErrorTopic())
                 .withValueSerde(Serdes.ByteArray());
         return seq(output)
                 // Record has already been consumed by the analyzer and headers are modified.
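The new test helpers mirror the production code: a Configurator built from the test topology's Kafka properties stands in for the one from the TopologyBuilder, so serdes pick up the mock schema registry URL and other settings. Spelled out for a concrete serde (an illustrative fragment, not part of the diff):

    // Equivalent of this.configureForValues(...) for an Avro value serde:
    final Serde<DeadLetter> serde = new Configurator(this.topology.getProperties())
            .configureForValues(Preconfigured.create(new SpecificAvroSerde<>()));
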
@@ -625,7 +634,7 @@ private TestInput getStreamsInput(final Serde keySerde

     private TestInput getInput(final Serde keySerde, final String topic) {
         return this.topology.input(topic)
-                .withValueSerde(this.app.getSpecificAvroSerde(false))
+                .withValueSerde(this.configureForValues(DeadLetterAnalyzerTopology.getSpecificAvroSerde()))
                 .withKeySerde(keySerde);
     }
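With streams-bootstrap-cli in place, the main() in DeadLetterAnalyzerApplication parses the same topic wiring from command-line options that the test builds as TOPIC_CONFIG. Mirroring the test values, an invocation would look roughly like the following (flag names are an assumption based on the 3.x labeled-topics naming in this diff, not verified against the CLI):

    --input-pattern ".*-dead-letter-topic" \
    --output-topic output \
    --error-topic analyzer-stream-dead-letter-topic \
    --labeled-output-topics stats=stats,examples=examples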