Upgrade to streams-bootstrap v3
philipp94831 committed Aug 20, 2024
1 parent 4fb4302 commit 7fa91fd
Showing 2 changed files with 48 additions and 62 deletions.
40 changes: 16 additions & 24 deletions src/main/java/com/bakdata/kafka/DeadLetterAnalyzerTopology.java
@@ -61,6 +61,14 @@ static <T extends SpecificRecord> Preconfigured<Serde<T>> getSpecificAvroSerde()
         return Preconfigured.create(serde);
     }
 
+    static String getExamplesTopic(final StreamsTopicConfig topics) {
+        return topics.getOutputTopic(EXAMPLES_TOPIC_LABEL);
+    }
+
+    static String getStatsTopic(final StreamsTopicConfig topics) {
+        return topics.getOutputTopic(STATS_TOPIC_LABEL);
+    }
+
     private static String toElasticKey(final ErrorKey key) {
         return String.format("%s:%s", key.getTopic(), key.getType());
     }
Expand Down Expand Up @@ -98,46 +106,30 @@ void buildTopology() {
final KStream<Object, DeadLetter> allDeadLetters = this.streamDeadLetters();
final KStream<Object, KeyedDeadLetterWithContext> deadLettersWithContext =
this.enrichWithContext(allDeadLetters);
final StreamsTopicConfig topics = this.builder.getTopics();
deadLettersWithContext
.selectKey((k, v) -> v.extractElasticKey())
.mapValues(KeyedDeadLetterWithContext::format)
.to(this.getOutputTopic());
.to(topics.getOutputTopic());

final KStream<ErrorKey, Result> aggregated = this.aggregate(deadLettersWithContext);
aggregated
.mapValues((errorKey, result) -> result.toFullErrorStatistics(errorKey))
.selectKey((k, v) -> toElasticKey(k))
.to(this.getStatsTopic(),
Produced.valueSerde(
this.configureForValues(getSpecificAvroSerde())));
.to(getStatsTopic(topics),
Produced.valueSerde(this.configureForValues(getSpecificAvroSerde())));
aggregated
.flatMapValues(Result::getExamples)
.mapValues(DeadLetterAnalyzerTopology::toErrorExample)
.selectKey((k, v) -> toElasticKey(k))
.to(this.getExamplesTopic());
}

String getExamplesTopic() {
return this.builder.getTopics().getOutputTopic(EXAMPLES_TOPIC_LABEL);
}

String getStatsTopic() {
return this.builder.getTopics().getOutputTopic(STATS_TOPIC_LABEL);
}

String getOutputTopic() {
return this.builder.getTopics().getOutputTopic();
}

String getErrorTopic() {
return this.builder.getTopics().getErrorTopic();
.to(getExamplesTopic(topics));
}

<T> T configureForKeys(final Preconfigured<T> preconfigured) {
private <T> T configureForKeys(final Preconfigured<T> preconfigured) {
return this.builder.createConfigurator().configureForKeys(preconfigured);
}

<T> T configureForValues(final Preconfigured<T> preconfigured) {
private <T> T configureForValues(final Preconfigured<T> preconfigured) {
return this.builder.createConfigurator().configureForValues(preconfigured);
}

Expand Down Expand Up @@ -166,7 +158,7 @@ private KStream<Object, DeadLetter> streamDeadLetters() {
private <K> void toDeadLetterTopic(final KStream<K, DeadLetter> connectDeadLetters) {
connectDeadLetters
.selectKey((k, v) -> ErrorUtil.toString(k))
.to(this.getErrorTopic());
.to(this.builder.getTopics().getErrorTopic());
}

private KStream<ErrorKey, Result> aggregate(final KStream<?, KeyedDeadLetterWithContext> withContext) {
Expand Down
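
With streams-bootstrap v3, the per-instance topic getters above become static helpers that resolve names directly from a StreamsTopicConfig, so callers no longer need a built topology instance. A minimal sketch of the new call pattern, using only accessors and builder methods visible in this commit (the topic names are illustrative):

// Sketch: resolving topic names via the new static helpers (names illustrative).
final StreamsTopicConfig topics = StreamsTopicConfig.builder()
        .outputTopic("output")
        .labeledOutputTopics(Map.of(
                EXAMPLES_TOPIC_LABEL, "examples",
                STATS_TOPIC_LABEL, "stats"))
        .build();
final String stats = DeadLetterAnalyzerTopology.getStatsTopic(topics);       // "stats"
final String examples = DeadLetterAnalyzerTopology.getExamplesTopic(topics); // "examples"
final String output = topics.getOutputTopic();                               // "output"

This is what lets the test below resolve the same topics from a static TOPIC_CONFIG instead of holding on to a DeadLetterAnalyzerTopology field.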
70 changes: 32 additions & 38 deletions src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
@@ -36,6 +36,7 @@
 import static com.bakdata.kafka.ErrorHeaderProcessor.TOPIC;
 import static com.bakdata.kafka.Formatter.DATE_TIME_FORMATTER;
 import static com.bakdata.kafka.HeaderLargeMessagePayloadProtocol.getHeaderName;
+import static com.bakdata.kafka.TestTopologyFactory.createTopologyExtensionWithSchemaRegistry;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_CONNECTOR_NAME;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
@@ -62,7 +63,6 @@
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.Topology;
 import org.assertj.core.api.SoftAssertions;
 import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
 import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
@@ -73,12 +73,26 @@
 
 @ExtendWith(SoftAssertionsExtension.class)
 class DeadLetterAnalyzerTopologyTest {
-    private DeadLetterAnalyzerTopology app;
+    private static final StreamsTopicConfig TOPIC_CONFIG = StreamsTopicConfig.builder()
+            .inputPattern(Pattern.compile(".*-dead-letter-topic"))
+            .outputTopic("output")
+            .errorTopic("analyzer-stream-dead-letter-topic")
+            .labeledOutputTopics(
+                    Map.of(
+                            EXAMPLES_TOPIC_LABEL, "examples",
+                            STATS_TOPIC_LABEL, "stats"
+                    )
+            )
+            .build();
     @RegisterExtension
-    TestTopologyExtension<String, DeadLetter> topology = this.createTestTopology();
+    TestTopologyExtension<String, DeadLetter> topology = createTopologyExtensionWithSchemaRegistry(createApp());
     @InjectSoftAssertions
     private SoftAssertions softly;
 
+    private static ConfiguredStreamsApp<StreamsApp> createApp() {
+        return new ConfiguredStreamsApp<>(new DeadLetterAnalyzerApplication(), new AppConfiguration<>(TOPIC_CONFIG));
+    }
+
     private static LocalDateTime parseDateTime(final String dateTime) {
         return LocalDateTime.parse(dateTime, DATE_TIME_FORMATTER);
     }
Expand Down Expand Up @@ -524,7 +538,8 @@ void shouldProcessStreamsHeaderErrors() {
@Test
void shouldReadAvroKey() {
final TestInput<SpecificRecord, SpecificRecord> input =
this.getStreamsInput(this.app.configureForKeys(DeadLetterAnalyzerTopology.getSpecificAvroSerde()));
this.getStreamsInput(
this.getConfigurator().configureForKeys(DeadLetterAnalyzerTopology.getSpecificAvroSerde()));
final TestOutput<String, FullDeadLetterWithContext> processedDeadLetters =
this.getProcessedDeadLetters();
final TestOutput<String, FullErrorStatistics> statistics = this.getStatistics();
Expand Down Expand Up @@ -563,63 +578,41 @@ void shouldReadAvroKey() {
this.assertNoDeadLetters();
}

private Configurator getConfigurator() {
return new Configurator(this.topology.getProperties());
}

private void assertNoDeadLetters() {
final Iterable<ProducerRecord<String, DeadLetter>> deadLetters = this.getDeadLetters();
this.softly.assertThat(seq(deadLetters).toList()).isEmpty();
}

private TestTopologyExtension<String, DeadLetter> createTestTopology() {
final StreamsApp app = new DeadLetterAnalyzerApplication();
final StreamsTopicConfig topicConfig = StreamsTopicConfig.builder()
.inputPattern(Pattern.compile(".*-dead-letter-topic"))
.outputTopic("output")
.errorTopic("analyzer-stream-dead-letter-topic")
.labeledOutputTopics(
Map.of(
EXAMPLES_TOPIC_LABEL, "examples",
STATS_TOPIC_LABEL, "stats"
)
)
.build();
final ConfiguredStreamsApp<StreamsApp> configuredApp =
new ConfiguredStreamsApp<>(app, new AppConfiguration<>(topicConfig));
return new TestTopologyExtension<>(properties -> this.createTopology(properties, configuredApp),
TestTopologyFactory.getKafkaPropertiesWithSchemaRegistryUrl(configuredApp));
}

private Topology createTopology(final Map<String, Object> properties, final ConfiguredStreamsApp<?> configuredApp) {
final TopologyBuilder builder = new TopologyBuilder(configuredApp.getTopics(), properties);
this.app = new DeadLetterAnalyzerTopology(builder);
this.app.buildTopology();
return builder.build();
}

private TestOutput<String, FullDeadLetterWithContext> getProcessedDeadLetters() {
final Serde<FullDeadLetterWithContext> valueSerde = this.getLargeMessageSerde();
return this.topology.streamOutput(this.app.getOutputTopic())
return this.topology.streamOutput(TOPIC_CONFIG.getOutputTopic())
.withValueSerde(valueSerde);
}

private <T> Serde<T> getLargeMessageSerde() {
final Serde<T> valueSerde = new LargeMessageSerde<>();
return this.app.configureForValues(Preconfigured.create(valueSerde));
return this.getConfigurator().configureForValues(valueSerde);
}

private TestOutput<String, FullErrorStatistics> getStatistics() {
return this.topology.streamOutput(this.app.getStatsTopic())
.withValueSerde(new Configurator(this.topology.getProperties()).configureForValues(
DeadLetterAnalyzerTopology.getSpecificAvroSerde()));
return this.topology.streamOutput(DeadLetterAnalyzerTopology.getStatsTopic(TOPIC_CONFIG))
.withValueSerde(
this.getConfigurator().configureForValues(DeadLetterAnalyzerTopology.getSpecificAvroSerde()));
}

private TestOutput<String, ErrorExample> getExamples() {
final Serde<ErrorExample> valueSerde = this.getLargeMessageSerde();
return this.topology.streamOutput(this.app.getExamplesTopic())
return this.topology.streamOutput(DeadLetterAnalyzerTopology.getExamplesTopic(TOPIC_CONFIG))
.withValueSerde(valueSerde);
}

private Seq<ProducerRecord<String, DeadLetter>> getDeadLetters() {
final Serde<DeadLetter> valueSerde = this.getLargeMessageSerde();
final TestOutput<String, byte[]> output = this.topology.streamOutput(this.app.getErrorTopic())
final TestOutput<String, byte[]> output = this.topology.streamOutput(TOPIC_CONFIG.getErrorTopic())
.withValueSerde(Serdes.ByteArray());
return seq(output)
// Record has already been consumed by the analyzer and headers are modified.
@@ -634,7 +627,8 @@ private <K> TestInput<K, SpecificRecord> getStreamsInput(final Serde<K> keySerde
 
     private <K> TestInput<K, SpecificRecord> getInput(final Serde<K> keySerde, final String topic) {
         return this.topology.input(topic)
-                .withValueSerde(this.app.configureForValues(DeadLetterAnalyzerTopology.getSpecificAvroSerde()))
+                .withValueSerde(
+                        this.getConfigurator().configureForValues(DeadLetterAnalyzerTopology.getSpecificAvroSerde()))
                 .withKeySerde(keySerde);
     }

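Taken together, the test changes replace the hand-rolled createTestTopology/createTopology pair with streams-bootstrap v3's factory method and route all serde configuration through a Configurator built from the test topology's properties. A minimal sketch of both patterns, using only calls that appear in this commit (the serde choice is illustrative):

// v3 wiring: build the test extension straight from a ConfiguredStreamsApp;
// the factory name indicates it also wires up a Schema Registry for tests.
final ConfiguredStreamsApp<StreamsApp> app = new ConfiguredStreamsApp<>(
        new DeadLetterAnalyzerApplication(), new AppConfiguration<>(TOPIC_CONFIG));
final TestTopologyExtension<String, DeadLetter> topology =
        createTopologyExtensionWithSchemaRegistry(app);

// v3 serde setup: configure a serde via a Configurator instead of the topology instance.
final Configurator configurator = new Configurator(topology.getProperties());
final Serde<DeadLetter> valueSerde = configurator.configureForValues(new LargeMessageSerde<>());

The net effect is that the test no longer needs a mutable DeadLetterAnalyzerTopology field: topic names come from the static TOPIC_CONFIG, and serdes are configured on demand.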
