diff --git a/error-handling-avro/src/main/avro/DeadLetter.avsc b/error-handling-avro/src/main/avro/DeadLetter.avsc index b600309..d00f6c8 100644 --- a/error-handling-avro/src/main/avro/DeadLetter.avsc +++ b/error-handling-avro/src/main/avro/DeadLetter.avsc @@ -68,6 +68,17 @@ } ] } + }, + { + "name": "input_timestamp", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ], + "default": null } ] } diff --git a/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java b/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java index 6103040..f916b65 100644 --- a/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java +++ b/error-handling-avro/src/main/java/com/bakdata/kafka/AvroDeadLetterConverter.java @@ -45,6 +45,7 @@ public DeadLetter convert(final DeadLetterDescription deadLetterDescription) { .setTopic(deadLetterDescription.getTopic()) .setPartition(deadLetterDescription.getPartition()) .setOffset(deadLetterDescription.getOffset()) + .setInputTimestamp(deadLetterDescription.getInputTimestamp()) .build(); } diff --git a/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java b/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java index 65275b0..56b8d45 100644 --- a/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java +++ b/error-handling-avro/src/test/java/com/bakdata/kafka/AvroDeadLetterProcessorTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.when; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -88,11 +89,12 @@ protected Properties getKafkaProperties() { @Test void shouldConvertAndSerializeAvroDeadLetter() { + final long startTimestamp = System.currentTimeMillis(); when(this.mapper.apply(any(), any())).thenThrow(new RuntimeException(ERROR_MESSAGE)); this.createTopology(); this.topology.input(INPUT_TOPIC).withValueSerde(STRING_SERDE) - .add(1, "foo") - .add(2, "bar"); + .add(1, "foo", 100) + .add(2, "bar", 200); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(STRING_SERDE)) @@ -106,6 +108,7 @@ void shouldConvertAndSerializeAvroDeadLetter() { this.softly.assertThat(errors) .hasSize(2) + .allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp)) .extracting(ProducerRecord::value).allSatisfy( deadLetter -> { this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION); @@ -123,12 +126,14 @@ void shouldConvertAndSerializeAvroDeadLetter() { deadLetter -> { this.softly.assertThat(deadLetter.getInputValue()).hasValue("foo"); this.softly.assertThat(deadLetter.getOffset()).hasValue(0L); + this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(100)); } ); this.softly.assertThat(errors).extracting(ProducerRecord::value).element(1).satisfies( deadLetter -> { this.softly.assertThat(deadLetter.getInputValue()).hasValue("bar"); this.softly.assertThat(deadLetter.getOffset()).hasValue(1L); + this.softly.assertThat(deadLetter.getInputTimestamp()).hasValue(Instant.ofEpochMilli(200)); } ); diff --git a/error-handling-core/build.gradle.kts b/error-handling-core/build.gradle.kts index 7ddce14..96dd183 100644 --- a/error-handling-core/build.gradle.kts +++ b/error-handling-core/build.gradle.kts @@ -45,6 +45,7 @@ dependencies { val jacksonVersion: String by project testFixturesImplementation(group = "com.fasterxml.jackson.core", name = "jackson-core", version = jacksonVersion) testFixturesImplementation(group = "com.fasterxml.jackson.core", name = "jackson-databind", version = jacksonVersion) + testFixturesImplementation(group = "com.fasterxml.jackson.datatype", name = "jackson-datatype-jsr310", version = jacksonVersion) } avro { diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java index de78e85..a32b293 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterDescription.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import java.time.Instant; import lombok.Builder; import lombok.NonNull; import lombok.Value; @@ -55,4 +56,5 @@ public static class Cause { String topic; Integer partition; Long offset; + Instant inputTimestamp; } diff --git a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java index 90b90e0..dfaf72a 100644 --- a/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java +++ b/error-handling-core/src/main/java/com/bakdata/kafka/DeadLetterProcessor.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import java.time.Instant; import java.util.Optional; import lombok.Getter; import lombok.NonNull; @@ -99,8 +100,14 @@ public void process(final FixedKeyRecord> inputRecord) { .topic(metadata.map(RecordMetadata::topic).orElse(null)) .partition(metadata.map(RecordMetadata::partition).orElse(null)) .offset(metadata.map(RecordMetadata::offset).orElse(null)) + .inputTimestamp(Instant.ofEpochMilli(inputRecord.timestamp())) .build(); - this.context.forward(inputRecord.withValue(this.deadLetterConverter.convert(deadLetterDescription))); + + final FixedKeyRecord record = inputRecord + .withValue(this.deadLetterConverter.convert(deadLetterDescription)) + .withTimestamp(this.context.currentSystemTimeMs()); + + this.context.forward(record); } @Override diff --git a/error-handling-core/src/testFixtures/java/com/bakdata/kafka/TestDeadLetterSerde.java b/error-handling-core/src/testFixtures/java/com/bakdata/kafka/TestDeadLetterSerde.java index a8f541c..fdc8785 100644 --- a/error-handling-core/src/testFixtures/java/com/bakdata/kafka/TestDeadLetterSerde.java +++ b/error-handling-core/src/testFixtures/java/com/bakdata/kafka/TestDeadLetterSerde.java @@ -25,8 +25,8 @@ package com.bakdata.kafka; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import java.io.IOException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; @@ -34,7 +34,7 @@ import org.apache.kafka.common.serialization.Serializer; public class TestDeadLetterSerde implements Serde { - static final ObjectMapper objectMapper = new ObjectMapper(); + static final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()); private static final Serializer serializer = (topic, data) -> { try { return objectMapper.writeValueAsBytes(data); diff --git a/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java b/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java index 61081fe..225a6b2 100644 --- a/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java +++ b/error-handling-proto/src/main/java/com/bakdata/kafka/ProtoDeadLetterConverter.java @@ -28,6 +28,7 @@ import com.google.protobuf.Int32Value; import com.google.protobuf.Int64Value; import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier; @@ -65,6 +66,15 @@ public ProtoDeadLetter convert(final DeadLetterDescription deadLetterDescription if (deadLetterDescription.getOffset() != null) { builder.setOffset(Int64Value.of(deadLetterDescription.getOffset())); } + + if (deadLetterDescription.getInputTimestamp() != null) { + final Timestamp timestamp = Timestamp.newBuilder() + .setSeconds(deadLetterDescription.getInputTimestamp().getEpochSecond()) + .setNanos(deadLetterDescription.getInputTimestamp().getNano()) + .build(); + builder.setInputTimestamp(timestamp); + } + return builder.build(); } diff --git a/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto b/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto index 90f07b6..857a0e2 100644 --- a/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto +++ b/error-handling-proto/src/main/proto/bakdata/kafka/proto/v1/deadletter.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package bakdata.kafka.proto.v1; import "google/protobuf/wrappers.proto"; +import "google/protobuf/timestamp.proto"; option java_package = "com.bakdata.kafka.proto.v1"; option java_multiple_files = true; @@ -19,4 +20,5 @@ message ProtoDeadLetter { google.protobuf.StringValue topic = 4; google.protobuf.Int32Value partition = 5; google.protobuf.Int64Value offset = 6; + google.protobuf.Timestamp input_timestamp = 7; } diff --git a/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java b/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java index 396c56d..d6a0293 100644 --- a/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java +++ b/error-handling-proto/src/test/java/com/bakdata/kafka/ProtoDeadLetterProcessorTest.java @@ -33,10 +33,12 @@ import com.google.protobuf.Int32Value; import com.google.protobuf.Int64Value; import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider; import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig; import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -118,11 +120,12 @@ protected void createTopology() { @Test void shouldConvertAndSerializeProtoDeadLetter() { + final long startTimestamp = System.currentTimeMillis(); when(this.mapper.apply(any(), any())).thenThrow(new RuntimeException(ERROR_MESSAGE)); this.createTopology(); this.topology.input(INPUT_TOPIC).withValueSerde(STRING_SERDE) - .add(1, "foo") - .add(2, "bar"); + .add(1, "foo", 100) + .add(2, "bar", 200); final List> records = Seq.seq(this.topology.streamOutput(OUTPUT_TOPIC) .withValueSerde(STRING_SERDE)) @@ -136,6 +139,7 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(errors) .hasSize(2) + .allSatisfy(record -> this.softly.assertThat(record.timestamp()).isGreaterThan(startTimestamp)) .extracting(ProducerRecord::value).allSatisfy( deadLetter -> { this.softly.assertThat(deadLetter.getDescription()).isEqualTo(DEAD_LETTER_DESCRIPTION); @@ -156,6 +160,8 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue) .isEqualTo("foo"); this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(0L); + this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp())) + .isEqualTo(Instant.ofEpochMilli(100)); } ); this.softly.assertThat(errors).map(ProducerRecord::value).element(1).satisfies( @@ -163,8 +169,13 @@ void shouldConvertAndSerializeProtoDeadLetter() { this.softly.assertThat(deadLetter.getInputValue()).extracting(StringValue::getValue) .isEqualTo("bar"); this.softly.assertThat(deadLetter.getOffset()).extracting(Int64Value::getValue).isEqualTo(1L); + this.softly.assertThat(timestampToInstant(deadLetter.getInputTimestamp())) + .isEqualTo(Instant.ofEpochMilli(200)); } ); + } + private static Instant timestampToInstant(final Timestamp timestamp) { + return Instant.ofEpochSecond(timestamp.getSeconds(), timestamp.getNanos()); } }