Skip to content

Commit

Permalink
Upgrade to Kafka 3.8 (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 authored Jan 7, 2025
1 parent 5a9a531 commit fd9bc80
Show file tree
Hide file tree
Showing 55 changed files with 1,303 additions and 1,341 deletions.
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
plugins {
id("com.bakdata.release") version "1.4.0"
id("com.bakdata.sonar") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.1"
id("io.freefair.lombok") version "8.4"
}

Expand All @@ -16,6 +16,7 @@ allprojects {
repositories {
mavenCentral()
maven(url = "https://packages.confluent.io/maven/")
maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
}
}

Expand Down
1 change: 0 additions & 1 deletion error-handling-avro/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ dependencies {
val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(testFixtures(project(":error-handling-core")))
testImplementation(group = "org.jooq", name = "jool", version = "0.9.14")
val mockitoVersion: String by project
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
val assertJVersion: String by project
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -31,7 +31,7 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -42,10 +42,10 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.jooq.lambda.Seq;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -81,8 +81,8 @@ protected void buildTopology(final StreamsBuilder builder) {
}

@Override
protected Properties getKafkaProperties() {
final Properties kafkaProperties = super.getKafkaProperties();
protected Map<String, Object> getKafkaProperties() {
final Map<String, Object> kafkaProperties = super.getKafkaProperties();
kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
return kafkaProperties;
}
Expand All @@ -96,19 +96,20 @@ void shouldConvertAndSerializeAvroDeadLetter() {
.add(1, "foo", 100)
.add(2, "bar", 200);

final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE))
final List<ProducerRecord<Integer, String>> records = this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE)
.toList();
this.softly.assertThat(records)
.isEmpty();

final List<ProducerRecord<Integer, DeadLetter>> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetter.class))
final List<ProducerRecord<Integer, DeadLetter>> errors = this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetter.class)
.toList();

this.softly.assertThat(errors)
.hasSize(2)
.allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp))
.allSatisfy(producerRecord -> this.softly.assertThat(producerRecord.timestamp())
.isGreaterThan(startTimestamp))
.extracting(ProducerRecord::value).allSatisfy(
deadLetter -> {
this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION);
Expand All @@ -117,7 +118,8 @@ void shouldConvertAndSerializeAvroDeadLetter() {
.hasValue(RuntimeException.class.getCanonicalName());
// We don't check the exact stack trace, but only that it consists of multiple lines
this.softly.assertThat(deadLetter.getCause().getStackTrace()).map(s -> Arrays.asList(s.split("\n")))
.get().asList().hasSizeGreaterThan(1);
.get().asInstanceOf(InstanceOfAssertFactories.LIST)
.hasSizeGreaterThan(1);
this.softly.assertThat(deadLetter.getTopic()).hasValue(INPUT_TOPIC);
this.softly.assertThat(deadLetter.getPartition()).hasValue(0);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -30,7 +30,7 @@
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand All @@ -41,10 +41,10 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.jooq.lambda.Seq;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -81,8 +81,8 @@ protected void buildTopology(final StreamsBuilder builder) {
}

@Override
protected Properties getKafkaProperties() {
final Properties kafkaProperties = super.getKafkaProperties();
protected Map<String, Object> getKafkaProperties() {
final Map<String, Object> kafkaProperties = super.getKafkaProperties();
kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
return kafkaProperties;
}
Expand All @@ -95,14 +95,14 @@ void shouldConvertAndSerializeAvroDeadLetter() {
.add(1, "foo")
.add(2, "bar");

final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE))
final List<ProducerRecord<Integer, String>> records = this.topology.streamOutput(OUTPUT_TOPIC)
.withValueSerde(STRING_SERDE)
.toList();
this.softly.assertThat(records)
.isEmpty();

final List<ProducerRecord<Integer, DeadLetter>> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetter.class))
final List<ProducerRecord<Integer, DeadLetter>> errors = this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetter.class)
.toList();

