From d534c63dd10602b80caca26ef85768227e3a3c86 Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Thu, 20 Jun 2024 21:51:12 +0200 Subject: [PATCH 1/4] Use wall-clock time for daed letter record The written dead-letter now has the wall-clock time as its record's timestamp. Because the original timestamp would be lost otherwise, there is a new field in the respective dead-letter models called timestamp. To maintain backwards compatibility, it is nullable, but always set starting from this version. --- error-handling-avro/src/main/avro/DeadLetter.avsc | 11 +++++++++++ .../bakdata/kafka/AvroDeadLetterConverter.java | 2 ++ .../kafka/AvroDeadLetterProcessorTest.java | 9 +++++++-- error-handling-core/build.gradle.kts | 1 + .../com/bakdata/kafka/DeadLetterDescription.java | 2 ++ .../com/bakdata/kafka/DeadLetterProcessor.java | 9 ++++++++- .../com/bakdata/kafka/TestDeadLetterSerde.java | 4 ++-- .../bakdata/kafka/ProtoDeadLetterConverter.java | 11 +++++++++++ .../proto/bakdata/kafka/proto/v1/deadletter.proto | 2 ++ .../kafka/ProtoDeadLetterProcessorTest.java | 15 +++++++++++++-- 10 files changed, 59 insertions(+), 7 deletions(-) diff --git a/error-handling-avro/src/main/avro/DeadLetter.avsc b/error-handling-avro/src/main/avro/DeadLetter.avsc index b600309..790d313 100644 --- a/error-handling-avro/src/main/avro/DeadLetter.avsc +++ b/error-handling-avro/src/main/avro/DeadLetter.avsc @@ -38,6 +38,17 @@ "name": "description", "type": "string" }, + { + "name": "timestamp", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ], + "default": null + }, { "name": "cause", "type": { diff --git a/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java b/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java index 6103040..24b4012 100644 --- a/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java +++ b/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import java.time.Instant; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; @@ -45,6 +46,7 @@ public DeadLetter convert(final DeadLetterDescription deadLetterDescription) { .setTopic(deadLetterDescription.getTopic()) .setPartition(deadLetterDescription.getPartition()) .setOffset(deadLetterDescription.getOffset()) + .setTimestamp(deadLetterDescription.getTimestamp()) .build(); } diff --git a/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java b/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java index 65275b0..e554de9 100644 --- a/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java +++ b/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -88,11 +89,12 @@ protected Properties getKafkaProperties() { @Test void shouldConvertAndSerializeAvroDeadLetter() { + final long startTimestamp = System.currentTimeMillis(); when(this.mapper.apply(any(), any())).thenThrow(new RuntimeException(ERROR_MESSAGE)); this.createTopology(); this.topology.input(INPUT_TOPIC).withValueSerde(STRING_SERDE) - .add(1, "foo") - .add(2, "bar"); + .add(1, "foo", 100) + .add(2, "bar", 200); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(STRING_SERDE)) @@ -106,6 +108,7 @@ void shouldConvertAndSerializeAvroDeadLetter() { this.softly.assertThat(errors) .hasSize(2) + .allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp)) .extracting(ProducerRecord::value).allSatisfy( deadLetter -> { this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION); @@ -123,12 +126,14 @@ void shouldConvertAndSerializeAvroDeadLetter() { deadLetter -> { this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo"); this.softly.assertThat(deadLetter.getOffset()).hasValue(0L); + this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(100)); } ); this.softly.assertThat(errors).extracting(ProducerRecord::value).element(1).satisfies( deadLetter -> { this.softly.assertThat(deadLetter.getInputValue()).hasValue("bar"); this.softly.assertThat(deadLetter.getOffset()).hasValue(1L); + this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(200)); } ); diff --git a/error-handling-core/build.gradle.kts b/error-handling-core/build.gradle.kts index 7ddce14..96dd183 100644 --- a/error-handling-core/build.gradle.kts +++ b/error-handling-core/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { val jacksonVersion: String by project testFixturesImplementation(group = "com.fasterxml.jackson.core", name = "jackson-core", version = jacksonVersion) testFixturesImplementation(group = "com.fasterxml.jackson.core", name = "jackson-databind", version = jacksonVersion) + testFixturesImplementation(group = "com.fasterxml.jackson.datatype", name = "jackson-datatype-jsr310", version = jacksonVersion) } avro { diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java index de78e85..68711e3 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import java.time.Instant; import lombok.Builder; import lombok.NonNull; import lombok.Value; @@ -55,4 +56,5 @@ public static class Cause { String topic; Integer partition; Long offset; + Instant timestamp; } diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java index 90b90e0..88a151f 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import java.time.Instant; import java.util.Optional; import lombok.Getter; import lombok.NonNull; @@ -99,8 +100,14 @@ public void process(final FixedKeyRecord> inputRecord) { .topic(metadata.map(RecordMetadata::topic).orElse(null)) .partition(metadata.map(RecordMetadata::partition).orElse(null)) .offset(metadata.map(RecordMetadata::offset).orElse(null)) + .timestamp(Instant.ofEpochMilli(inputRecord.timestamp())) .build(); - this.context.forward(inputRecord.withValue(this.deadLetterConverter.convert(deadLetterDescription))); + + final FixedKeyRecord record = inputRecord + .withValue(this.deadLetterConverter.convert(deadLetterDescription)) + .withTimestamp(this.context.currentSystemTimeMs()); + + this.context.forward(record); } @Override diff --git a/error-handling-core/src/testFixtures/java/com/bakdata/kafka/TestDeadLetterSerde.java b/error-handling-core/src/testFixtures/java/com/bakdata/kafka/TestDeadLetterSerde.java index a8f541c..fdc8785 100644 --- a/error-handling-core/src/testFixtures/java/com/bakdata/kafka/TestDeadLetterSerde.java +++ b/error-handling-core/src/testFixtures/java/com/bakdata/kafka/TestDeadLetterSerde.java @@ -25,8 +25,8 @@ package com.bakdata.kafka; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import java.io.IOException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; @@ -34,7 +34,7 @@ import org.apache.kafka.common.serialization.Serializer; public class TestDeadLetterSerde implements Serde { - static final ObjectMapper objectMapper = new ObjectMapper(); + static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); private static final Serializer serializer = (topic, data) -> { try { return objectMapper.writeValueAsBytes(data); diff --git a/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java b/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java index 61081fe..0c89822 100644 --- a/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java +++ b/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java @@ -28,6 +28,8 @@ import com.google.protobuf.Int32Value; import com.google.protobuf.Int64Value; import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; +import java.time.Instant; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; @@ -65,6 +67,15 @@ public ProtoDeadLetter convert(final DeadLetterDescription deadLetterDescription if (deadLetterDescription.getOffset() != null) { builder.setOffset(Int64Value.of(deadLetterDescription.getOffset())); } + + if (deadLetterDescription.getTimestamp() != null) { + final Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(deadLetterDescription.getTimestamp().getEpochSecond()) + .setNanos(deadLetterDescription.getTimestamp().getNano()) + .build(); + builder.setTimestamp(timestamp); + } + return builder.build(); } diff --git a/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto b/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto index 90f07b6..29698b1 100644 --- a/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto +++ b/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package bakdata.kafka.proto.v1; import "google/protobuf/wrappers.proto"; +import "google/protobuf/timestamp.proto"; option java_package = "com.bakdata.kafka.proto.v1"; option java_multiple_files = true; @@ -19,4 +20,5 @@ message ProtoDeadLetter { google.protobuf.StringValue topic = 4; google.protobuf.Int32Value partition = 5; google.protobuf.Int64Value offset = 6; + google.protobuf.Timestamp timestamp = 7; } diff --git a/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java b/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java index 396c56d..ae16259 100644 --- a/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java +++ b/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java @@ -33,10 +33,12 @@ import com.google.protobuf.Int32Value; import com.google.protobuf.Int64Value; import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig; import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -118,11 +120,12 @@ protected void createTopology() { @Test void shouldConvertAndSerializeProtoDeadLetter() { + final long startTimestamp = System.currentTimeMillis(); when(this.mapper.apply(any(), any())).thenThrow(new RuntimeException(ERROR_MESSAGE)); this.createTopology(); this.topology.input(INPUT_TOPIC).withValueSerde(STRING_SERDE) - .add(1, "foo") - .add(2, "bar"); + .add(1, "foo", 100) + .add(2, "bar", 200); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(STRING_SERDE)) @@ -136,6 +139,7 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(errors) .hasSize(2) + .allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp)) .extracting(ProducerRecord::value).allSatisfy( deadLetter -> { this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION); @@ -156,6 +160,8 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue) .isEqualTo("foo"); this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(0L); + this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp())) + .isEqualTo(Instant.ofEpochMilli(100)); } ); this.softly.assertThat(errors).map(ProducerRecord::value).element(1).satisfies( @@ -163,8 +169,13 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue) .isEqualTo("bar"); this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(1L); + this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp())) + .isEqualTo(Instant.ofEpochMilli(200)); } ); + } + private static Instant timestampToInstant(final Timestamp timestamp) { + return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()); } } From 8b0d7fc0a2ef4ef5f33950699d9fca76ef7c4c5d Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Fri, 21 Jun 2024 10:55:58 +0200 Subject: [PATCH 2/4] Rename timestamp field --- error-handling-avro/src/main/avro/DeadLetter.avsc | 2 +- .../java/com/bakdata/kafka/AvroDeadLetterConverter.java | 3 +-- .../com/bakdata/kafka/AvroDeadLetterProcessorTest.java | 4 ++-- .../java/com/bakdata/kafka/DeadLetterDescription.java | 2 +- .../main/java/com/bakdata/kafka/DeadLetterProcessor.java | 2 +- .../java/com/bakdata/kafka/ProtoDeadLetterConverter.java | 9 ++++----- .../main/proto/bakdata/kafka/proto/v1/deadletter.proto | 2 +- .../com/bakdata/kafka/ProtoDeadLetterProcessorTest.java | 4 ++-- 8 files changed, 13 insertions(+), 15 deletions(-) diff --git a/error-handling-avro/src/main/avro/DeadLetter.avsc b/error-handling-avro/src/main/avro/DeadLetter.avsc index 790d313..7304756 100644 --- a/error-handling-avro/src/main/avro/DeadLetter.avsc +++ b/error-handling-avro/src/main/avro/DeadLetter.avsc @@ -39,7 +39,7 @@ "type": "string" }, { - "name": "timestamp", + "name": "input_timestamp", "type": [ "null", { diff --git a/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java b/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java index 24b4012..f916b65 100644 --- a/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java +++ b/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import java.time.Instant; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; @@ -46,7 +45,7 @@ public DeadLetter convert(final DeadLetterDescription deadLetterDescription) { .setTopic(deadLetterDescription.getTopic()) .setPartition(deadLetterDescription.getPartition()) .setOffset(deadLetterDescription.getOffset()) - .setTimestamp(deadLetterDescription.getTimestamp()) + .setInputTimestamp(deadLetterDescription.getInputTimestamp()) .build(); } diff --git a/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java b/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java index e554de9..56b8d45 100644 --- a/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java +++ b/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java @@ -126,14 +126,14 @@ void shouldConvertAndSerializeAvroDeadLetter() { deadLetter -> { this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo"); this.softly.assertThat(deadLetter.getOffset()).hasValue(0L); - this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(100)); + this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(100)); } ); this.softly.assertThat(errors).extracting(ProducerRecord::value).element(1).satisfies( deadLetter -> { this.softly.assertThat(deadLetter.getInputValue()).hasValue("bar"); this.softly.assertThat(deadLetter.getOffset()).hasValue(1L); - this.softly.assertThat(deadLetter.getTimestamp()).hasValue(Instant.ofEpochMilli(200)); + this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(200)); } ); diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java index 68711e3..a32b293 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java @@ -56,5 +56,5 @@ public static class Cause { String topic; Integer partition; Long offset; - Instant timestamp; + Instant inputTimestamp; } diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java index 88a151f..dfaf72a 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java @@ -100,7 +100,7 @@ public void process(final FixedKeyRecord> inputRecord) { .topic(metadata.map(RecordMetadata::topic).orElse(null)) .partition(metadata.map(RecordMetadata::partition).orElse(null)) .offset(metadata.map(RecordMetadata::offset).orElse(null)) - .timestamp(Instant.ofEpochMilli(inputRecord.timestamp())) + .inputTimestamp(Instant.ofEpochMilli(inputRecord.timestamp())) .build(); final FixedKeyRecord record = inputRecord diff --git a/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java b/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java index 0c89822..225a6b2 100644 --- a/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java +++ b/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java @@ -29,7 +29,6 @@ import com.google.protobuf.Int64Value; import com.google.protobuf.StringValue; import com.google.protobuf.Timestamp; -import java.time.Instant; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; @@ -68,12 +67,12 @@ public ProtoDeadLetter convert(final DeadLetterDescription deadLetterDescription builder.setOffset(Int64Value.of(deadLetterDescription.getOffset())); } - if (deadLetterDescription.getTimestamp() != null) { + if (deadLetterDescription.getInputTimestamp() != null) { final Timestamp timestamp = Timestamp.newBuilder() - .setSeconds(deadLetterDescription.getTimestamp().getEpochSecond()) - .setNanos(deadLetterDescription.getTimestamp().getNano()) + .setSeconds(deadLetterDescription.getInputTimestamp().getEpochSecond()) + .setNanos(deadLetterDescription.getInputTimestamp().getNano()) .build(); - builder.setTimestamp(timestamp); + builder.setInputTimestamp(timestamp); } return builder.build(); diff --git a/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto b/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto index 29698b1..857a0e2 100644 --- a/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto +++ b/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto @@ -20,5 +20,5 @@ message ProtoDeadLetter { google.protobuf.StringValue topic = 4; google.protobuf.Int32Value partition = 5; google.protobuf.Int64Value offset = 6; - google.protobuf.Timestamp timestamp = 7; + google.protobuf.Timestamp input_timestamp = 7; } diff --git a/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java b/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java index ae16259..d6a0293 100644 --- a/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java +++ b/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java @@ -160,7 +160,7 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue) .isEqualTo("foo"); this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(0L); - this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp())) + this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp())) .isEqualTo(Instant.ofEpochMilli(100)); } ); @@ -169,7 +169,7 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue) .isEqualTo("bar"); this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(1L); - this.softly.assertThat(timestampToInstant(deadLetter.getTimestamp())) + this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp())) .isEqualTo(Instant.ofEpochMilli(200)); } ); From 617a080a03accbed49ab2b835c5e2e2ff01529ce Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Mon, 24 Jun 2024 07:19:00 +0200 Subject: [PATCH 3/4] Adjust ErrorHeaderProcessor --- .../src/main/avro/DeadLetter.avsc | 22 ++++++------- .../bakdata/kafka/ErrorHeaderProcessor.java | 7 +++- .../ErrorHeaderProcessorTopologyTest.java | 33 +++++++++++++++++++ 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/error-handling-avro/src/main/avro/DeadLetter.avsc b/error-handling-avro/src/main/avro/DeadLetter.avsc index 7304756..d00f6c8 100644 --- a/error-handling-avro/src/main/avro/DeadLetter.avsc +++ b/error-handling-avro/src/main/avro/DeadLetter.avsc @@ -38,17 +38,6 @@ "name": "description", "type": "string" }, - { - "name": "input_timestamp", - "type": [ - "null", - { - "type": "long", - "logicalType": "timestamp-millis" - } - ], - "default": null - }, { "name": "cause", "type": { @@ -79,6 +68,17 @@ } ] } + }, + { + "name": "input_timestamp", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ], + "default": null } ] } diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java index 34403b1..8adc0bb 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java @@ -77,6 +77,10 @@ public class ErrorHeaderProcessor implements FixedKeyProcessor> inputRecord) { addHeader(EXCEPTION_MESSAGE, value.getThrowable().getMessage(), headers); addHeader(EXCEPTION_STACK_TRACE, ExceptionUtils.getStackTrace(value.getThrowable()), headers); addHeader(DESCRIPTION, this.description, headers); - this.context.forward(inputRecord.withValue(value.getValue())); + addHeader(INPUT_TIMESTAMP, Long.toString(inputRecord.timestamp()), headers); + this.context.forward(inputRecord.withValue(value.getValue()).withTimestamp(this.context.currentSystemTimeMs())); } @Override diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java index e1da34c..68d38f2 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java @@ -182,4 +182,37 @@ void shouldHandleExceptionWithoutMessage(final SoftAssertions softly) { .isNull()); } + @Test + void shouldSetTimestamp(final SoftAssertions softly) { + final long timestamp = System.currentTimeMillis(); + when(this.mapper.apply(1, "foo")).thenThrow(new RuntimeException()); + this.createTopology(); + this.topology.input() + .withValueSerde(STRING_SERDE) + .add(1, "foo", 100); + + final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) + .withKeySerde(DOUBLE_SERDE) + .withValueSerde(LONG_SERDE)) + .toList(); + softly.assertThat(records).isEmpty(); + final List> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC) + .withValueSerde(Serdes.String())) + .toList(); + + softly.assertThat(errors) + .hasSize(1) + .first() + .isNotNull() + .satisfies(record -> { + softly.assertThat(record.key()).isEqualTo(1); + softly.assertThat(record.value()).isEqualTo("foo"); + softly.assertThat(record.timestamp()).isGreaterThan(timestamp); + }) + .extracting(ProducerRecord::headers) + .satisfies(headers -> softly.assertThat(getHeader(headers, ErrorHeaderProcessor.INPUT_TIMESTAMP)) + .isEqualTo("100")); + } + + } From a42bf3b303e3c99913e690e63c897bf3b6c54386 Mon Sep 17 00:00:00 2001 From: Torben Meyer Date: Mon, 24 Jun 2024 11:21:00 +0200 Subject: [PATCH 4/4] Revert "AdjustErrrorHeaderProcessor" --- .../bakdata/kafka/ErrorHeaderProcessor.java | 7 +--- .../ErrorHeaderProcessorTopologyTest.java | 33 ------------------- 2 files changed, 1 insertion(+), 39 deletions(-) diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java index 8adc0bb..34403b1 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/ErrorHeaderProcessor.java @@ -77,10 +77,6 @@ public class ErrorHeaderProcessor implements FixedKeyProcessor> inputRecord) { addHeader(EXCEPTION_MESSAGE, value.getThrowable().getMessage(), headers); addHeader(EXCEPTION_STACK_TRACE, ExceptionUtils.getStackTrace(value.getThrowable()), headers); addHeader(DESCRIPTION, this.description, headers); - addHeader(INPUT_TIMESTAMP, Long.toString(inputRecord.timestamp()), headers); - this.context.forward(inputRecord.withValue(value.getValue()).withTimestamp(this.context.currentSystemTimeMs())); + this.context.forward(inputRecord.withValue(value.getValue())); } @Override diff --git a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java index 68d38f2..e1da34c 100644 --- a/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java +++ b/error-handling-core/src/test/java/com/bakdata/kafka/ErrorHeaderProcessorTopologyTest.java @@ -182,37 +182,4 @@ void shouldHandleExceptionWithoutMessage(final SoftAssertions softly) { .isNull()); } - @Test - void shouldSetTimestamp(final SoftAssertions softly) { - final long timestamp = System.currentTimeMillis(); - when(this.mapper.apply(1, "foo")).thenThrow(new RuntimeException()); - this.createTopology(); - this.topology.input() - .withValueSerde(STRING_SERDE) - .add(1, "foo", 100); - - final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) - .withKeySerde(DOUBLE_SERDE) - .withValueSerde(LONG_SERDE)) - .toList(); - softly.assertThat(records).isEmpty(); - final List> errors = Seq.seq(this.topology.streamOutput(ERROR_TOPIC) - .withValueSerde(Serdes.String())) - .toList(); - - softly.assertThat(errors) - .hasSize(1) - .first() - .isNotNull() - .satisfies(record -> { - softly.assertThat(record.key()).isEqualTo(1); - softly.assertThat(record.value()).isEqualTo("foo"); - softly.assertThat(record.timestamp()).isGreaterThan(timestamp); - }) - .extracting(ProducerRecord::headers) - .satisfies(headers -> softly.assertThat(getHeader(headers, ErrorHeaderProcessor.INPUT_TIMESTAMP)) - .isEqualTo("100")); - } - - }