From 2cce27f61fbc54433b034943a865f758e6bd0620 Mon Sep 17 00:00:00 2001
From: Torben Meyer
Date: Mon, 24 Jun 2024 07:40:09 +0200
Subject: [PATCH 1/4] Support dead letters with `input_timestamp` field

All dead letter parsers now receive the timestamp of the record they
process and set the new `input_timestamp` field: the Connect parser uses
the record timestamp directly, since Kafka Connect propagates the
timestamp of the original message, while the Streams parser prefers the
`input_timestamp` header and falls back to the record timestamp.

---
 build.gradle.kts                              |  3 +-
 .../kafka/ConnectDeadLetterParser.java        |  5 ++-
 .../com/bakdata/kafka/DeadLetterParser.java   |  4 +-
 .../kafka/DeadLetterParserTransformer.java    |  3 +-
 .../kafka/StreamsDeadLetterParser.java        |  9 ++++-
 .../kafka/ConnectDeadLetterParserTest.java    | 13 ++++---
 .../kafka/DeadLetterAnalyzerTopologyTest.java | 14 ++++++-
 .../kafka/StreamsDeadLetterParserTest.java    | 37 +++++++++++--------
 8 files changed, 62 insertions(+), 26 deletions(-)

diff --git a/build.gradle.kts b/build.gradle.kts
index 5c35c69..c32f796 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -20,6 +20,7 @@ allprojects {
 
     repositories {
         mavenCentral()
+        maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots/")
         maven(url = "https://packages.confluent.io/maven/")
     }
 }
@@ -52,7 +53,7 @@ dependencies {
     implementation(group = "com.bakdata.kafka", name = "brute-force-serde", version = "1.2.1")
     implementation(group = "com.bakdata.kafka", name = "large-message-serde", version = "2.7.0")
     implementation(group = "org.jooq", name = "jool", version = "0.9.14")
-    avroApi(group = "com.bakdata.kafka", name = "error-handling-avro", version = "1.4.2")
+    avroApi(group = "com.bakdata.kafka", name = "error-handling-avro", version = "1.4.5-SNAPSHOT")
 
     val junitVersion = "5.10.1"
     testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)

diff --git a/src/main/java/com/bakdata/kafka/ConnectDeadLetterParser.java b/src/main/java/com/bakdata/kafka/ConnectDeadLetterParser.java
index 0d9098a..d801af8 100644
--- a/src/main/java/com/bakdata/kafka/ConnectDeadLetterParser.java
+++ b/src/main/java/com/bakdata/kafka/ConnectDeadLetterParser.java
@@ -37,6 +37,7 @@
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
 