this.softly.assertThat(errors)
Expand All @@ -115,7 +115,8 @@ void shouldConvertAndSerializeAvroDeadLetter() {
.hasValue(RuntimeException.class.getCanonicalName());
// We don't check the exact stack trace, but only that it consists of multiple lines
this.softly.assertThat(deadLetter.getCause().getStackTrace()).map(s -> Arrays.asList(s.split("\n")))
.get().asList().hasSizeGreaterThan(1);
.get().asInstanceOf(InstanceOfAssertFactories.LIST)
.hasSizeGreaterThan(1);
this.softly.assertThat(deadLetter.getTopic()).hasValue(INPUT_TOPIC);
this.softly.assertThat(deadLetter.getPartition()).hasValue(0);
}
Expand Down
6 changes: 3 additions & 3 deletions error-handling-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ dependencies {
api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion)
val avroVersion: String by project
implementation(group = "org.apache.avro", name = "avro", version = avroVersion)
implementation(group = "org.jooq", name = "jool", version = "0.9.14")
implementation(group = "org.apache.commons", name = "commons-lang3", version = "3.14.0")
implementation(group = "org.jooq", name = "jool", version = "0.9.15")
implementation(group = "org.apache.commons", name = "commons-lang3", version = "3.17.0")

val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-params", version = junitVersion)
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
val assertJVersion: String by project
testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
val mockitoVersion: String by project
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2022 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -103,11 +103,11 @@ public void process(final FixedKeyRecord<K, ProcessingError<V>> inputRecord) {
.inputTimestamp(Instant.ofEpochMilli(inputRecord.timestamp()))
.build();

final FixedKeyRecord<K, T> record = inputRecord
final FixedKeyRecord<K, T> outputRecord = inputRecord
.withValue(this.deadLetterConverter.convert(deadLetterDescription))
.withTimestamp(this.context.currentSystemTimeMs());

this.context.forward(record);
this.context.forward(outputRecord);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2020 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -28,7 +28,6 @@
* {@link Exception} thrown by error descriptors. The message contains information about input key and value.
*/
public class ProcessingException extends RuntimeException {
private static final long serialVersionUID = 6328000609687736610L;

ProcessingException(final Object value, final Throwable cause) {
super("Cannot process " + ErrorUtil.toString(value), cause);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -42,7 +42,6 @@
import org.apache.kafka.streams.kstream.Produced;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
import org.jooq.lambda.Seq;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand Down Expand Up @@ -100,16 +99,16 @@ void shouldForwardRecoverableException(final SoftAssertions softly) {
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCause(throwable);
final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.isEmpty();

final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class))
this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.isEmpty();
Expand All @@ -136,9 +135,9 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) {
.withValueSerde(STRING_SERDE)
.add(1, "foo")
.add(2, "bar");
final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.extracting(ProducerRecord::key)
Expand All @@ -148,14 +147,14 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) {
.containsExactlyInAnyOrder(2L, 1L, 18L);

final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class))
this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> softly.assertThat(record.key()).isEqualTo(1))
.satisfies(producerRecord -> softly.assertThat(producerRecord.key()).isEqualTo(1))
.extracting(ProducerRecord::value)
.isInstanceOf(DeadLetterDescription.class)
.satisfies(deadLetter -> {
Expand All @@ -179,9 +178,9 @@ void shouldHandleNullInput(final SoftAssertions softly) {
this.topology.input()
.withValueSerde(STRING_SERDE)
.add(null, null);
final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.extracting(ProducerRecord::key)
Expand All @@ -190,8 +189,8 @@ void shouldHandleNullInput(final SoftAssertions softly) {
.extracting(ProducerRecord::value)
.containsExactlyInAnyOrder(2L, 3L);
final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class))
this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.isEmpty();
Expand All @@ -204,21 +203,21 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) {
this.topology.input()
.withValueSerde(STRING_SERDE)
.add(null, null);
final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.isEmpty();
final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class))
this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> softly.assertThat(record.key()).isNull())
.satisfies(producerRecord -> softly.assertThat(producerRecord.key()).isNull())
.extracting(ProducerRecord::value)
.isInstanceOf(DeadLetterDescription.class)
.satisfies(deadLetter -> {
Expand All @@ -241,20 +240,20 @@ void shouldHandleNullKeyValue(final SoftAssertions softly) {
this.topology.input()
.withValueSerde(STRING_SERDE)
.add(2, "bar");
final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE))
final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
.withKeySerde(DOUBLE_SERDE)
.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.hasSize(1)
.first()
.isNotNull()
.satisfies(record -> softly.assertThat(record.key()).isNull())
.satisfies(producerRecord -> softly.assertThat(producerRecord.key()).isNull())
.extracting(ProducerRecord::value)
.satisfies(value -> softly.assertThat(value).isNull());
final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class))
this.topology.streamOutput(ERROR_TOPIC)
.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.isEmpty();
Expand Down
Loading

0 comments on commit fd9bc80

Please sign in to comment.