Skip to content

Commit

Permalink
Merge pull request #98 from itadventurer/reproduce92
Browse files Browse the repository at this point in the history
Fix #92: Problems with weird null-timestamps
  • Loading branch information
itadventurer authored Jun 23, 2020
2 parents f157f83 + 0f06700 commit be35886
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 29 deletions.
2 changes: 1 addition & 1 deletion docs/Kafka_Backup_Architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ Each entry is of the following form:
| Length (in bits) | Name | Data Type | Comment |
|------------------|-----------------|-------------------|--------------------------------------------------------------------------------|
| 64 | `offset` | `int64` | The offset of the record in the source Kafka cluster |
| 32 | `timestampType` | `int32` | Type of the timestamp: `-1`: no timestamp, `0`: CreateTime, `1`: LogAppendTime |
| 32 | `timestampType` | `int32` | Type of the timestamp: `-1`: no timestamp, `0`: CreateTime, `1`: LogAppendTime, `-2`: CreateTime with a `null` timestamp (workaround for https://github.com/itadventurer/kafka-backup/issues/92) |
| 0 or 64 | `timestamp` | `optional<int64>` | Timestamp if exists |
| 32 | `keyLength` | `int32` | byte-length of the record key `-1` if the key is `null` |
| `keyLength` | `key` | `byte[]` | key (not interpreted in any way) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,44 @@
import java.text.SimpleDateFormat;

/**
 * Record formatter that prints each record as a single human-readable line
 * consisting of offset, key, timestamp and value.
 */
public class ListRecordFormatter extends RecordFormatter {
    private final DateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public ListRecordFormatter(ByteFormatter keyFormatter, ByteFormatter valueFormatter) {
        super(keyFormatter, valueFormatter);
    }

    /**
     * Writes one line describing {@code record} to {@code outputStream}.
     *
     * <p>A {@code null} key, value or timestamp is rendered as a placeholder
     * instead of being dereferenced or handed to the formatters.
     *
     * @param record       the record to describe
     * @param outputStream the stream the line is printed to; nothing is written anywhere else
     */
    @Override
    public void writeTo(Record record, PrintStream outputStream) {
        String offset = "Offset: " + record.kafkaOffset();

        String key = record.key() == null
                ? "NULL Key"
                : "Key: " + keyFormatter.toString(record.key());

        String timestamp = "Timestamp: ";
        switch (record.timestampType()) {
            case NO_TIMESTAMP_TYPE:
                timestamp += "No Timestamp";
                break;
            case CREATE_TIME:
                timestamp += "(create)" + formatTimestamp(record.timestamp());
                break;
            case LOG_APPEND_TIME:
                timestamp += "(log append)" + formatTimestamp(record.timestamp());
                break;
            default:
                // Unknown timestamp type: print the bare label rather than fail.
                break;
        }

        String value = record.value() == null
                ? "NULL Value"
                : "Data: " + valueFormatter.toString(record.value());

        outputStream.println(offset + " " + key + " " + timestamp + " " + value);
    }

    /**
     * Formats a possibly-null timestamp. A record may carry a timestamp type
     * while its timestamp is {@code null}; the date formatter cannot format
     * {@code null}, so that case is rendered as the text {@code null}.
     */
    private String formatTimestamp(Long timestamp) {
        if (timestamp == null) {
            return "null";
        }
        return timestampFormat.format(timestamp);
    }
}
57 changes: 36 additions & 21 deletions src/main/java/de/azapps/kafkabackup/common/record/RecordSerde.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
/**
* Record Format:
* offset: int64
* timestampType: int32
* [timestamp: int64] if timestampType != NO_TIMESTAMP_TYPE
* timestampType: int32 (-2 if the type is CREATE_TIME but the timestamp is null)
* [timestamp: int64] if timestampType != NO_TIMESTAMP_TYPE && timestamp != null
* keyLength: int32
* [key: byte[keyLength]] if keyLength >= 0
* valueLength: int32
Expand All @@ -31,24 +31,30 @@ public static Record read(String topic, int partition, InputStream inputStream)
long offset = dataStream.readLong();
int timestampTypeInt = dataStream.readInt();
TimestampType timestampType;
switch (timestampTypeInt) {
case -1:
timestampType = TimestampType.NO_TIMESTAMP_TYPE;
break;
case 0:
timestampType = TimestampType.CREATE_TIME;
break;
case 1:
timestampType = TimestampType.LOG_APPEND_TIME;
break;
default:
throw new RuntimeException("Unexpected TimestampType. Expected -1,0 or 1. Got " + timestampTypeInt);
}
Long timestamp;
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
timestamp = dataStream.readLong();
// See comment in `write()`
if (timestampTypeInt == -2) {
timestampType = TimestampType.CREATE_TIME;
timestamp=null;
} else {
timestamp = null;
switch (timestampTypeInt) {
case -1:
timestampType = TimestampType.NO_TIMESTAMP_TYPE;
break;
case 0:
timestampType = TimestampType.CREATE_TIME;
break;
case 1:
timestampType = TimestampType.LOG_APPEND_TIME;
break;
default:
throw new RuntimeException("Unexpected TimestampType. Expected -1,0 or 1. Got " + timestampTypeInt);
}
if (timestampType != TimestampType.NO_TIMESTAMP_TYPE) {
timestamp = dataStream.readLong();
} else {
timestamp = null;
}
}
int keyLength = dataStream.readInt();
byte[] key = null;
Expand Down Expand Up @@ -102,9 +108,18 @@ public static Record read(String topic, int partition, InputStream inputStream)
public static void write(OutputStream outputStream, Record record) throws IOException {
DataOutputStream dataStream = new DataOutputStream(outputStream);
dataStream.writeLong(record.kafkaOffset());
dataStream.writeInt(record.timestampType().id);
if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
dataStream.writeLong(record.timestamp());
// There is a special case where the timestamp type equals `CREATE_TIME` but the timestamp itself is `null`.
// This should not happen normally, and I consider it a bug in the pykafka client implementation.
// But since Kafka accepts that value, so should Kafka Backup. Hence this dirty workaround: we write the
// timestamp type `-2` if the type is CREATE_TIME but the timestamp itself is null. Otherwise we would have
// needed to change the byte format, and for now I think this is the better solution.
if (record.timestampType() == TimestampType.CREATE_TIME && record.timestamp() == null) {
dataStream.writeInt(-2);
} else {
dataStream.writeInt(record.timestampType().id);
if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) {
dataStream.writeLong(record.timestamp());
}
}
if (record.key() != null) {
dataStream.writeInt(record.key().length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,21 @@ public class RecordSerdeTest {
private static final long OFFSET = 123;
private static final byte[] KEY_BYTES = "test-key".getBytes(StandardCharsets.UTF_8);
private static final byte[] VALUE_BYTES = "test-value".getBytes(StandardCharsets.UTF_8);
private static final byte[] NULL_TIMESTAMP_BYTES = "null-timestamp".getBytes(StandardCharsets.UTF_8);

private static final String SIMPLE_RECORD_FILE = "simple_record";
private static final String NULL_RECORD_FILE = "null_record";
private static final String EMPTY_RECORD_FILE = "empty_record";
private static final String HEADER_RECORD_FILE = "header_record";

// Example records
private static final Record SIMPLE_RECORD, NULL_RECORD, EMPTY_RECORD, HEADER_RECORD;
private static final Record SIMPLE_RECORD, NULL_RECORD, EMPTY_RECORD, HEADER_RECORD, NULL_TIMESTAMP_RECORD;

static {
SIMPLE_RECORD = new Record(TOPIC, PARTITION, KEY_BYTES, VALUE_BYTES, OFFSET);
NULL_RECORD = new Record(TOPIC, PARTITION, null, null, OFFSET);
EMPTY_RECORD = new Record(TOPIC, PARTITION, new byte[0], new byte[0], OFFSET);
NULL_TIMESTAMP_RECORD = new Record(TOPIC, PARTITION, NULL_TIMESTAMP_BYTES, null, OFFSET, null, TimestampType.CREATE_TIME);
// Build multiple headers that might cause problems
RecordHeaders headers = new RecordHeaders();
headers.add("", new byte[0]);
Expand All @@ -56,6 +58,12 @@ public void roundtripWithNull() throws Exception {
assertNotEquals(nullRoundtrip, emptyRoundtrip);
}

@Test
public void roundtripNullTimestamp() throws Exception {
    // A record with a timestamp type but a null timestamp must come back
    // from a write/read cycle equal to what was written.
    Record restored = writeAndReadRecord(NULL_TIMESTAMP_RECORD);
    assertEquals(NULL_TIMESTAMP_RECORD, restored);
}

@Test
public void roundtripHeaders() throws Exception {
Record headerRoundtrip = writeAndReadRecord(HEADER_RECORD);
Expand Down
2 changes: 2 additions & 0 deletions system_test/01_simple_roundtrip_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@
command: docker-compose exec -T restore-to-kafka bash -c '
utils.py consume_verify_weird_messages --partition 0 --topic backup-test-weird-msgs'
- name: Count Messages
timeout: 30s
command: docker-compose exec -T restore-to-kafka
utils.py count_messages
stdout_has:
Expand Down Expand Up @@ -155,3 +156,4 @@
entries:
- name: Docker Compose Down
command: docker-compose down
timeout: 15s
2 changes: 1 addition & 1 deletion system_test/utils/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN mkdir /usr/src && cd /usr/src/ && \

# Install confluent-kafka python

RUN pip3 install confluent-kafka==1.3.0
RUN pip3 install confluent-kafka==1.3.0 pykafka==2.8.0dev1
COPY utils.py /usr/bin/utils.py
6 changes: 6 additions & 0 deletions system_test/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from confluent_kafka import Producer, Consumer, TopicPartition, Message
from confluent_kafka.admin import AdminClient, NewTopic
import pykafka


def create_topic(topic, partitions, bootstrap_servers):
Expand Down Expand Up @@ -96,6 +97,11 @@ def produce_weird_messages(topic, partition, bootstrap_servers):
else:
producer.produce(topic, msg['value'], key=msg['key'], partition=partition)
producer.flush()
# pykafka for weird timestamps
pykafkaClient = pykafka.KafkaClient(hosts=bootstrap_servers)
pykafkaTopic = pykafkaClient.topics[topic]
with pykafkaTopic.get_sync_producer() as pykafkaProducer:
pykafkaProducer.produce(b"noneTimestamp")


def check_msg_equality(msgnum, expected, given: Message, expectFail=False):
Expand Down

0 comments on commit be35886

Please sign in to comment.