From 762c34e5bf469b09abbad89ba4f212448e5d8c08 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 21 Mar 2024 10:21:31 +0100 Subject: [PATCH] Forward all SerializationExceptions by default (#26) --- .github/workflows/build-and-publish.yaml | 3 +-- .github/workflows/release.yaml | 2 +- build.gradle.kts | 20 +++----------- .../java/com/bakdata/kafka/ErrorUtil.java | 14 ++-------- ...pturingFlatKeyValueMapperTopologyTest.java | 10 +++---- ...rCapturingFlatTransformerTopologyTest.java | 9 +++---- ...rCapturingFlatValueMapperTopologyTest.java | 10 +++---- ...ingFlatValueMapperWithKeyTopologyTest.java | 10 +++---- ...uringFlatValueTransformerTopologyTest.java | 9 +++---- ...atValueTransformerWithKeyTopologyTest.java | 9 +++---- ...orCapturingKeyValueMapperTopologyTest.java | 10 +++---- .../ErrorCapturingProcessorTopologyTest.java | 9 +++---- ...ErrorCapturingTransformerTopologyTest.java | 9 +++---- ...ErrorCapturingValueMapperTopologyTest.java | 10 +++---- ...pturingValueMapperWithKeyTopologyTest.java | 10 +++---- ...orCapturingValueProcessorTopologyTest.java | 9 +++---- ...CapturingValueTransformerTopologyTest.java | 9 +++---- ...ngValueTransformerWithKeyTopologyTest.java | 9 +++---- ...LoggingFlatKeyValueMapperTopologyTest.java | 12 ++++----- ...rorLoggingFlatTransformerTopologyTest.java | 11 ++++---- ...rorLoggingFlatValueMapperTopologyTest.java | 10 +++---- ...ingFlatValueMapperWithKeyTopologyTest.java | 10 +++---- ...ggingFlatValueTransformerTopologyTest.java | 9 +++---- ...atValueTransformerWithKeyTopologyTest.java | 9 +++---- ...rrorLoggingKeyValueMapperTopologyTest.java | 10 +++---- .../ErrorLoggingProcessorTopologyTest.java | 9 +++---- .../ErrorLoggingTransformerTopologyTest.java | 9 +++---- .../ErrorLoggingValueMapperTopologyTest.java | 10 +++---- ...LoggingValueMapperWithKeyTopologyTest.java | 10 +++---- ...rrorLoggingValueProcessorTopologyTest.java | 9 +++---- ...orLoggingValueTransformerTopologyTest.java | 9 +++---- ...ngValueTransformerWithKeyTopologyTest.java | 9 +++---- .../java/com/bakdata/kafka/ErrorUtilTest.java | 6 ++--- .../kafka/ErrorCaptureTopologyTest.java | 27 +++++++++---------- gradle.properties | 10 +++---- 35 files changed, 153 insertions(+), 197 deletions(-) diff --git a/.github/workflows/build-and-publish.yaml b/.github/workflows/build-and-publish.yaml index ec1d95d..8074368 100644 --- a/.github/workflows/build-and-publish.yaml +++ b/.github/workflows/build-and-publish.yaml @@ -8,7 +8,7 @@ on: jobs: build-and-publish: name: Java Gradle - uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.40.6 + uses: bakdata/ci-templates/.github/workflows/java-gradle-library.yaml@1.43.0 with: java-version: 17 secrets: @@ -19,5 +19,4 @@ jobs: signing-password: ${{ secrets.SONATYPE_SIGNING_PASSWORD }} ossrh-username: ${{ secrets.SONATYPE_OSSRH_USERNAME }} ossrh-password: ${{ secrets.SONATYPE_OSSRH_PASSWORD }} - github-username: ${{ secrets.GH_USERNAME }} github-token: ${{ secrets.GH_TOKEN }} diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 6a30c40..d124b9c 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -16,7 +16,7 @@ on: jobs: java-gradle-release: name: Java Gradle - uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.40.6 + uses: bakdata/ci-templates/.github/workflows/java-gradle-release.yaml@1.43.0 with: java-version: 17 release-type: "${{ inputs.release-type }}" diff --git a/build.gradle.kts b/build.gradle.kts index c2f1585..aba9cd1 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,8 +1,7 @@ plugins { - id("net.researchgate.release") version "3.0.2" - id("com.bakdata.sonar") version "1.1.17" - id("com.bakdata.sonatype") version "1.1.14" - id("org.hildan.github.changelog") version "2.2.0" + id("com.bakdata.release") version "1.4.0" + id("com.bakdata.sonar") version "1.4.0" + id("com.bakdata.sonatype") version "1.4.0" id("io.freefair.lombok") version "8.4" } @@ -41,13 +40,6 @@ configure { } } -configure { - githubUser = "bakdata" - githubRepository = "kafka-error-handling" - futureVersionTag = findProperty("changelog.releaseVersion")?.toString() - sinceTag = findProperty("changelog.sinceTag")?.toString() -} - subprojects { apply(plugin = "java-library") apply(plugin = "java-test-fixtures") @@ -59,9 +51,3 @@ subprojects { } } } - -release { - git { - requireBranch.set("master") - } -} diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorUtil.java b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorUtil.java index 6a0432d..070743d 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorUtil.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorUtil.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,7 +27,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.util.Objects; import lombok.experimental.UtilityClass; @@ -42,7 +41,6 @@ import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.common.errors.RecordTooLargeException; -import org.apache.kafka.common.errors.SerializationException; /** * This class provides utility methods for dealing with errors in Kafka streams, such as serializing values to string @@ -75,7 +73,6 @@ public static boolean isRecoverable(final Exception e) { *

Non-recoverable Kafka errors are: *

    *
  • {@link RecordTooLargeException} - *
  • {@link SerializationException} which is not caused by timeout *
* * @param e exception @@ -83,14 +80,7 @@ public static boolean isRecoverable(final Exception e) { */ public static boolean isRecoverableKafkaError(final Exception e) { if (ORG_APACHE_KAFKA_COMMON_ERRORS.equals(e.getClass().getPackageName())) { - if (e instanceof RecordTooLargeException) { - return false; - } - if (e instanceof SerializationException) { - // socket timeouts usually indicate that the schema registry is temporarily down - return e.getCause() instanceof SocketTimeoutException; - } - return true; + return !(e instanceof RecordTooLargeException); } return false; } diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatKeyValueMapperTopologyTest.java index 1121d37..21181be 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatKeyValueMapperTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -93,13 +92,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply(1, "foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatTransformerTopologyTest.java index c39f298..8370305 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatTransformerTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -99,8 +98,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new Transformer<>() { private ProcessorContext context = null; @@ -126,7 +125,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueMapperTopologyTest.java index eec5c22..084351a 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueMapperTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -33,7 +33,6 @@ import java.util.Arrays; import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -94,13 +93,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply("foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply("foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueMapperWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueMapperWithKeyTopologyTest.java index b8ca720..6aafc32 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueMapperWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueMapperWithKeyTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -33,7 +33,6 @@ import java.util.Arrays; import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -92,13 +91,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply(1, "foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerTopologyTest.java index 262b9be..7ac9e7d 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -94,8 +93,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new ValueTransformer<>() { private ProcessorContext context = null; @@ -121,7 +120,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerWithKeyTopologyTest.java index 0c4fb59..c80b2c7 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingFlatValueTransformerWithKeyTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -97,8 +96,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new ValueTransformerWithKey<>() { private ProcessorContext context = null; @@ -124,7 +123,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingKeyValueMapperTopologyTest.java index 57a3910..5896a3f 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingKeyValueMapperTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -93,13 +92,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply(1, "foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java index 30b49d4..f00f154 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingProcessorTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -95,8 +94,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new Processor<>() { @Override @@ -120,7 +119,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingTransformerTopologyTest.java index 327b110..911d39f 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingTransformerTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -97,8 +96,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new Transformer<>() { private ProcessorContext context = null; @@ -124,7 +123,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueMapperTopologyTest.java index 8580c24..df9f354 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueMapperTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -91,13 +90,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply("foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply("foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueMapperWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueMapperWithKeyTopologyTest.java index 1bc7426..156a7f4 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueMapperWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueMapperWithKeyTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -91,13 +90,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply(1, "foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java index f633388..53ba606 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueProcessorTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -95,8 +94,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new FixedKeyProcessor<>() { @Override @@ -120,7 +119,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerTopologyTest.java index 7342cb0..000de2f 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -91,8 +90,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new ValueTransformer<>() { private ProcessorContext context = null; @@ -118,7 +117,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerWithKeyTopologyTest.java index 4344263..a135717 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorCapturingValueTransformerWithKeyTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -94,8 +93,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new ValueTransformerWithKey<>() { private ProcessorContext context = null; @@ -121,7 +120,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatKeyValueMapperTopologyTest.java index 3fa38d8..2edc570 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatKeyValueMapperTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -86,13 +85,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply(1, "foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) @@ -151,7 +151,7 @@ void shouldHandleNullInput(final SoftAssertions softly) { .containsExactlyInAnyOrder(2.0, 3.0); softly.assertThat(records) .extracting(ProducerRecord::value) - .containsExactlyInAnyOrder(2L, 3l); + .containsExactlyInAnyOrder(2L, 3L); } @Test diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatTransformerTopologyTest.java index 4d5ed53..548bb04 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatTransformerTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -90,8 +89,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new Transformer<>() { private ProcessorContext context = null; @@ -117,7 +116,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) @@ -234,7 +233,7 @@ public void close() { .containsExactlyInAnyOrder(2.0, 3.0); softly.assertThat(records) .extracting(ProducerRecord::value) - .containsExactlyInAnyOrder(2L, 3l); + .containsExactlyInAnyOrder(2L, 3L); } @Test diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperTopologyTest.java index c3f64be..187e889 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -33,7 +33,6 @@ import java.util.Arrays; import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -85,13 +84,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply("foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply("foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperWithKeyTopologyTest.java index 590c016..684e9de 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueMapperWithKeyTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -33,7 +33,6 @@ import java.util.Arrays; import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -86,13 +85,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply(1, "foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerTopologyTest.java index a367f75..24998c3 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -88,8 +87,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new ValueTransformer<>() { private ProcessorContext context = null; @@ -115,7 +114,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerWithKeyTopologyTest.java index 2018d47..cf20c32 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingFlatValueTransformerWithKeyTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -90,8 +89,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new ValueTransformerWithKey<>() { private ProcessorContext context = null; @@ -117,7 +116,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingKeyValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingKeyValueMapperTopologyTest.java index b780c29..eb7c21a 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingKeyValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingKeyValueMapperTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -86,13 +85,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply(1, "foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java index 371b3bb..a804b9c 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingProcessorTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -87,8 +86,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new Processor<>() { @Override @@ -112,7 +111,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingTransformerTopologyTest.java index 622457f..391c6a6 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingTransformerTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -88,8 +87,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new Transformer<>() { private ProcessorContext context = null; @@ -115,7 +114,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withKeySerde(DOUBLE_SERDE) .withValueSerde(LONG_SERDE)) diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueMapperTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueMapperTopologyTest.java index 9738e97..2429a85 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueMapperTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueMapperTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -84,13 +83,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply("foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply("foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueMapperWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueMapperWithKeyTopologyTest.java index 3652e64..20f563e 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueMapperWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueMapperWithKeyTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -85,13 +84,14 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - when(this.mapper.apply(1, "foo")).thenThrow(createSchemaRegistryTimeoutException()); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); + when(this.mapper.apply(1, "foo")).thenThrow(throwable); this.createTopology(); softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java index 3d3bbd6..285b045 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueProcessorTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -89,8 +88,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new FixedKeyProcessor<>() { @Override @@ -114,7 +113,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerTopologyTest.java index e752c81..96ab728 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -88,8 +87,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new ValueTransformer<>() { private ProcessorContext context = null; @@ -115,7 +114,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerWithKeyTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerWithKeyTopologyTest.java index cbdd7b6..dbb6a60 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerWithKeyTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorLoggingValueTransformerWithKeyTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -29,7 +29,6 @@ import java.util.List; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -88,8 +87,8 @@ void shouldNotAllowNullFilter(final SoftAssertions softly) { } @Test - void shouldForwardSchemaRegistryTimeout(final SoftAssertions softly) { - final RuntimeException throwable = createSchemaRegistryTimeoutException(); + void shouldForwardRecoverableException(final SoftAssertions softly) { + final RuntimeException throwable = createRecoverableException(); this.mapper = new ValueTransformerWithKey<>() { private ProcessorContext context = null; @@ -115,7 +114,7 @@ public void close() { softly.assertThatThrownBy(() -> this.topology.input() .withValueSerde(STRING_SERDE) .add(1, "foo")) - .hasCauseInstanceOf(SerializationException.class); + .hasCause(throwable); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(LONG_SERDE)) .toList(); diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorUtilTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorUtilTest.java index b65646f..d7d7aec 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorUtilTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorUtilTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2020 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -27,7 +27,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; -import java.net.SocketTimeoutException; import java.util.stream.Stream; import org.apache.kafka.common.errors.SerializationException; import org.junit.jupiter.params.ParameterizedTest; @@ -48,8 +47,7 @@ static Stream generateConvertToStringParameters() { static Stream generateIsRecoverableExceptionParameters() { return Stream.of( Arguments.of(mock(Exception.class), false), - Arguments.of(new SerializationException(new SocketTimeoutException()), true), - Arguments.of(new SerializationException(mock(Exception.class)), false) + Arguments.of(new SerializationException(), true) ); } diff --git a/error-handling-core/src/testFixtures/java/com/bakdata/kafka/ErrorCaptureTopologyTest.java b/error-handling-core/src/testFixtures/java/com/bakdata/kafka/ErrorCaptureTopologyTest.java index 6226399..842dd43 100644 --- a/error-handling-core/src/testFixtures/java/com/bakdata/kafka/ErrorCaptureTopologyTest.java +++ b/error-handling-core/src/testFixtures/java/com/bakdata/kafka/ErrorCaptureTopologyTest.java @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022 bakdata + * Copyright (c) 2024 bakdata * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -25,7 +25,6 @@ package com.bakdata.kafka; import com.bakdata.fluent_kafka_streams_tests.TestTopology; -import java.net.SocketTimeoutException; import java.util.Properties; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.SerializationException; @@ -38,6 +37,17 @@ public abstract class ErrorCaptureTopologyTest { protected TestTopology topology = null; + protected static RuntimeException createRecoverableException() { + return new SerializationException(); + } + + @AfterEach + void tearDown() { + if (this.topology != null) { + this.topology.stop(); + } + } + protected Properties getKafkaProperties() { final Properties kafkaConfig = new Properties(); @@ -48,7 +58,7 @@ protected Properties getKafkaProperties() { kafkaConfig.setProperty(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all"); // topology - kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "fake"); + kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "fake"); kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, IntegerSerde.class); kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TestDeadLetterSerde.class); kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "fake"); @@ -56,17 +66,6 @@ protected Properties getKafkaProperties() { return kafkaConfig; } - @AfterEach - void tearDown() { - if (this.topology != null) { - this.topology.stop(); - } - } - - protected static RuntimeException createSchemaRegistryTimeoutException() { - return new SerializationException(new SocketTimeoutException()); - } - protected void createTopology() { final StreamsBuilder builder = new StreamsBuilder(); this.buildTopology(builder); diff --git a/gradle.properties b/gradle.properties index 97a1579..b07df47 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,9 +5,9 @@ org.gradle.jvmargs=-Xmx2048m kafkaVersion=3.5.2 avroVersion=1.11.3 confluentVersion=7.5.1 -jacksonVersion=2.16.1 -junitVersion=5.10.1 -mockitoVersion=5.8.0 -log4jVersion=2.22.1 +jacksonVersion=2.17.0 +junitVersion=5.10.2 +mockitoVersion=5.11.0 +log4jVersion=2.23.1 kafkaStreamsTestsVersion=2.11.1 -assertJVersion=3.25.1 +assertJVersion=3.25.3