+import java.time.Instant;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.common.header.Headers;
@@ -45,7 +46,7 @@
 class ConnectDeadLetterParser implements DeadLetterParser {
 
     @Override
-    public DeadLetter convert(final Object value, final Headers headers) {
+    public DeadLetter convert(final Object value, final Headers headers, final long recordTimestamp) {
         final Optional<Integer> partition = getHeader(headers, ERROR_HEADER_ORIG_PARTITION)
                 .map(HeaderHelper::intValue);
         final Optional<String> topic = getHeader(headers, ERROR_HEADER_ORIG_TOPIC)
@@ -83,6 +84,8 @@ public DeadLetter convert(final Object value, final Headers headers) {
                 .setDescription(
                         String.format("Error in stage %s (%s) in %s[%d]", stage, clazz, connectorName, taskId))
                 .setCause(description)
+                // Kafka Connect propagates the timestamp of the original message
+                .setInputTimestamp(Instant.ofEpochMilli(recordTimestamp))
                 .build();
     }
 
diff --git a/src/main/java/com/bakdata/kafka/DeadLetterParser.java b/src/main/java/com/bakdata/kafka/DeadLetterParser.java
index 3e78e1f..d91d3a7 100644
--- a/src/main/java/com/bakdata/kafka/DeadLetterParser.java
+++ b/src/main/java/com/bakdata/kafka/DeadLetterParser.java
@@ -25,6 +25,7 @@
 package com.bakdata.kafka;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
 
 @FunctionalInterface
 interface DeadLetterParser {
@@ -36,5 +37,6 @@ interface DeadLetterParser {
     * @param headers headers to retrieve meta information such as topic, partition, and offset from.
+     * @param recordTimestamp timestamp of the {@link FixedKeyRecord} being processed
     * @return {@link DeadLetter} object representing error
     */
-    DeadLetter convert(Object value, Headers headers);
+    DeadLetter convert(Object value, Headers headers, long recordTimestamp);
 }

diff --git a/src/main/java/com/bakdata/kafka/DeadLetterParserTransformer.java b/src/main/java/com/bakdata/kafka/DeadLetterParserTransformer.java
index 11fc021..93b5b72 100644
--- a/src/main/java/com/bakdata/kafka/DeadLetterParserTransformer.java
+++ b/src/main/java/com/bakdata/kafka/DeadLetterParserTransformer.java
@@ -42,7 +42,8 @@ public void init(final FixedKeyProcessorContext<K, DeadLetter> context) {
 
     @Override
     public void process(final FixedKeyRecord<K, Object> inputRecord) {
-        final DeadLetter deadLetter = this.converter.convert(inputRecord.value(), inputRecord.headers());
+        final DeadLetter deadLetter =
+                this.converter.convert(inputRecord.value(), inputRecord.headers(), inputRecord.timestamp());
         this.context.forward(inputRecord.withValue(deadLetter));
     }
 
diff --git a/src/main/java/com/bakdata/kafka/StreamsDeadLetterParser.java b/src/main/java/com/bakdata/kafka/StreamsDeadLetterParser.java
index 1269ca1..6bab1ec 100644
--- a/src/main/java/com/bakdata/kafka/StreamsDeadLetterParser.java
+++ b/src/main/java/com/bakdata/kafka/StreamsDeadLetterParser.java
@@ -28,6 +28,7 @@
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_CLASS_NAME;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_MESSAGE;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_STACK_TRACE;
+import static com.bakdata.kafka.ErrorHeaderProcessor.INPUT_TIMESTAMP;
 import static com.bakdata.kafka.ErrorHeaderProcessor.OFFSET;
 import static com.bakdata.kafka.ErrorHeaderProcessor.PARTITION;
 import static com.bakdata.kafka.ErrorHeaderProcessor.TOPIC;
@@ -35,6 +36,7 @@
 import static com.bakdata.kafka.HeaderHelper.missingRequiredHeader;
 import static com.bakdata.kafka.HeaderHelper.stringValue;
 
+import java.time.Instant;
 import java.util.Optional;
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.common.header.Headers;
@@ -45,7 +47,7 @@ class StreamsDeadLetterParser implements DeadLetterParser {
     static final String FAULTY_OFFSET_HEADER = "HEADER_PREFIX + offset";
 
     @Override
-    public DeadLetter convert(final Object value, final Headers headers) {
+    public DeadLetter convert(final Object value, final Headers headers, final long recordTimestamp) {
         final int partition = getHeader(headers, PARTITION)
                 .map(HeaderHelper::intValue)
                 .orElseThrow(missingRequiredHeader(PARTITION));
@@ -70,6 +72,10 @@ public DeadLetter convert(final Object value, final Headers headers) {
         final String stackTrace = getHeader(headers, EXCEPTION_STACK_TRACE)
                 .flatMap(HeaderHelper::stringValue)
                 .orElseThrow(missingRequiredHeader(EXCEPTION_STACK_TRACE));
+        final Instant inputTimestamp = getHeader(headers, INPUT_TIMESTAMP)
+                .map(HeaderHelper::longValue)
+                .map(Instant::ofEpochMilli)
+                .orElse(Instant.ofEpochMilli(recordTimestamp));
         final ErrorDescription errorDescription = ErrorDescription.newBuilder()
                 .setErrorClass(errorClass)
                 .setMessage(message)
@@ -82,6 +88,7 @@ public DeadLetter convert(final Object value, final Headers headers) {
                 .setInputValue(Optional.ofNullable(value).map(ErrorUtil::toString).orElse(null))
                 .setDescription(description)
                 .setCause(errorDescription)
+                .setInputTimestamp(inputTimestamp)
                 .build();
     }
 }

diff --git a/src/test/java/com/bakdata/kafka/ConnectDeadLetterParserTest.java b/src/test/java/com/bakdata/kafka/ConnectDeadLetterParserTest.java
index 927f5cf..727d748 100644
--- a/src/test/java/com/bakdata/kafka/ConnectDeadLetterParserTest.java
+++ b/src/test/java/com/bakdata/kafka/ConnectDeadLetterParserTest.java
@@ -36,6 +36,7 @@
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
 
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.stream.Stream;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -140,7 +141,8 @@ void shouldConvert() {
                 .add(ERROR_HEADER_CONNECTOR_NAME, toBytes("my-connector"))
                 .add(ERROR_HEADER_EXCEPTION_MESSAGE, toBytes("my message"))
                 .add(ERROR_HEADER_EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE));
-        this.softly.assertThat(new ConnectDeadLetterParser().convert("foo", headers))
+
+        this.softly.assertThat(new ConnectDeadLetterParser().convert("foo", headers, 200))
                 .satisfies(deadLetter -> {
                     this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo");
                     this.softly.assertThat(deadLetter.getPartition()).hasValue(1);
@@ -154,13 +156,14 @@ void shouldConvert() {
                     this.softly.assertThat(deadLetter.getDescription())
                             .isEqualTo("Error in stage VALUE_CONVERTER (org.apache.kafka.connect.json.JsonConverter) "
                                     + "in my-connector[2]");
+                    this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(200));
                 });
     }
 
     @Test
     void shouldConvertWithMissingHeaders() {
         final Headers headers = generateDefaultHeaders();
-        this.softly.assertThat(new ConnectDeadLetterParser().convert("foo", headers))
+        this.softly.assertThat(new ConnectDeadLetterParser().convert("foo", headers, 0))
                 .satisfies(deadLetter -> {
                     this.softly.assertThat(deadLetter.getPartition()).isNotPresent();
                     this.softly.assertThat(deadLetter.getTopic()).isNotPresent();
@@ -176,14 +179,14 @@ void shouldConvertWithMissingHeaders() {
     void shouldConvertWithNullHeaders() {
         final Headers headers = generateDefaultHeaders()
                 .add(ERROR_HEADER_EXCEPTION_MESSAGE, null);
-        this.softly.assertThat(new ConnectDeadLetterParser().convert("foo", headers))
+        this.softly.assertThat(new ConnectDeadLetterParser().convert("foo", headers, 0))
                 .satisfies(deadLetter -> this.softly.assertThat(deadLetter.getCause().getMessage()).isNotPresent());
     }
 
     @ParameterizedTest
     @MethodSource("generateMissingRequiredHeaders")
     void shouldThrowWithMissingRequiredHeaders(final Headers headers, final String message) {
-        this.softly.assertThatThrownBy(() -> new ConnectDeadLetterParser().convert("foo", headers))
+        this.softly.assertThatThrownBy(() -> new ConnectDeadLetterParser().convert("foo", headers, 0))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(message);
     }
@@ -191,7 +194,7 @@ void shouldThrowWithMissingRequiredHeaders(final Headers headers, final String m
     @ParameterizedTest
     @MethodSource("generateNonNullableHeaders")
     void shouldThrowWithNonNullableHeaders(final Headers headers, final String message) {
-        this.softly.assertThatThrownBy(() -> new ConnectDeadLetterParser().convert("foo", headers))
+        this.softly.assertThatThrownBy(() -> new ConnectDeadLetterParser().convert("foo", headers, 0))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(message);
     }

diff --git a/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java b/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
index 9c3c0b4..87b3409 100644
--- a/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
+++ b/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
@@ -29,6 +29,7 @@
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_CLASS_NAME;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_MESSAGE;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_STACK_TRACE;
+import static com.bakdata.kafka.ErrorHeaderProcessor.INPUT_TIMESTAMP;
 import static com.bakdata.kafka.ErrorHeaderProcessor.OFFSET;
 import static com.bakdata.kafka.ErrorHeaderProcessor.PARTITION;
 import static com.bakdata.kafka.ErrorHeaderProcessor.TOPIC;
@@ -118,6 +119,7 @@ void shouldProcessDeadLetter() {
                         .setStackTrace(StackTraceClassifierTest.STACK_TRACE)
                         .build())
                 .setDescription("description")
+                .setInputTimestamp(Instant.ofEpochMilli(200L))
                 .build();
         final long timestamp = 0L;
         input.add("key", deadLetter, timestamp);
@@ -180,6 +182,7 @@ void shouldAggregateStatistics() {
                         .setStackTrace(StackTraceClassifierTest.STACK_TRACE)
                         .build())
                 .setDescription("description")
+                .setInputTimestamp(Instant.ofEpochMilli(200L))
                 .build();
         final long firstTimestamp = 0L;
         input.add("key", firstDeadLetter, firstTimestamp);
@@ -239,6 +242,7 @@ void shouldOnlyForwardFirstExample() {
                         .setStackTrace(StackTraceClassifierTest.STACK_TRACE)
                         .build())
                 .setDescription("description")
+                .setInputTimestamp(Instant.ofEpochMilli(200L))
                 .build();
         final long firstTimestamp = 0L;
         input.add("key", firstDeadLetter, firstTimestamp);
@@ -300,6 +304,7 @@ void shouldProduceDeadLetterAndAnalyze() {
                         .setStackTrace(null)
                         .build())
                 .setDescription("description")
+                .setInputTimestamp(Instant.ofEpochMilli(200L))
                 .build();
 
         final TestOutput<String, DeadLetter> processedDeadLetters =
@@ -388,7 +393,9 @@ void shouldProcessConnectErrors() {
                 .setPartition(1)
                 .setTopic("my-topic")
                 .setOffset(10L)
+                .setInputTimestamp(Instant.ofEpochMilli(firstTimestamp))
                 .build();
+
         this.softly.assertThat(seq(processedDeadLetters).toList())
                 .hasSize(1)
                 .anySatisfy(record -> {
@@ -453,7 +460,9 @@ void shouldProcessStreamsHeaderErrors() {
                 .add(DESCRIPTION, toBytes("description"))
                 .add(EXCEPTION_CLASS_NAME, toBytes("org.apache.kafka.connect.errors.DataException"))
                 .add(EXCEPTION_MESSAGE, toBytes("my message"))
-                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE));
+                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE))
+                .add(INPUT_TIMESTAMP, toBytes(200L));
+
         input.add("key", "value", 0L, headers);
 
         final DeadLetter expectedDeadLetter = DeadLetter.newBuilder()
@@ -467,7 +476,9 @@ void shouldProcessStreamsHeaderErrors() {
                 .setPartition(1)
                 .setTopic("my-topic")
                 .setOffset(10L)
+                .setInputTimestamp(Instant.ofEpochMilli(200L))
                 .build();
+
         this.softly.assertThat(seq(processedDeadLetters).toList())
                 .hasSize(1)
                 .anySatisfy(record -> {
@@ -531,6 +542,7 @@ void shouldReadAvroKey() {
                         .setStackTrace(StackTraceClassifierTest.STACK_TRACE)
                         .build())
                 .setDescription("description")
+                .setInputTimestamp(Instant.ofEpochMilli(200L))
                 .build();
         input.add(TestRecord.newBuilder().setId(1).build(), firstDeadLetter);
         this.softly.assertThat(seq(processedDeadLetters).toList())

diff --git a/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java b/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java
index c9d8bfa..ea2b925 100644
--- a/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java
+++ b/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java
@@ -29,11 +29,13 @@
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_CLASS_NAME;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_MESSAGE;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_STACK_TRACE;
+import static com.bakdata.kafka.ErrorHeaderProcessor.INPUT_TIMESTAMP;
 import static com.bakdata.kafka.ErrorHeaderProcessor.OFFSET;
 import static com.bakdata.kafka.ErrorHeaderProcessor.PARTITION;
 import static com.bakdata.kafka.ErrorHeaderProcessor.TOPIC;
 import static com.bakdata.kafka.StreamsDeadLetterParser.FAULTY_OFFSET_HEADER;
 
+import java.time.Instant;
 import java.util.stream.Stream;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -139,20 +141,15 @@ private static Headers generateDefaultHeaders() {
                 .add(DESCRIPTION, toBytes("description"))
                 .add(EXCEPTION_CLASS_NAME, toBytes("org.apache.kafka.connect.errors.DataException"))
                 .add(EXCEPTION_MESSAGE, toBytes("my message"))
-                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE));
+                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE))
+                .add(INPUT_TIMESTAMP, toBytes(200L));
     }
 
     @Test
     void shouldConvert() {
-        final Headers headers = new RecordHeaders()
-                .add(PARTITION, toBytes(1))
-                .add(TOPIC, toBytes("my-topic"))
-                .add(OFFSET, toBytes(10L))
-                .add(DESCRIPTION, toBytes("description"))
-                .add(EXCEPTION_CLASS_NAME, toBytes("org.apache.kafka.connect.errors.DataException"))
-                .add(EXCEPTION_MESSAGE, toBytes("my message"))
-                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE));
-        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers))
+        final Headers headers = generateDefaultHeaders();
+
+        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers, 0))
                 .satisfies(deadLetter -> {
                     this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo");
                     this.softly.assertThat(deadLetter.getPartition()).hasValue(1);
@@ -164,6 +161,7 @@ void shouldConvert() {
                     this.softly.assertThat(cause.getMessage()).hasValue("my message");
                     this.softly.assertThat(cause.getStackTrace()).hasValue(StackTraceClassifierTest.STACK_TRACE);
                     this.softly.assertThat(deadLetter.getDescription()).isEqualTo("description");
+                    this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(200L));
                 });
     }
 
@@ -172,7 +170,7 @@ void shouldConvertFaultyOffsetHeader() {
         final Headers headers = generateDefaultHeaders()
                 .remove(OFFSET)
                 .add(FAULTY_OFFSET_HEADER, toBytes(100L));
-        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers))
+        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers, 0))
                 .satisfies(deadLetter -> this.softly.assertThat(deadLetter.getOffset()).hasValue(100L));
     }
 
@@ -181,7 +179,7 @@ void shouldIgnoreFaultyOffsetHeader() {
         final Headers headers = generateDefaultHeaders()
                 .add(OFFSET, toBytes(10L))
                 .add(FAULTY_OFFSET_HEADER, toBytes(100L));
-        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers))
+        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers, 0))
                 .satisfies(deadLetter -> this.softly.assertThat(deadLetter.getOffset()).hasValue(10L));
     }
 
