Skip to content

Commit

Permalink
Documented hacky workaround with timestampType -2
Browse files Browse the repository at this point in the history
  • Loading branch information
itadventurer committed Jun 22, 2020
1 parent 776888e commit 0f06700
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docs/Kafka_Backup_Architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ Each entry is of the following form:
| Length (in bits) | Name | Data Type | Comment |
|------------------|-----------------|-------------------|--------------------------------------------------------------------------------|
| 64 | `offset` | `int64` | The offset of the record in the source Kafka cluster |
| 32 | `timestampType` | `int32` | Type of the timestamp: `-1`: no timestamp, `0`: CreateTime, `1`: LogAppendTime |
| 32 | `timestampType` | `int32` | Type of the timestamp: `-1`: no timestamp, `0`: CreateTime, `1`: LogAppendTime, `-2`: CreateTime but with Timestamp `null` (dirty workaround regarding https://github.com/itadventurer/kafka-backup/issues/92) |
| 0 or 64 | `timestamp` | `optional<int64>` | Timestamp if exists |
| 32 | `keyLength` | `int32` | byte-length of the record key `-1` if the key is `null` |
| `keyLength` | `key` | `byte[]` | key (not interpreted in any way) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
/**
* Record Format:
* offset: int64
* timestampType: int32
* [timestamp: int64] if timestampType != NO_TIMESTAMP_TYPE
* timestampType: int32 -2 if timestamp is null
* [timestamp: int64] if timestampType != NO_TIMESTAMP_TYPE && timestamp != null
* keyLength: int32
* [key: byte[keyLength]] if keyLength >= 0
* valueLength: int32
Expand All @@ -32,6 +32,7 @@ public static Record read(String topic, int partition, InputStream inputStream)
int timestampTypeInt = dataStream.readInt();
TimestampType timestampType;
Long timestamp;
// See comment in `write()`
if (timestampTypeInt == -2) {
timestampType = TimestampType.CREATE_TIME;
timestamp=null;
Expand Down Expand Up @@ -107,6 +108,11 @@ public static Record read(String topic, int partition, InputStream inputStream)
public static void write(OutputStream outputStream, Record record) throws IOException {
DataOutputStream dataStream = new DataOutputStream(outputStream);
dataStream.writeLong(record.kafkaOffset());
// There is a special case where the timestamp type eqauls `CREATE_TIME` but is actually `null`.
// This should not happen normally and I see it as a bug in the Client implementation of pykafka
// But as Kafka accepts that value, so should Kafka Backup. Thus, this dirty workaround: we write the
// timestamp type `-2` if the type is CREATE_TIME but the timestamp itself is null. Otherwise we would have
// needed to change the byte format and for now I think this is the better solution.
if (record.timestampType() == TimestampType.CREATE_TIME && record.timestamp() == null) {
dataStream.writeInt(-2);
} else {
Expand Down

0 comments on commit 0f06700

Please sign in to comment.