We describe here only the entities themselves. See below for more information about the file formats.
A Kafka Backup Partition represents exactly one Kafka Partition. As partitions can grow very large, Kafka Backup splits the data of each partition into segments (similar to Kafka) and rotates the segments once a certain (configurable) threshold size is reached. For faster access to the segments, there is exactly one index per partition: it maps segment start offsets to their file names. This is only a performance optimization. The index does not need to be backed up: it can be regenerated from the segments. This also means that old segments can be deleted after a certain TTL to free up space. Kafka Backup handles this gracefully. After deleting old segments, the partition index needs to be regenerated.
A segment is a log of records. When a segment reaches a certain threshold, a new segment is created and no more data is written to the old segment. As the segment is rotated only after a record has been appended to it, a segment usually grows slightly beyond the threshold. In the extreme case, each message is written to a new segment.
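
To make the rotation rule concrete, here is a minimal sketch; the class, interface and field names (`SegmentRollingSketch`, `Segment`, `maxSegmentSizeBytes`) are illustrative and not the actual Kafka Backup implementation:

```java
// Minimal sketch of the rotation rule: the size check happens only *after* an
// append, so the last record may push a segment slightly over the threshold.
class SegmentRollingSketch {
    interface Segment {
        void append(byte[] serializedRecord);
        long sizeBytes();
    }

    private final long maxSegmentSizeBytes; // the configurable threshold (illustrative name)
    private Segment currentSegment;

    SegmentRollingSketch(long maxSegmentSizeBytes, Segment firstSegment) {
        this.maxSegmentSizeBytes = maxSegmentSizeBytes;
        this.currentSegment = firstSegment;
    }

    void append(byte[] serializedRecord, long offset) {
        currentSegment.append(serializedRecord);
        // In the extreme, every record starts a new segment.
        if (currentSegment.sizeBytes() >= maxSegmentSizeBytes) {
            currentSegment = newSegmentStartingAt(offset + 1);
        }
    }

    // Placeholder: in Kafka Backup a new segment file named after its start offset
    // would be created here.
    private Segment newSegmentStartingAt(long startOffset) {
        throw new UnsupportedOperationException("sketch only");
    }
}
```
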
For each segment, two files are created: a record file, which contains a concatenated list of records, and an index file, which consists of a mapping from offsets to positions in the record file.
As it is impossible to guarantee exactly-once delivery in a distributed system, Kafka Backup supports idempotent writes: if a message with an already known offset arrives, it is written to the same position. Kafka Backup assumes that incoming messages with the same offset are exactly the same (this is reasonable, because it is guaranteed by Kafka's design). Records are first written to the record file, then an entry is appended to the index, and only then are the consumer offsets committed (asynchronously). This design guarantees that committed messages appear exactly once in the segment.
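
The following sketch illustrates this write ordering and the idempotence check. `RecordsFile` and `IndexFile` are hypothetical helper interfaces; this is not the actual Kafka Backup code:

```java
import java.io.IOException;

// Hypothetical helpers: append-only record file and offset index.
interface RecordsFile { long append(byte[] serializedRecord) throws IOException; }
interface IndexFile {
    boolean contains(long offset);
    void append(long offset, long position, long byteLength) throws IOException;
}

// Sketch of the write ordering: record file first, then index; only afterwards
// does the Kafka Connect framework commit the consumer offset asynchronously.
class IdempotentWriterSketch {
    private final RecordsFile records;
    private final IndexFile index;

    IdempotentWriterSketch(RecordsFile records, IndexFile index) {
        this.records = records;
        this.index = index;
    }

    void write(long offset, byte[] serializedRecord) throws IOException {
        if (index.contains(offset)) {
            return; // duplicate delivery: assumed identical, so it is safe to skip
        }
        long position = records.append(serializedRecord);            // 1. append to record file
        index.append(offset, position, serializedRecord.length);     // 2. append to index
        // 3. The consumer offset is committed (asynchronously) only after this
        //    returns, so a committed offset implies the record is in the segment.
    }
}
```
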
Each segment is a concatenation of records. A record represents exactly one message in Kafka. Please note: for performance reasons, Kafka itself batches multiple messages into a single record batch. Kafka Backup does not perform this optimization, to keep the file format simple.
Kafka Backup assumes that messages consist of the information defined in the message format of Kafka 0.11.0 (the current message format at the time of writing, June 2019). This means that each record consists of the following information (a small sketch of such a record follows the list):
- Offset
- Key
- Value
- Timestamp and type (no timestamp, create timestamp or log append time)
- Record Headers
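
A minimal sketch of a container for exactly this information might look as follows; the class is an illustration, not the actual Kafka Backup `Record` class:

```java
import java.util.List;

// Sketch of the fields listed above.
final class BackupRecordSketch {
    enum TimestampType { NO_TIMESTAMP, CREATE_TIME, LOG_APPEND_TIME }

    // A header key may occur more than once, so headers form a list, not a map.
    static final class Header {
        final String key;
        final byte[] value; // may be null
        Header(String key, byte[] value) { this.key = key; this.value = value; }
    }

    final long offset;
    final byte[] key;              // may be null
    final byte[] value;            // may be null
    final TimestampType timestampType;
    final Long timestamp;          // null if there is no timestamp
    final List<Header> headers;

    BackupRecordSketch(long offset, byte[] key, byte[] value,
                       TimestampType timestampType, Long timestamp, List<Header> headers) {
        this.offset = offset;
        this.key = key;
        this.value = value;
        this.timestampType = timestampType;
        this.timestamp = timestamp;
        this.headers = headers;
    }
}
```
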
In addition to the topic data, Kafka Backup also saves the consumer offsets for each partition. For each partition we store a simple JSON file that contains a mapping from consumer groups to the currently committed offset of that group. Note that it is not possible to map the offsets 1:1 during restoration. There are many edge cases that may cause the original message to get a completely different offset after restoration:
- Retention policies
- Log compaction
- Errors during message production
All of these can occur during production into the original cluster as well as during restore operations.
Kafka Backup consists of two Kafka Connect Connectors: A sink connector responsible for the backup task and a source connector responsible for the restoration.
The Kafka Connect Architecture distinguishes between Connectors and Tasks: Tasks perform the actual work and the Connector acts as a preparation and cleanup stage and configures the actual Tasks. For performance reasons, Kafka Connect supports multiple tasks per connector and distributes them across multiple Kafka Connect workers if available.
Currently, Kafka Backup supports only one Task per Connector configuration.
Both the Sink and the Source Connector consist only of boilerplate code: they pass the configuration to their single task and throw an exception when the number of tasks is greater than 1.
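
A sketch of what this boilerplate amounts to, using a hypothetical `BackupSinkConnectorSketch` that hands its configuration to a single task class (`BackupSinkTaskSketch` is the task sketch shown further below):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;

// Sketch of the connector boilerplate: forward the configuration to a single
// task and reject any configuration that asks for more than one task.
public class BackupSinkConnectorSketch extends SinkConnector {
    private Map<String, String> config;

    @Override public void start(Map<String, String> props) { this.config = props; }

    // The task sketch shown further below.
    @Override public Class<? extends Task> taskClass() { return BackupSinkTaskSketch.class; }

    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
        if (maxTasks > 1) {
            throw new ConnectException("Kafka Backup supports only one task per connector");
        }
        return Collections.singletonList(config);
    }

    @Override public void stop() { }

    @Override public ConfigDef config() { return new ConfigDef(); }

    @Override public String version() { return "sketch"; }
}
```
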
As usual, our Sink Task extends the Kafka Connect `SinkTask`. The sink task is responsible for two jobs. First, every time Kafka Connect delivers new records to be backed up, the task writes them to the appropriate partition files. The actual division into segments is abstracted away from the Sink Task: it just transforms the Kafka Connect `SinkRecord` format into a Kafka Backup `Record` and appends it to the appropriate partition.
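
A sketch of this first job, with `PartitionWriter` as a placeholder for the component that hides segment handling; it also shows the `open(...)` hook through which Kafka Connect announces newly assigned partitions (relevant for the regex case mentioned below):

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Sketch of the sink task's first job. PartitionWriter is a placeholder for the
// component that owns segments and indexes; the SinkRecord-to-backup-record
// conversion is omitted.
public class BackupSinkTaskSketch extends SinkTask {
    private final Map<TopicPartition, PartitionWriter> writers = new HashMap<>();

    @Override public void start(Map<String, String> props) { }

    // Kafka Connect calls open() whenever partitions are (re)assigned, e.g. when
    // new topics start matching the configured regex at runtime.
    @Override public void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            writers.computeIfAbsent(tp, ignored -> new PartitionWriter(tp));
        }
    }

    @Override public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
            // Segment rotation and index updates are hidden behind the writer.
            writers.computeIfAbsent(tp, ignored -> new PartitionWriter(tp)).append(record);
        }
    }

    @Override public void stop() { }

    @Override public String version() { return "sketch"; }

    // Placeholder for the per-partition writer.
    static class PartitionWriter {
        PartitionWriter(TopicPartition tp) { }
        void append(SinkRecord record) { /* transform and append to the current segment */ }
    }
}
```
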
Second, the Sink Task is also responsible for backing up the consumer group offsets. Ideally, this job would be scheduled independently of the delivery of new messages from Kafka Connect. Currently, the offsets are synchronized every time new records are pushed to Kafka Connect. Note that syncing consumer offsets is not supported out of the box by Kafka Connect, so we need to create our own `AdminClient` that is responsible for fetching the offsets of all consumer groups.
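
A sketch of such an offset fetch using Kafka's `AdminClient` (error handling and any filtering of groups are omitted; the class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch: list all consumer groups and fetch their committed offsets, which can
// then be written to the consumer_offsets_partition_* files.
public class OffsetSyncSketch {
    public static Map<String, Map<TopicPartition, OffsetAndMetadata>> fetchAllGroupOffsets(
            String bootstrapServers) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
        try (AdminClient admin = AdminClient.create(props)) {
            for (ConsumerGroupListing group : admin.listConsumerGroups().all().get()) {
                Map<TopicPartition, OffsetAndMetadata> offsets = admin
                        .listConsumerGroupOffsets(group.groupId())
                        .partitionsToOffsetAndMetadata()
                        .get();
                result.put(group.groupId(), offsets);
            }
        }
        return result;
    }
}
```
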
Note that Kafka Connect supports defining the topics to back up via a regex, and thus new partitions may be added at runtime. Kafka Backup supports this case.
In general, the source task is conceptually very similar to the sink task, apart from the direction of the data flow.
Before performing the actual job, the source task collects all required information about the partitions to be restored and finds the corresponding files on the file system.
The restore task splits the incoming data into configurable batches and restores the batches one after another. As there is no way to gracefully shut down Kafka Connect from the inside, the Source Task logs a completion message every few seconds after all data has been restored from the files.
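
A sketch of this restore loop, assuming a hypothetical `BackupReader` that yields the records read from the backup files and an illustrative `batch.size` property:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Sketch of the restore loop: poll() returns at most one batch per call and,
// once all files have been read, only logs a completion hint every few seconds,
// because Kafka Connect offers no way to stop a connector from the inside.
public class RestoreSourceTaskSketch extends SourceTask {

    // Placeholder for the component that reads backed-up records from disk.
    interface BackupReader {
        boolean hasMoreRecords();
        SourceRecord next();
    }

    private BackupReader reader;
    private int batchSize;

    @Override public void start(Map<String, String> props) {
        this.batchSize = Integer.parseInt(props.getOrDefault("batch.size", "1000"));
        this.reader = openReader(props); // open the partition/segment files (omitted)
    }

    @Override public List<SourceRecord> poll() throws InterruptedException {
        if (!reader.hasMoreRecords()) {
            System.out.println("All records have been restored. The connector can be stopped now.");
            Thread.sleep(5_000); // repeat the hint every few seconds
            return new ArrayList<>();
        }
        List<SourceRecord> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize && reader.hasMoreRecords()) {
            batch.add(reader.next());
        }
        return batch;
    }

    @Override public void stop() { }

    @Override public String version() { return "sketch"; }

    private BackupReader openReader(Map<String, String> props) {
        throw new UnsupportedOperationException("sketch only");
    }
}
```
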
To restore consumer offsets, the Source Task requires a new API introduced by MirrorMaker 2: `commitRecord(SourceRecord record, RecordMetadata metadata)`. This function is called for every record that is written to Kafka. We check whether there is a consumer offset for the original Kafka offset. If this is the case, we identify the offset of the written message using the `RecordMetadata` and commit this offset for the appropriate consumer group.
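
The lookup that happens inside `commitRecord(...)` could look roughly like this; `GroupOffsets` and `OffsetCommitter` are hypothetical helpers, and the exact comparison between committed offsets and record offsets is simplified:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch of the offset restoration logic. In the real task this sits in the
// overridden SourceTask.commitRecord(SourceRecord, RecordMetadata); it is
// isolated here so the lookup is easy to follow.
public class OffsetRestorationSketch {

    // Backed-up offsets of the source cluster: consumer group -> committed offset.
    public interface GroupOffsets {
        Map<String, Long> forPartition(TopicPartition partition);
    }

    // Commits an offset for a consumer group in the target cluster.
    public interface OffsetCommitter {
        void commit(String group, TopicPartition partition, long targetOffset);
    }

    private final GroupOffsets backedUpOffsets;
    private final OffsetCommitter committer;

    public OffsetRestorationSketch(GroupOffsets backedUpOffsets, OffsetCommitter committer) {
        this.backedUpOffsets = backedUpOffsets;
        this.committer = committer;
    }

    // Called for every record written to the target cluster: sourceOffset is the
    // offset the record had in the original cluster, metadata tells us where it
    // landed now (topic and partition are the same during a restore).
    public void onRecordWritten(long sourceOffset, RecordMetadata metadata) {
        TopicPartition partition = new TopicPartition(metadata.topic(), metadata.partition());
        for (Map.Entry<String, Long> group : backedUpOffsets.forPartition(partition).entrySet()) {
            if (group.getValue() == sourceOffset) {
                // The group had committed exactly this offset in the source cluster,
                // so commit the offset of the freshly written message for it.
                committer.commit(group.getKey(), partition, metadata.offset());
            }
        }
    }
}
```
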
Kafka Backup requires a directory to write the files to. It creates a directory for each topic, named after the topic. Each directory contains the following files:

File Name | Number of files | Description |
---|---|---|
`consumer_offsets_partition_[partition-num]` | One per partition | Offsets for the partition `partition-num` |
`index_partition_[partition-num]` | One per partition | Partition index |
`segment_partition_[partition-num]_from_offset_[start-offset]_records` | Possibly many per partition | Record file for the segment |
`segment_partition_[partition-num]_from_offset_[start-offset]_index` | Possibly many per partition | Index file for the segment |
The partition index file is a binary log file that concatenates entries without any delimiter.
The file starts with the magic byte `0x01`. If the first byte does not equal `0x01`, the file is not compatible with the current version of Kafka Backup.

Each entry is of the following form:

Length (in bits) | Name | Data Type | Comment |
---|---|---|---|
32 | `filenameLength` | `int32` | The length of the file name |
`filenameLength` | `filename` | UTF-8 formatted String | File name of the segment |
64 | `startOffset` | `int64` | The offset of the first entry in the segment named `filename` |
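
A minimal sketch of a reader for this format, assuming big-endian integers (as written by Java's `DataOutputStream`) and interpreting `filenameLength` as a byte count; it is not part of Kafka Backup itself:

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of a partition index reader: magic byte, then a sequence of
// (filenameLength, filename, startOffset) entries until the end of the file.
public class PartitionIndexReaderSketch {
    public static Map<String, Long> read(String path) throws IOException {
        Map<String, Long> segmentStartOffsets = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new FileInputStream(path))) {
            if (in.readByte() != 0x01) {
                throw new IOException("Unknown magic byte, incompatible file format");
            }
            while (true) {
                int filenameLength;
                try {
                    filenameLength = in.readInt();          // int32 filenameLength
                } catch (EOFException end) {
                    break;                                  // clean end of file
                }
                byte[] filenameBytes = new byte[filenameLength];
                in.readFully(filenameBytes);                // UTF-8 file name
                long startOffset = in.readLong();           // int64 startOffset
                segmentStartOffsets.put(new String(filenameBytes, StandardCharsets.UTF_8), startOffset);
            }
        }
        return segmentStartOffsets;
    }
}
```
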
There are two files for each segment. Each of the files is a binary log file that concatenates entries without any delimiter.
Kafka Backup records are very similar to the Kafka record format, but omit some optimization techniques to keep the format simple and reliable.
The record file starts with the magic byte `0x01`. If the first byte does not equal `0x01`, the file is not compatible with the current version of Kafka Backup.
Each entry is of the following form:

Length (in bits) | Name | Data Type | Comment |
---|---|---|---|
64 | `offset` | `int64` | The offset of the record in the source Kafka cluster |
32 | `timestampType` | `int32` | Type of the timestamp: `-1`: no timestamp, `0`: CreateTime, `1`: LogAppendTime, `-2`: CreateTime but with timestamp `null` (dirty workaround, see https://github.com/itadventurer/kafka-backup/issues/92) |
0 or 64 | `timestamp` | `optional<int64>` | Timestamp, if one exists |
32 | `keyLength` | `int32` | Byte length of the record key. `-1` if the key is `null` |
`keyLength` | `key` | `byte[]` | Key (not interpreted in any way) |
32 | `valueLength` | `int32` | Byte length of the record value. `-1` if the value is `null` |
`valueLength` | `value` | `byte[]` | Value (not interpreted in any way) |
32 | `headerSize` | `int32` | Number of headers of the record |
calculated | `headers` | `Header[]` | Concatenated headers of the record |
Each header is of the following form:

Length (in bits) | Name | Data Type | Comment |
---|---|---|---|
32 | `headerKeyLength` | `int32` | Byte length of the header key. A key must not be `null` (although it can be empty) |
`headerKeyLength` | `headerKey` | `byte[]` | Key (not interpreted in any way) |
32 | `headerValueLength` | `int32` | Byte length of the header value. `-1` if the value is `null` |
`headerValueLength` | `headerValue` | `byte[]` | Value (not interpreted in any way) |
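
For illustration, a sketch of reading a single record in this format; it assumes big-endian integers and that no timestamp bytes are written for the timestamp types `-1` and `-2`:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of a record reader for the layout above. The parsed fields are read
// into local variables; a real reader would hand them to a record container.
public class RecordReaderSketch {
    public static void readRecord(DataInputStream in) throws IOException {
        long offset = in.readLong();               // int64 offset
        int timestampType = in.readInt();          // int32 timestampType (-1, 0, 1 or -2)
        Long timestamp = null;
        if (timestampType == 0 || timestampType == 1) {
            timestamp = in.readLong();             // int64 timestamp, only if one exists
        }
        byte[] key = readNullableBytes(in);        // keyLength + key
        byte[] value = readNullableBytes(in);      // valueLength + value
        int headerCount = in.readInt();            // int32 headerSize
        List<byte[][]> headers = new ArrayList<>();
        for (int i = 0; i < headerCount; i++) {
            byte[] headerKey = readNullableBytes(in);    // must not be null, may be empty
            byte[] headerValue = readNullableBytes(in);  // may be null
            headers.add(new byte[][] { headerKey, headerValue });
        }
        // ... hand offset, timestampType, timestamp, key, value and headers to the caller
    }

    // Reads an int32 length followed by that many bytes; -1 encodes null.
    private static byte[] readNullableBytes(DataInputStream in) throws IOException {
        int length = in.readInt();
        if (length == -1) {
            return null;
        }
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        return bytes;
    }
}
```
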

The segment index file starts with the magic byte `0x01`. If the first byte does not equal `0x01`, the file is not compatible with the current version of Kafka Backup.
Each entry is of the following form:

Length (in bits) | Name | Data Type | Comment |
---|---|---|---|
64 | `offset` | `int64` | The offset of the record in the source Kafka cluster |
64 | `recordFilePosition` | `int64` | The start position of the record in the record file |
64 | `recordByteLength` | `int64` | Length of the record in the record file (`recordFilePosition + recordByteLength` equals the `recordFilePosition` of the next record) |
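
A sketch of how this index can be used to locate a record by offset, again assuming big-endian integers; a real implementation would first use the partition index to pick the right segment:

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;

// Sketch: scan a segment index file for a target offset and return the position
// of the record in the record file, or -1 if the offset is not in this segment.
public class SegmentIndexLookupSketch {
    public static long findRecordPosition(String indexPath, long targetOffset) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream(indexPath))) {
            if (in.readByte() != 0x01) {
                throw new IOException("Unknown magic byte, incompatible file format");
            }
            while (true) {
                long offset;
                try {
                    offset = in.readLong();              // int64 offset
                } catch (EOFException end) {
                    return -1;                           // offset not found in this segment
                }
                long recordFilePosition = in.readLong(); // int64 recordFilePosition
                long recordByteLength = in.readLong();   // int64 recordByteLength (unused here)
                if (offset == targetOffset) {
                    return recordFilePosition;
                }
            }
        }
    }
}
```
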
The offset file consists of a mapping from consumer groups to the committed offset in the current partition. It is represented as a simple JSON map where the map key is the consumer group and the value is the offset.
Example:
```json
{
  "consumer-group1": 100,
  "consumer-group2": 200,
  "consumer-group3": 300,
  "consumer-group4": 300
}
```