Merge pull request #99 from WesselVS/implement-point-in-time-backups
Implement point-in-time backups (snapshots)
itadventurer authored Jun 23, 2020
2 parents 85eed4a + dbe6178 commit f157f83
Showing 8 changed files with 214 additions and 18 deletions.
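The new mode is exposed through the backup-standalone.sh wrapper modified in this PR. A minimal invocation sketch, assuming a local broker; the broker address, target directory and topic name are placeholders, not values from this PR:

    backup-standalone.sh --bootstrap-server localhost:9092 \
        --target-dir /var/backup/kafka --topics my-topic --snapshot

With --snapshot set, the sink task reads the end offsets of every assigned partition when it is opened and terminates Kafka Connect once all partitions have been backed up to those offsets.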
4 changes: 2 additions & 2 deletions Dockerfile
@@ -7,7 +7,7 @@ RUN gradle --no-daemon check test shadowJar
# Build Docker Image with Kafka Backup Jar
FROM openjdk:8u212-jre-alpine

ARG kafka_version=2.4.1
ARG kafka_version=2.5.0
ARG scala_version=2.12
ARG glibc_version=2.31-r0

@@ -30,4 +30,4 @@ RUN apk add --no-cache bash curl \
COPY ./bin /opt/kafka-backup/
COPY --from=builder /opt/kafka-backup/build/libs/kafka-backup.jar /opt/kafka-backup/

ENV PATH="${KAFKA_HOME}/bin:/opt/kafka-backup/:${PATH}"
ENV PATH="${KAFKA_HOME}/bin:/opt/kafka-backup/:${PATH}"
11 changes: 9 additions & 2 deletions bin/backup-standalone.sh
@@ -23,7 +23,7 @@ trap _term INT
##################################### Parse arguments

OPTIONS="h"
LONGOPTS=bootstrap-server:,target-dir:,topics:,topics-regex:,max-segment-size:,command-config:,help,debug
LONGOPTS=bootstrap-server:,target-dir:,topics:,topics-regex:,max-segment-size:,command-config:,help,debug,snapshot

HELP=$(
cat <<END
@@ -36,6 +36,7 @@ HELP=$(
passed to Admin Client. Only useful if you have additional connection options
--help Prints this message
--debug Print Debug information (if using the environment variable, set it to 'y')
--snapshot One-off backup mode
You can also set all parameters using environment variables. Use CAPITAL LETTERS and underscores (_) instead of dashes (-).
E.g. BOOTSTRAP_SERVER instead of --bootstrap-server
@@ -51,6 +52,7 @@ END
[ -z ${MAX_SEGMENT_SIZE+x} ] && MAX_SEGMENT_SIZE="$((1 * 1024 * 1024 * 1024))" # 1GiB
[ -z ${COMMAND_CONFIG+x} ] && COMMAND_CONFIG=""
[ -z ${DEBUG+x} ] && DEBUG="n"
[ -z ${SNAPSHOT+x} ] && SNAPSHOT="false"

PLUGIN_PATH="$(dirname "${BASH_SOURCE[0]}")"
CONNECT_BIN=""
@@ -101,6 +103,10 @@ while true; do
DEBUG=y
shift
;;
-s | --snapshot)
SNAPSHOT=true
shift
;;
--)
shift
break
@@ -195,6 +201,7 @@ header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
target.dir=$TARGET_DIR
max.segment.size.bytes=$MAX_SEGMENT_SIZE
cluster.bootstrap.servers=$BOOTSTRAP_SERVER
snapshot=$SNAPSHOT
EOF

if [ -n "$TOPICS" ]; then
@@ -247,4 +254,4 @@ fi

##################################### Start Connect Standalone

KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG}" "$CONNECT_BIN" "$WORKER_CONFIG" "$CONNECTOR_CONFIG"
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${LOG4J_CONFIG}" "$CONNECT_BIN" "$WORKER_CONFIG" "$CONNECTOR_CONFIG"
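As the script's help text notes, every flag can also be supplied as an environment variable (capital letters, underscores instead of dashes). A sketch of the equivalent one-off backup in that form, assuming the remaining options follow the same convention; all values are placeholders:

    BOOTSTRAP_SERVER=localhost:9092 TARGET_DIR=/var/backup/kafka \
        TOPICS=my-topic SNAPSHOT=true backup-standalone.sh

This is convenient inside the Docker image built above, where /opt/kafka-backup/ is already on the PATH.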
42 changes: 42 additions & 0 deletions src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
@@ -0,0 +1,42 @@
package de.azapps.kafkabackup.common.offset;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.*;

public class EndOffsetReader {
private final Map<String, Object> consumerConfig;

public EndOffsetReader(Map<String, Object> consumerConfig) {
this.consumerConfig = consumerConfig;
}

/**
* Obtain end offsets for each given partition
*/
public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> partitions) {
Map<String, Object> serializerConfig = new HashMap<>(consumerConfig);
serializerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
serializerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
try (KafkaConsumer<Byte[], Byte[]> consumer = new KafkaConsumer<>(serializerConfig)) {
consumer.assign(partitions);

Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
List<TopicPartition> toRemove = new ArrayList<>();

for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
if (partitionOffset.getValue() == 0L) {
toRemove.add(partitionOffset.getKey()); // don't store empty offsets
}
}

for (TopicPartition partition : toRemove) {
offsets.remove(partition);
}

return offsets;
}
}
}
12 changes: 10 additions & 2 deletions src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
@@ -12,15 +12,18 @@ class BackupSinkConfig extends AbstractConfig {
static final String ADMIN_CLIENT_PREFIX = "admin.";
static final String TARGET_DIR_CONFIG = "target.dir";
static final String MAX_SEGMENT_SIZE = "max.segment.size.bytes";
static final String SNAPSHOT = "snapshot";

static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(TARGET_DIR_CONFIG, ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH, "TargetDir")
.define(MAX_SEGMENT_SIZE, ConfigDef.Type.INT, 1024 ^ 3, // 1 GiB
ConfigDef.Importance.LOW, "Maximum segment size");
ConfigDef.Importance.LOW, "Maximum segment size")
.define(SNAPSHOT, ConfigDef.Type.BOOLEAN, false,
ConfigDef.Importance.LOW, "Creates a snapshot. Terminates connector when end of all partitions has been reached.");

BackupSinkConfig(Map<?, ?> props) {
super(CONFIG_DEF, props);
super(CONFIG_DEF, props, true);
if (!props.containsKey(TARGET_DIR_CONFIG)) {
throw new RuntimeException("Missing Configuration Variable: " + TARGET_DIR_CONFIG);
}
@@ -44,5 +47,10 @@ Integer maxSegmentSizeBytes() {
return getInt(MAX_SEGMENT_SIZE);
}

Boolean snapShotMode() { return getBoolean(SNAPSHOT); }

Map<String, Object> consumerConfig() {
return new HashMap<>(originalsWithPrefix(CLUSTER_PREFIX));
}

}
83 changes: 76 additions & 7 deletions src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
@@ -1,5 +1,6 @@
package de.azapps.kafkabackup.sink;

import de.azapps.kafkabackup.common.offset.EndOffsetReader;
import de.azapps.kafkabackup.common.offset.OffsetSink;
import de.azapps.kafkabackup.common.partition.PartitionIndex;
import de.azapps.kafkabackup.common.partition.PartitionWriter;
@@ -11,23 +12,27 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.*;

public class BackupSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(BackupSinkTask.class);
private Path targetDir;
private Map<TopicPartition, PartitionWriter> partitionWriters = new HashMap<>();
private long maxSegmentSizeBytes;
private OffsetSink offsetSink;
private BackupSinkConfig config;
private Map<TopicPartition, Long> endOffsets;
private Map<TopicPartition, Long> currentOffsets = new HashMap<>();
private EndOffsetReader endOffsetReader;
private java.util.function.Consumer<Integer> exitFunction;

@Override
public String version() {
@@ -36,11 +41,17 @@ public String version() {

@Override
public void start(Map<String, String> props) {
start(props, null);
start(props, null, null, null);
}

public void start(Map<String, String> props, OffsetSink overrideOffsetSink) {
BackupSinkConfig config = new BackupSinkConfig(props);
public void start(
Map<String, String> props,
OffsetSink overrideOffsetSink,
EndOffsetReader overrideEndOffsetReader,
java.util.function.Consumer<Integer> overrideExitFunction
) {
this.config = new BackupSinkConfig(props);

try {
maxSegmentSizeBytes = config.maxSegmentSizeBytes();
targetDir = Paths.get(config.targetDir());
@@ -53,12 +64,45 @@ public void start(Map<String, String> props, OffsetSink overrideOffsetSink) {
AdminClient adminClient = AdminClient.create(config.adminConfig());
offsetSink = new OffsetSink(adminClient, targetDir);
}

if (overrideEndOffsetReader != null) {
this.endOffsetReader = overrideEndOffsetReader;
} else {
endOffsetReader = new EndOffsetReader(config.consumerConfig());
}

if (overrideExitFunction != null) {
this.exitFunction = overrideExitFunction;
} else {
this.exitFunction = System::exit;
}

log.debug("Initialized BackupSinkTask with target dir {}", targetDir);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Check for end-offsets. Terminate if all offsets >= end-offsets
*/
private void terminateIfCompleted() {
boolean terminate = true;
for (Map.Entry<TopicPartition, Long> partitionOffset : endOffsets.entrySet()) {
Long endOffset = partitionOffset.getValue();
Long currentOffset = currentOffsets.getOrDefault(partitionOffset.getKey(), -1L);

if (currentOffset < endOffset - 1) {
return;
}
}
if (terminate) {
log.debug("Snapshot complete. Terminating kafka connect.");
stop(); // seems that this is not called when using System.exit()
exitFunction.accept(0);
}
}

@Override
public void put(Collection<SinkRecord> records) {
try {
@@ -69,15 +113,25 @@ public void put(Collection<SinkRecord> records) {
if (sinkRecord.kafkaOffset() % 100 == 0) {
log.debug("Backed up Topic {}, Partition {}, up to offset {}", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
}
if (config.snapShotMode()) {
currentOffsets.put(topicPartition, sinkRecord.kafkaOffset());
}
}

// Todo: refactor to own worker. E.g. using the scheduler of MM2
offsetSink.syncConsumerGroups();
offsetSink.syncOffsets();

if (config.snapShotMode()) {
terminateIfCompleted();
}
} catch (IOException | SegmentIndex.IndexException | PartitionIndex.IndexException | SegmentWriter.SegmentException e) {
throw new RuntimeException(e);
}
}



public void open(Collection<TopicPartition> partitions) {
super.open(partitions);
try {
@@ -93,6 +147,7 @@ public void open(Collection<TopicPartition> partitions) {
// were written to disk. To protect against this, even if we
// just want to start at offset 0 or reset to the earliest offset, we specify that
// explicitly to forcibly override any committed offsets.

if (lastWrittenOffset > 0) {
context.offset(topicPartition, lastWrittenOffset + 1);
log.debug("Initialized Topic {}, Partition {}. Last written offset: {}"
@@ -101,10 +156,22 @@ public void open(Collection<TopicPartition> partitions) {
// The offset was not found, so rather than forcibly set the offset to 0 we let the
// consumer decide where to start based upon standard consumer offsets (if available)
// or the consumer's `auto.offset.reset` configuration

// if we are in snapshot mode, then just start at zero.
if (config.snapShotMode()) {
context.offset(topicPartition, 0);
}

log.info("Resetting offset for {} based upon existing consumer group offsets or, if "
+ "there are none, the consumer's 'auto.offset.reset' value.", topicPartition);
}

this.partitionWriters.put(topicPartition, partitionWriter);
this.currentOffsets.put(topicPartition, lastWrittenOffset);
}
if ( config.snapShotMode() ) {
this.endOffsets = endOffsetReader.getEndOffsets(partitions);
this.terminateIfCompleted();
}
if (partitions.isEmpty()) {
log.info("No partitions assigned to BackupSinkTask");
Expand All @@ -119,7 +186,9 @@ public void close(Collection<TopicPartition> partitions) {
try {
for (TopicPartition topicPartition : partitions) {
PartitionWriter partitionWriter = partitionWriters.get(topicPartition);
partitionWriter.close();
if (partitionWriter != null) {
partitionWriter.close();
}
partitionWriters.remove(topicPartition);
log.debug("Closed BackupSinkTask for Topic {}, Partition {}"
, topicPartition.topic(), topicPartition.partition());
(Diffs for the remaining 3 changed files are not shown here.)