Upgrade to Kafka 3.8 #29

Merged: 7 commits, Jan 7, 2025
build.gradle.kts (2 additions & 1 deletion)
@@ -1,7 +1,7 @@
plugins {
id("com.bakdata.release") version "1.4.0"
id("com.bakdata.sonar") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.0"
id("com.bakdata.sonatype") version "1.4.1"
id("io.freefair.lombok") version "8.4"
}

@@ -16,6 +16,7 @@ allprojects {
repositories {
mavenCentral()
maven(url = "https://packages.confluent.io/maven/")
maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots")
}
}

error-handling-avro/build.gradle.kts (1 deletion)
@@ -21,7 +21,6 @@ dependencies {
val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(testFixtures(project(":error-handling-core")))
testImplementation(group = "org.jooq", name = "jool", version = "0.9.14")
val mockitoVersion: String by project
testImplementation(group = "org.mockito", name = "mockito-junit-jupiter", version = mockitoVersion)
val assertJVersion: String by project
@@ -1,7 +1,7 @@
/*
* MIT License
*
-* Copyright (c) 2022 bakdata
+* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -31,7 +31,7 @@
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -42,10 +42,10 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
+import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
-import org.jooq.lambda.Seq;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -81,8 +81,8 @@ protected void buildTopology(final StreamsBuilder builder) {
}

@Override
-protected Properties getKafkaProperties() {
-final Properties kafkaProperties = super.getKafkaProperties();
+protected Map<String, Object> getKafkaProperties() {
+final Map<String, Object> kafkaProperties = super.getKafkaProperties();
kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
return kafkaProperties;
}
@@ -96,19 +96,20 @@ void shouldConvertAndSerializeAvroDeadLetter() {
.add(1, "foo", 100)
.add(2, "bar", 200);

-final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
-.withValueSerde(STRING_SERDE))
+final List<ProducerRecord<Integer, String>> records = this.topology.streamOutput(OUTPUT_TOPIC)
+.withValueSerde(STRING_SERDE)
.toList();
this.softly.assertThat(records)
.isEmpty();

-final List<ProducerRecord<Integer, DeadLetter>> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
-.withValueType(DeadLetter.class))
+final List<ProducerRecord<Integer, DeadLetter>> errors = this.topology.streamOutput(ERROR_TOPIC)
+.withValueType(DeadLetter.class)
.toList();

this.softly.assertThat(errors)
.hasSize(2)
-.allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp))
+.allSatisfy(producerRecord -> this.softly.assertThat(producerRecord.timestamp())
+.isGreaterThan(startTimestamp))
.extracting(ProducerRecord::value).allSatisfy(
deadLetter -> {
this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION);
@@ -117,7 +118,8 @@ void shouldConvertAndSerializeAvroDeadLetter() {
.hasValue(RuntimeException.class.getCanonicalName());
// We don't check the exact stack trace, but only that it consists of multiple lines
this.softly.assertThat(deadLetter.getCause().getStackTrace()).map(s -> Arrays.asList(s.split("\n")))
-.get().asList().hasSizeGreaterThan(1);
+.get().asInstanceOf(InstanceOfAssertFactories.LIST)
+.hasSizeGreaterThan(1);
this.softly.assertThat(deadLetter.getTopic()).hasValue(INPUT_TOPIC);
this.softly.assertThat(deadLetter.getPartition()).hasValue(0);
}
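The diff above switches the test's Kafka configuration from java.util.Properties to Map<String, Object>. A minimal standalone sketch of that configuration style follows; the application id and bootstrap servers are illustrative placeholders, not values taken from this PR:

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.streams.StreamsConfig;

// Sketch only: a Map-based configuration can carry non-String values such as
// Class literals, which the String-oriented Properties table cannot express cleanly.
public class MapConfigSketch {
    public static void main(final String[] args) {
        final Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "error-handling-test"); // placeholder id
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        final StreamsConfig streamsConfig = new StreamsConfig(config); // validates the settings
        System.out.println(streamsConfig.originals().size() + " settings configured");
    }
}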
@@ -1,7 +1,7 @@
/*
* MIT License
*
-* Copyright (c) 2022 bakdata
+* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -30,7 +30,7 @@
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.Arrays;
import java.util.List;
-import java.util.Properties;
+import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
@@ -41,10 +41,10 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Produced;
+import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
-import org.jooq.lambda.Seq;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -81,8 +81,8 @@ protected void buildTopology(final StreamsBuilder builder) {
}

@Override
-protected Properties getKafkaProperties() {
-final Properties kafkaProperties = super.getKafkaProperties();
+protected Map<String, Object> getKafkaProperties() {
+final Map<String, Object> kafkaProperties = super.getKafkaProperties();
kafkaProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
return kafkaProperties;
}
@@ -95,14 +95,14 @@ void shouldConvertAndSerializeAvroDeadLetter() {
.add(1, "foo")
.add(2, "bar");

-final List<ProducerRecord<Integer, String>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
-.withValueSerde(STRING_SERDE))
+final List<ProducerRecord<Integer, String>> records = this.topology.streamOutput(OUTPUT_TOPIC)
+.withValueSerde(STRING_SERDE)
.toList();
this.softly.assertThat(records)
.isEmpty();

-final List<ProducerRecord<Integer, DeadLetter>> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
-.withValueType(DeadLetter.class))
+final List<ProducerRecord<Integer, DeadLetter>> errors = this.topology.streamOutput(ERROR_TOPIC)
+.withValueType(DeadLetter.class)
.toList();

this.softly.assertThat(errors)
@@ -115,7 +115,8 @@ void shouldConvertAndSerializeAvroDeadLetter() {
.hasValue(RuntimeException.class.getCanonicalName());
// We don't check the exact stack trace, but only that it consists of multiple lines
this.softly.assertThat(deadLetter.getCause().getStackTrace()).map(s -> Arrays.asList(s.split("\n")))
-.get().asList().hasSizeGreaterThan(1);
+.get().asInstanceOf(InstanceOfAssertFactories.LIST)
+.hasSizeGreaterThan(1);
this.softly.assertThat(deadLetter.getTopic()).hasValue(INPUT_TOPIC);
this.softly.assertThat(deadLetter.getPartition()).hasValue(0);
}
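Both test files also replace asList() with asInstanceOf(InstanceOfAssertFactories.LIST) when asserting on the split stack trace, presumably because asList() is deprecated in recent AssertJ releases. A small self-contained sketch of that assertion style, with a made-up input list:

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.assertj.core.api.InstanceOfAssertFactories;

public class ListAssertSketch {
    public static void main(final String[] args) {
        // Narrow an Object-typed actual to a List before applying list assertions.
        final Object stackTraceLines = List.of(
                "java.lang.RuntimeException: Cannot process", // made-up stack trace lines
                "at com.example.SomeClass.someMethod(SomeClass.java:42)");
        assertThat(stackTraceLines)
                .asInstanceOf(InstanceOfAssertFactories.LIST)
                .hasSizeGreaterThan(1);
    }
}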
error-handling-core/build.gradle.kts (3 additions & 3 deletions)
@@ -9,13 +9,13 @@ dependencies {
api(group = "org.apache.kafka", name = "kafka-streams", version = kafkaVersion)
val avroVersion: String by project
implementation(group = "org.apache.avro", name = "avro", version = avroVersion)
implementation(group = "org.jooq", name = "jool", version = "0.9.14")
implementation(group = "org.apache.commons", name = "commons-lang3", version = "3.14.0")
implementation(group = "org.jooq", name = "jool", version = "0.9.15")
implementation(group = "org.apache.commons", name = "commons-lang3", version = "3.17.0")

val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-params", version = junitVersion)
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
val assertJVersion: String by project
testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
val mockitoVersion: String by project
@@ -1,7 +1,7 @@
/*
* MIT License
*
-* Copyright (c) 2022 bakdata
+* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -103,11 +103,11 @@ public void process(final FixedKeyRecord<K, ProcessingError<V>> inputRecord) {
.inputTimestamp(Instant.ofEpochMilli(inputRecord.timestamp()))
.build();

-final FixedKeyRecord<K, T> record = inputRecord
+final FixedKeyRecord<K, T> outputRecord = inputRecord
.withValue(this.deadLetterConverter.convert(deadLetterDescription))
.withTimestamp(this.context.currentSystemTimeMs());

-this.context.forward(record);
+this.context.forward(outputRecord);
}

@Override
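The processor change above is only a rename of the local variable record to outputRecord; the forward pattern itself is unchanged. For context, a minimal sketch of that FixedKeyProcessor pattern with illustrative String types (the real processor converts ProcessingError values into dead letters via a converter):

import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

// Sketch only: transform the value, keep the key fixed, restamp with the current
// system time, and forward the resulting record downstream.
public class UpperCaseValueProcessor implements FixedKeyProcessor<String, String, String> {
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(final FixedKeyProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final FixedKeyRecord<String, String> inputRecord) {
        final FixedKeyRecord<String, String> outputRecord = inputRecord
                .withValue(inputRecord.value().toUpperCase())
                .withTimestamp(this.context.currentSystemTimeMs());
        this.context.forward(outputRecord);
    }
}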
@@ -1,7 +1,7 @@
/*
* MIT License
*
-* Copyright (c) 2020 bakdata
+* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -28,7 +28,6 @@
* {@link Exception} thrown by error descriptors. The message contains information about input key and value.
*/
public class ProcessingException extends RuntimeException {
-private static final long serialVersionUID = 6328000609687736610L;

ProcessingException(final Object value, final Throwable cause) {
super("Cannot process " + ErrorUtil.toString(value), cause);
@@ -1,7 +1,7 @@
/*
* MIT License
*
-* Copyright (c) 2024 bakdata
+* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -42,7 +42,6 @@
import org.apache.kafka.streams.kstream.Produced;
import org.assertj.core.api.SoftAssertions;
import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
-import org.jooq.lambda.Seq;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -100,16 +99,16 @@ void shouldForwardRecoverableException(final SoftAssertions softly) {
.withValueSerde(STRING_SERDE)
.add(1, "foo"))
.hasCause(throwable);
-final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
-.withKeySerde(DOUBLE_SERDE)
-.withValueSerde(LONG_SERDE))
+final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
+.withKeySerde(DOUBLE_SERDE)
+.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.isEmpty();

final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
-Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
-.withValueType(DeadLetterDescription.class))
+this.topology.streamOutput(ERROR_TOPIC)
+.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.isEmpty();
@@ -136,9 +135,9 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) {
.withValueSerde(STRING_SERDE)
.add(1, "foo")
.add(2, "bar");
-final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
-.withKeySerde(DOUBLE_SERDE)
-.withValueSerde(LONG_SERDE))
+final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
+.withKeySerde(DOUBLE_SERDE)
+.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.extracting(ProducerRecord::key)
Expand All @@ -148,14 +147,14 @@ void shouldCaptureKeyValueMapperError(final SoftAssertions softly) {
.containsExactlyInAnyOrder(2L, 1L, 18L);

final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
-Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
-.withValueType(DeadLetterDescription.class))
+this.topology.streamOutput(ERROR_TOPIC)
+.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.hasSize(1)
.first()
.isNotNull()
-.satisfies(record -> softly.assertThat(record.key()).isEqualTo(1))
+.satisfies(producerRecord -> softly.assertThat(producerRecord.key()).isEqualTo(1))
.extracting(ProducerRecord::value)
.isInstanceOf(DeadLetterDescription.class)
.satisfies(deadLetter -> {
@@ -179,9 +178,9 @@ void shouldHandleNullInput(final SoftAssertions softly) {
this.topology.input()
.withValueSerde(STRING_SERDE)
.add(null, null);
-final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
-.withKeySerde(DOUBLE_SERDE)
-.withValueSerde(LONG_SERDE))
+final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
+.withKeySerde(DOUBLE_SERDE)
+.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.extracting(ProducerRecord::key)
Expand All @@ -190,8 +189,8 @@ void shouldHandleNullInput(final SoftAssertions softly) {
.extracting(ProducerRecord::value)
.containsExactlyInAnyOrder(2L, 3L);
final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
-Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
-.withValueType(DeadLetterDescription.class))
+this.topology.streamOutput(ERROR_TOPIC)
+.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.isEmpty();
@@ -204,21 +203,21 @@ void shouldHandleErrorOnNullInput(final SoftAssertions softly) {
this.topology.input()
.withValueSerde(STRING_SERDE)
.add(null, null);
-final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
-.withKeySerde(DOUBLE_SERDE)
-.withValueSerde(LONG_SERDE))
+final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
+.withKeySerde(DOUBLE_SERDE)
+.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.isEmpty();
final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
-Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
-.withValueType(DeadLetterDescription.class))
+this.topology.streamOutput(ERROR_TOPIC)
+.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.hasSize(1)
.first()
.isNotNull()
-.satisfies(record -> softly.assertThat(record.key()).isNull())
+.satisfies(producerRecord -> softly.assertThat(producerRecord.key()).isNull())
.extracting(ProducerRecord::value)
.isInstanceOf(DeadLetterDescription.class)
.satisfies(deadLetter -> {
@@ -241,20 +240,20 @@ void shouldHandleNullKeyValue(final SoftAssertions softly) {
this.topology.input()
.withValueSerde(STRING_SERDE)
.add(2, "bar");
-final List<ProducerRecord<Double, Long>> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC)
-.withKeySerde(DOUBLE_SERDE)
-.withValueSerde(LONG_SERDE))
+final List<ProducerRecord<Double, Long>> records = this.topology.streamOutput(OUTPUT_TOPIC)
+.withKeySerde(DOUBLE_SERDE)
+.withValueSerde(LONG_SERDE)
.toList();
softly.assertThat(records)
.hasSize(1)
.first()
.isNotNull()
-.satisfies(record -> softly.assertThat(record.key()).isNull())
+.satisfies(producerRecord -> softly.assertThat(producerRecord.key()).isNull())
.extracting(ProducerRecord::value)
.satisfies(value -> softly.assertThat(value).isNull());
final List<ProducerRecord<Integer, DeadLetterDescription>> errors =
-Seq.seq(this.topology.streamOutput(ERROR_TOPIC)
-.withValueType(DeadLetterDescription.class))
+this.topology.streamOutput(ERROR_TOPIC)
+.withValueType(DeadLetterDescription.class)
.toList();
softly.assertThat(errors)
.isEmpty();
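A recurring change across these tests is dropping the jOOL Seq.seq(...) wrapper: in the upgraded test library version used here, the stream output apparently exposes toList() directly, so records can be collected without the extra dependency. A sketch of the resulting read pattern, assuming a test topology field named topology, a soft-assertions field named softly, and an illustrative topic name:

// Sketch only (fragment): collect output records straight from the test topology.
final List<ProducerRecord<Integer, String>> records = this.topology.streamOutput("output-topic")
        .withKeySerde(Serdes.Integer())
        .withValueSerde(Serdes.String())
        .toList();
this.softly.assertThat(records).isNotEmpty();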