@@ -189,14 +187,23 @@ void shouldIgnoreFaultyOffsetHeader() {
     void shouldConvertNullMessageHeader() {
         final Headers headers = generateDefaultHeaders()
                 .add(EXCEPTION_MESSAGE, null);
-        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers))
+        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers, 0))
                 .satisfies(deadLetter -> this.softly.assertThat(deadLetter.getCause().getMessage()).isNotPresent());
     }
 
+    @Test
+    void shouldFallbackToRecordTimestamp() {
+        Headers headers = generateDefaultHeaders().remove(INPUT_TIMESTAMP);
+        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers, 500))
+                .satisfies(deadLetter -> this.softly.assertThat(deadLetter.getInputTimestamp())
+                        .hasValue(Instant.ofEpochMilli(500L)));
+    }
+
+
     @ParameterizedTest
     @MethodSource("generateMissingRequiredHeaders")
     void shouldThrowWithMissingRequiredHeaders(final Headers headers, final String message) {
-        this.softly.assertThatThrownBy(() -> new StreamsDeadLetterParser().convert("foo", headers))
+        this.softly.assertThatThrownBy(() -> new StreamsDeadLetterParser().convert("foo", headers, 0))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(message);
     }
@@ -204,7 +211,7 @@ void shouldThrowWithMissingRequiredHeaders(final Headers headers, final String m
     @ParameterizedTest
     @MethodSource("generateNonNullableHeaders")
     void shouldThrowWithNonNullableHeaders(final Headers headers, final String message) {
-        this.softly.assertThatThrownBy(() -> new StreamsDeadLetterParser().convert("foo", headers))
+        this.softly.assertThatThrownBy(() -> new StreamsDeadLetterParser().convert("foo", headers, 0))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(message);
     }

From a1ea434911404d6788783aa57fb15e3e2eeeeba6 Mon Sep 17 00:00:00 2001
From: Torben Meyer
Date: Mon, 24 Jun 2024 11:30:35 +0200
Subject: [PATCH 2/4] Don't use input_timestamp for headers

The ErrorHeaderProcessor propagates the timestamp of the original
message as the record timestamp, so dead letters parsed from error
headers can rely on the record timestamp directly instead of a
dedicated input_timestamp header.

---
 .../com/bakdata/kafka/StreamsDeadLetterParser.java |  8 ++------
 .../kafka/DeadLetterAnalyzerTopologyTest.java      |  8 +++-----
 .../bakdata/kafka/StreamsDeadLetterParserTest.java | 14 ++------------
 3 files changed, 7 insertions(+), 23 deletions(-)

diff --git a/src/main/java/com/bakdata/kafka/StreamsDeadLetterParser.java b/src/main/java/com/bakdata/kafka/StreamsDeadLetterParser.java
index 6bab1ec..684e21a 100644
--- a/src/main/java/com/bakdata/kafka/StreamsDeadLetterParser.java
+++ b/src/main/java/com/bakdata/kafka/StreamsDeadLetterParser.java
@@ -28,7 +28,6 @@
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_CLASS_NAME;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_MESSAGE;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_STACK_TRACE;
-import static com.bakdata.kafka.ErrorHeaderProcessor.INPUT_TIMESTAMP;
 import static com.bakdata.kafka.ErrorHeaderProcessor.OFFSET;
 import static com.bakdata.kafka.ErrorHeaderProcessor.PARTITION;
 import static com.bakdata.kafka.ErrorHeaderProcessor.TOPIC;
@@ -72,10 +71,6 @@ public DeadLetter convert(final Object value, final Headers headers, final long
         final String stackTrace = getHeader(headers, EXCEPTION_STACK_TRACE)
                 .flatMap(HeaderHelper::stringValue)
                 .orElseThrow(missingRequiredHeader(EXCEPTION_STACK_TRACE));
-        final Instant inputTimestamp = getHeader(headers, INPUT_TIMESTAMP)
-                .map(HeaderHelper::longValue)
-                .map(Instant::ofEpochMilli)
-                .orElse(Instant.ofEpochMilli(recordTimestamp));
         final ErrorDescription errorDescription = ErrorDescription.newBuilder()
                 .setErrorClass(errorClass)
                 .setMessage(message)
@@ -88,7 +83,8 @@ public DeadLetter convert(final Object value, final Headers headers, final long
                 .setInputValue(Optional.ofNullable(value).map(ErrorUtil::toString).orElse(null))
                 .setDescription(description)
                 .setCause(errorDescription)
-                .setInputTimestamp(inputTimestamp)
+                // The ErrorHeaderProcessor propagates the timestamp of the original message
+                .setInputTimestamp(Instant.ofEpochMilli(recordTimestamp))
                 .build();
     }
 }

diff --git a/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java b/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
index 87b3409..7020072 100644
--- a/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
+++ b/src/test/java/com/bakdata/kafka/DeadLetterAnalyzerTopologyTest.java
@@ -29,7 +29,6 @@
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_CLASS_NAME;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_MESSAGE;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_STACK_TRACE;
-import static com.bakdata.kafka.ErrorHeaderProcessor.INPUT_TIMESTAMP;
 import static com.bakdata.kafka.ErrorHeaderProcessor.OFFSET;
 import static com.bakdata.kafka.ErrorHeaderProcessor.PARTITION;
 import static com.bakdata.kafka.ErrorHeaderProcessor.TOPIC;
@@ -460,10 +459,9 @@ void shouldProcessStreamsHeaderErrors() {
                 .add(DESCRIPTION, toBytes("description"))
                 .add(EXCEPTION_CLASS_NAME, toBytes("org.apache.kafka.connect.errors.DataException"))
                 .add(EXCEPTION_MESSAGE, toBytes("my message"))
-                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE))
-                .add(INPUT_TIMESTAMP, toBytes(200L));
+                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE));
 
