diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java index 8adc0bb..34403b1 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java @@ -77,10 +77,6 @@ public class ErrorHeaderProcessor implements FixedKeyProcessor> inputRecord) { addHeader(EXCEPTION_MESSAGE, value.getThrowable().getMessage(), headers); addHeader(EXCEPTION_STACK_TRACE, ExceptionUtils.getStackTrace(value.getThrowable()), headers); addHeader(DESCRIPTION, this.description, headers); - addHeader(INPUT_TIMESTAMP, Long.toString(inputRecord.timestamp()), headers); - this.context.forward(inputRecord.withValue(value.getValue()).withTimestamp(this.context.currentSystemTimeMs())); + this.context.forward(inputRecord.withValue(value.getValue())); } @Override diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java index 68d38f2..e1da34c 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java @@ -182,37 +182,4 @@ void shouldHandleExceptionWithoutMessage(final SoftAssertions softly) { .isNull()); } - @Test - void shouldSetTimestamp(final SoftAssertions softly) { - final long timestamp = System.currentTimeMillis(); - when(this.mapper.apply(1, "foo")).thenThrow(new RuntimeException()); - this.createTopology(); - this.topology.input() - .withValueSerde(STRING_SERDE) - .add(1, "foo", 100); - - final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) - .withValueSerde(LONG_SERDE)) - .toList(); - softly.assertThat(records).isEmpty(); - final List> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC) - .withValueSerde(Serdes.String())) - .toList(); - - softly.assertThat(errors) - .hasSize(1) - .first() - .isNotNull() - .satisfies(record -> { - softly.assertThat(record.key()).isEqualTo(1); - softly.assertThat(record.value()).isEqualTo("foo"); - softly.assertThat(record.timestamp()).isGreaterThan(timestamp); - }) - .extracting(ProducerRecord::headers) - .satisfies(headers -> softly.assertThat(getHeader(headers, ErrorHeaderProcessor.INPUT_TIMESTAMP)) - .isEqualTo("100")); - } - - }