From 0f067001b107c089081407dcc8f94d3bec3041fd Mon Sep 17 00:00:00 2001 From: Anatoly Zelenin Date: Mon, 22 Jun 2020 21:28:14 +0200 Subject: [PATCH] Documented hacky workaround with timestampType -2 --- docs/Kafka_Backup_Architecture.md | 2 +- .../azapps/kafkabackup/common/record/RecordSerde.java | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/docs/Kafka_Backup_Architecture.md b/docs/Kafka_Backup_Architecture.md index d0fed41..df78866 100644 --- a/docs/Kafka_Backup_Architecture.md +++ b/docs/Kafka_Backup_Architecture.md @@ -194,7 +194,7 @@ Each entry is of the following form: | Length (in bits) | Name | Data Type | Comment | |------------------|-----------------|-------------------|--------------------------------------------------------------------------------| | 64 | `offset` | `int64` | The offset of the record in the source Kafka cluster | -| 32 | `timestampType` | `int32` | Type of the timestamp: `-1`: no timestamp, `0`: CreateTime, `1`: LogAppendTime | +| 32 | `timestampType` | `int32` | Type of the timestamp: `-1`: no timestamp, `0`: CreateTime, `1`: LogAppendTime, `-2`: CreateTime but with Timestamp `null` (dirty workaround regarding https://github.com/itadventurer/kafka-backup/issues/92) | | 0 or 64 | `timestamp` | `optional` | Timestamp if exists | | 32 | `keyLength` | `int32` | byte-length of the record key `-1` if the key is `null` | | `keyLength` | `key` | `byte[]` | key (not interpreted in any way) | diff --git a/src/main/java/de/azapps/kafkabackup/common/record/RecordSerde.java b/src/main/java/de/azapps/kafkabackup/common/record/RecordSerde.java index e5b2f9b..e6d3626 100644 --- a/src/main/java/de/azapps/kafkabackup/common/record/RecordSerde.java +++ b/src/main/java/de/azapps/kafkabackup/common/record/RecordSerde.java @@ -10,8 +10,8 @@ /** * Record Format: * offset: int64 - * timestampType: int32 - * [timestamp: int64] if timestampType != NO_TIMESTAMP_TYPE + * timestampType: int32 -2 if timestamp is null + * [timestamp: int64] if timestampType != NO_TIMESTAMP_TYPE && timestamp != null * keyLength: int32 * [key: byte[keyLength]] if keyLength >= 0 * valueLength: int32 @@ -32,6 +32,7 @@ public static Record read(String topic, int partition, InputStream inputStream) int timestampTypeInt = dataStream.readInt(); TimestampType timestampType; Long timestamp; + // See comment in `write()` if (timestampTypeInt == -2) { timestampType = TimestampType.CREATE_TIME; timestamp=null; @@ -107,6 +108,11 @@ public static Record read(String topic, int partition, InputStream inputStream) public static void write(OutputStream outputStream, Record record) throws IOException { DataOutputStream dataStream = new DataOutputStream(outputStream); dataStream.writeLong(record.kafkaOffset()); + // There is a special case where the timestamp type eqauls `CREATE_TIME` but is actually `null`. + // This should not happen normally and I see it as a bug in the Client implementation of pykafka + // But as Kafka accepts that value, so should Kafka Backup. Thus, this dirty workaround: we write the + // timestamp type `-2` if the type is CREATE_TIME but the timestamp itself is null. Otherwise we would have + // needed to change the byte format and for now I think this is the better solution. if (record.timestampType() == TimestampType.CREATE_TIME && record.timestamp() == null) { dataStream.writeInt(-2); } else {