-        input.add("key", "value", 0L, headers);
+        input.add("key", "value", firstTimestamp, headers);
 
         final DeadLetter expectedDeadLetter = DeadLetter.newBuilder()
                 .setInputValue("value")
@@ -476,7 +474,7 @@ void shouldProcessStreamsHeaderErrors() {
                 .setPartition(1)
                 .setTopic("my-topic")
                 .setOffset(10L)
-                .setInputTimestamp(Instant.ofEpochMilli(200L))
+                .setInputTimestamp(Instant.ofEpochMilli(firstTimestamp))
                 .build();
 
         this.softly.assertThat(seq(processedDeadLetters).toList())

diff --git a/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java b/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java
index ea2b925..6562eeb 100644
--- a/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java
+++ b/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java
@@ -29,7 +29,6 @@
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_CLASS_NAME;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_MESSAGE;
 import static com.bakdata.kafka.ErrorHeaderProcessor.EXCEPTION_STACK_TRACE;
-import static com.bakdata.kafka.ErrorHeaderProcessor.INPUT_TIMESTAMP;
 import static com.bakdata.kafka.ErrorHeaderProcessor.OFFSET;
 import static com.bakdata.kafka.ErrorHeaderProcessor.PARTITION;
 import static com.bakdata.kafka.ErrorHeaderProcessor.TOPIC;
@@ -141,15 +140,14 @@ private static Headers generateDefaultHeaders() {
                 .add(DESCRIPTION, toBytes("description"))
                 .add(EXCEPTION_CLASS_NAME, toBytes("org.apache.kafka.connect.errors.DataException"))
                 .add(EXCEPTION_MESSAGE, toBytes("my message"))
-                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE))
-                .add(INPUT_TIMESTAMP, toBytes(200L));
+                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE));
     }
 
     @Test
     void shouldConvert() {
         final Headers headers = generateDefaultHeaders();
 
-        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers, 0))
+        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers, 200L))
                 .satisfies(deadLetter -> {
                     this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo");
                     this.softly.assertThat(deadLetter.getPartition()).hasValue(1);
@@ -191,14 +189,6 @@ void shouldConvertNullMessageHeader() {
                 .satisfies(deadLetter -> this.softly.assertThat(deadLetter.getCause().getMessage()).isNotPresent());
     }
 
-    @Test
-    void shouldFallbackToRecordTimestamp() {
-        Headers headers = generateDefaultHeaders().remove(INPUT_TIMESTAMP);
-        this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers, 500))
-                .satisfies(deadLetter -> this.softly.assertThat(deadLetter.getInputTimestamp())
-                        .hasValue(Instant.ofEpochMilli(500L)));
-    }
-
 
     @ParameterizedTest
     @MethodSource("generateMissingRequiredHeaders")

From 2dba5e9d9e65f278a87ebd54006e066b24468709 Mon Sep 17 00:00:00 2001
From: Torben Meyer
Date: Mon, 24 Jun 2024 14:22:28 +0200
Subject: [PATCH 3/4] Adjust test

Inline the headers in shouldConvert so that the test no longer depends
on generateDefaultHeaders().

---
 .../com/bakdata/kafka/StreamsDeadLetterParserTest.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java b/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java
index 6562eeb..8f7142f 100644
--- a/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java
+++ b/src/test/java/com/bakdata/kafka/StreamsDeadLetterParserTest.java
@@ -145,7 +145,14 @@ private static Headers generateDefaultHeaders() {
 
     @Test
     void shouldConvert() {
-        final Headers headers = generateDefaultHeaders();
+        final Headers headers = new RecordHeaders()
+                .add(PARTITION, toBytes(1))
+                .add(TOPIC, toBytes("my-topic"))
+                .add(OFFSET, toBytes(10L))
+                .add(DESCRIPTION, toBytes("description"))
+                .add(EXCEPTION_CLASS_NAME, toBytes("org.apache.kafka.connect.errors.DataException"))
+                .add(EXCEPTION_MESSAGE, toBytes("my message"))
+                .add(EXCEPTION_STACK_TRACE, toBytes(StackTraceClassifierTest.STACK_TRACE));
 
         this.softly.assertThat(new StreamsDeadLetterParser().convert("foo", headers, 200L))
                 .satisfies(deadLetter -> {

From 5910afdb4ec3a51fd4797af52aee7c73bfc904de Mon Sep 17 00:00:00 2001
From: Torben Meyer
Date: Mon, 24 Jun 2024 14:38:05 +0200
Subject: [PATCH 4/4] Bump to release version

---
 build.gradle.kts | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/build.gradle.kts b/build.gradle.kts
index c32f796..42eb5be 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -20,7 +20,6 @@ allprojects {
 
     repositories {
         mavenCentral()
-        maven(url = "https://s01.oss.sonatype.org/content/repositories/snapshots/")
         maven(url = "https://packages.confluent.io/maven/")
     }
 }
@@ -53,7 +52,7 @@ dependencies {
     implementation(group = "com.bakdata.kafka", name = "brute-force-serde", version = "1.2.1")
     implementation(group = "com.bakdata.kafka", name = "large-message-serde", version = "2.7.0")
     implementation(group = "org.jooq", name = "jool", version = "0.9.14")
-    avroApi(group = "com.bakdata.kafka", name = "error-handling-avro", version = "1.4.5-SNAPSHOT")
+    avroApi(group = "com.bakdata.kafka", name = "error-handling-avro", version = "1.5.0")
 
     val junitVersion = "5.10.1"
     testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)