diff --git a/Dockerfile b/Dockerfile
index 96c0039..2a665c9 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -7,7 +7,7 @@ RUN gradle --no-daemon check test shadowJar

 # Build Docker Image with Kafka Backup Jar
 FROM openjdk:8u212-jre-alpine
-ARG kafka_version=2.4.1
+ARG kafka_version=2.5.0
 ARG scala_version=2.12
 ARG glibc_version=2.31-r0

@@ -30,4 +30,4 @@ RUN apk add --no-cache bash curl \

 COPY ./bin /opt/kafka-backup/
 COPY --from=builder /opt/kafka-backup/build/libs/kafka-backup.jar /opt/kafka-backup/
-ENV PATH="${KAFKA_HOME}/bin:/opt/kafka-backup/:${PATH}"
\ No newline at end of file
+ENV PATH="${KAFKA_HOME}/bin:/opt/kafka-backup/:${PATH}"
diff --git a/bin/backup-standalone.sh b/bin/backup-standalone.sh
index 3bed9d2..ca62f4d 100755
--- a/bin/backup-standalone.sh
+++ b/bin/backup-standalone.sh
@@ -23,7 +23,7 @@ trap _term INT

 ##################################### Parse arguments
 OPTIONS="h"
-LONGOPTS=bootstrap-server:,target-dir:,topics:,topics-regex:,max-segment-size:,command-config:,help,debug
+LONGOPTS=bootstrap-server:,target-dir:,topics:,topics-regex:,max-segment-size:,command-config:,help,debug,snapshot
 HELP=$( cat <
diff --git a/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
new file mode 100644
--- /dev/null
+++ b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
@@ -0,0 +1,42 @@
+package de.azapps.kafkabackup.common.offset;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.*;
+
+public class EndOffsetReader {
+    private Map<String, Object> consumerConfig;
+
+    public EndOffsetReader(Map<String, Object> consumerConfig) {
+        this.consumerConfig = consumerConfig;
+    }
+
+    /**
+     * Obtain end offsets for each given partition
+     */
+    public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> partitions) {
+        Map<String, Object> serializerConfig = new HashMap<>(consumerConfig);
+        serializerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
+        serializerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
+        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(serializerConfig)) {
+            consumer.assign(partitions);
+
+            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
+            List<TopicPartition> toRemove = new ArrayList<>();
+
+            for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
+                if (partitionOffset.getValue() == 0L) {
+                    toRemove.add(partitionOffset.getKey()); // don't store empty offsets
+                }
+            }
+
+            for (TopicPartition partition : toRemove) {
+                offsets.remove(partition);
+            }
+
+            return offsets;
+        }
+    }
+}
diff --git a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
index ab5ed5e..13583d9 100644
--- a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
+++ b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
@@ -12,15 +12,18 @@ class BackupSinkConfig extends AbstractConfig {
     static final String ADMIN_CLIENT_PREFIX = "admin.";
     static final String TARGET_DIR_CONFIG = "target.dir";
     static final String MAX_SEGMENT_SIZE = "max.segment.size.bytes";
+    static final String SNAPSHOT = "snapshot";

     static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(TARGET_DIR_CONFIG, ConfigDef.Type.STRING,
                     ConfigDef.Importance.HIGH, "TargetDir")
             .define(MAX_SEGMENT_SIZE, ConfigDef.Type.INT, 1024 ^ 3, // 1 GiB
-                    ConfigDef.Importance.LOW, "Maximum segment size");
+                    ConfigDef.Importance.LOW, "Maximum segment size")
+            .define(SNAPSHOT, ConfigDef.Type.BOOLEAN, false,
+                    ConfigDef.Importance.LOW, "Creates a snapshot. Terminates connector when end of all partitions has been reached.");

     BackupSinkConfig(Map<String, String> props) {
-        super(CONFIG_DEF, props);
+        super(CONFIG_DEF, props, true);
         if (!props.containsKey(TARGET_DIR_CONFIG)) {
             throw new RuntimeException("Missing Configuration Variable: " + TARGET_DIR_CONFIG);
         }
@@ -44,5 +47,10 @@
     Integer maxSegmentSizeBytes() {
         return getInt(MAX_SEGMENT_SIZE);
     }

+    Boolean snapShotMode() { return getBoolean(SNAPSHOT); }
+
+    Map<String, Object> consumerConfig() {
+        return new HashMap<>(originalsWithPrefix(CLUSTER_PREFIX));
+    }
 }
diff --git a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
index d78d6aa..d0034fb 100644
--- a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
+++ b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
@@ -1,5 +1,6 @@
 package de.azapps.kafkabackup.sink;

+import de.azapps.kafkabackup.common.offset.EndOffsetReader;
 import de.azapps.kafkabackup.common.offset.OffsetSink;
 import de.azapps.kafkabackup.common.partition.PartitionIndex;
 import de.azapps.kafkabackup.common.partition.PartitionWriter;
@@ -11,6 +12,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -18,9 +20,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;

 public class BackupSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(BackupSinkTask.class);
@@ -28,6 +28,11 @@ public class BackupSinkTask extends SinkTask {
     private Map<TopicPartition, PartitionWriter> partitionWriters = new HashMap<>();
     private long maxSegmentSizeBytes;
     private OffsetSink offsetSink;
+    private BackupSinkConfig config;
+    private Map<TopicPartition, Long> endOffsets;
+    private Map<TopicPartition, Long> currentOffsets = new HashMap<>();
+    private EndOffsetReader endOffsetReader;
+    private java.util.function.Consumer<Integer> exitFunction;

     @Override
     public String version() {
@@ -36,11 +41,17 @@ public String version() {

     @Override
     public void start(Map<String, String> props) {
-        start(props, null);
+        start(props, null, null, null);
     }

-    public void start(Map<String, String> props, OffsetSink overrideOffsetSink) {
-        BackupSinkConfig config = new BackupSinkConfig(props);
+    public void start(
+            Map<String, String> props,
+            OffsetSink overrideOffsetSink,
+            EndOffsetReader overrideEndOffsetReader,
+            java.util.function.Consumer<Integer> overrideExitFunction
+    ) {
+        this.config = new BackupSinkConfig(props);
+
         try {
             maxSegmentSizeBytes = config.maxSegmentSizeBytes();
             targetDir = Paths.get(config.targetDir());
@@ -53,12 +64,45 @@ public void start(Map<String, String> props, OffsetSink overrideOffsetSink) {
                 AdminClient adminClient = AdminClient.create(config.adminConfig());
                 offsetSink = new OffsetSink(adminClient, targetDir);
             }
+
+            if (overrideEndOffsetReader != null) {
+                this.endOffsetReader = overrideEndOffsetReader;
+            } else {
+                endOffsetReader = new EndOffsetReader(config.consumerConfig());
+            }
+
+            if (overrideExitFunction != null) {
+                this.exitFunction = overrideExitFunction;
+            } else {
+                this.exitFunction = System::exit;
+            }
+
             log.debug("Initialized BackupSinkTask with target dir {}", targetDir);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }

+    /**
+     * Check for end-offsets. Terminate if all offsets >= end-offsets
+     */
+    private void terminateIfCompleted() {
+        boolean terminate = true;
+        for (Map.Entry<TopicPartition, Long> partitionOffset : endOffsets.entrySet()) {
+            Long endOffset = partitionOffset.getValue();
+            Long currentOffset = currentOffsets.getOrDefault(partitionOffset.getKey(), -1L);
+
+            if (currentOffset < endOffset - 1) {
+                return;
+            }
+        }
+        if (terminate) {
+            log.debug("Snapshot complete. Terminating kafka connect.");
+            stop(); // seems that this is not called when using System.exit()
+            exitFunction.accept(0);
+        }
+    }
+
     @Override
     public void put(Collection<SinkRecord> records) {
         try {
@@ -69,15 +113,25 @@ public void put(Collection<SinkRecord> records) {
                 if (sinkRecord.kafkaOffset() % 100 == 0) {
                     log.debug("Backed up Topic {}, Partition {}, up to offset {}", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
                 }
+                if (config.snapShotMode()) {
+                    currentOffsets.put(topicPartition, sinkRecord.kafkaOffset());
+                }
             }
             // Todo: refactor to own worker. E.g. using the scheduler of MM2
             offsetSink.syncConsumerGroups();
             offsetSink.syncOffsets();
+
+            if (config.snapShotMode()) {
+                terminateIfCompleted();
+            }
         } catch (IOException | SegmentIndex.IndexException | PartitionIndex.IndexException | SegmentWriter.SegmentException e) {
             throw new RuntimeException(e);
         }
     }

+
+
     public void open(Collection<TopicPartition> partitions) {
         super.open(partitions);
         try {
@@ -93,6 +147,7 @@ public void open(Collection<TopicPartition> partitions) {
                 // were written to disk. To protect against this, even if we
                 // just want to start at offset 0 or reset to the earliest offset, we specify that
                 // explicitly to forcibly override any committed offsets.
+
                 if (lastWrittenOffset > 0) {
                     context.offset(topicPartition, lastWrittenOffset + 1);
                     log.debug("Initialized Topic {}, Partition {}. Last written offset: {}"
                             , topicPartition.topic(), topicPartition.partition(), lastWrittenOffset);
                 } else {
                     // The offset was not found, so rather than forcibly set the offset to 0 we let the
                     // consumer decide where to start based upon standard consumer offsets (if available)
                     // or the consumer's `auto.offset.reset` configuration
+
+                    // if we are in snapshot mode, then just start at zero.
+                    if (config.snapShotMode()) {
+                        context.offset(topicPartition, 0);
+                    }
+
                     log.info("Resetting offset for {} based upon existing consumer group offsets or, if " +
                             "there are none, the consumer's 'auto.offset.reset' value.", topicPartition);
                 }
+                this.partitionWriters.put(topicPartition, partitionWriter);
+                this.currentOffsets.put(topicPartition, lastWrittenOffset);
+            }
+            if ( config.snapShotMode() ) {
+                this.endOffsets = endOffsetReader.getEndOffsets(partitions);
+                this.terminateIfCompleted();
             }
             if (partitions.isEmpty()) {
                 log.info("No partitions assigned to BackupSinkTask");
@@ -119,7 +186,9 @@ public void close(Collection<TopicPartition> partitions) {
         try {
             for (TopicPartition topicPartition : partitions) {
                 PartitionWriter partitionWriter = partitionWriters.get(topicPartition);
-                partitionWriter.close();
+                if (partitionWriter != null) {
+                    partitionWriter.close();
+                }
                 partitionWriters.remove(topicPartition);
                 log.debug("Closed BackupSinkTask for Topic {}, Partition {}"
                         , topicPartition.topic(), topicPartition.partition());
diff --git a/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java b/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
index f47b985..9c4982a 100644
--- a/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
+++ b/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
@@ -19,6 +19,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;

 import static org.junit.jupiter.api.Assertions.*;
@@ -60,7 +61,7 @@ public void simpleTest() throws Exception {

         // Start Task
         BackupSinkTask task = new BackupSinkTask();
-        task.start(props, new MockOffsetSink(null, null));
+        task.start(props, new MockOffsetSink(null, null), null, (n) -> {});
         task.open(partitions);
         task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList()));

@@ -80,6 +81,54 @@ public void simpleTest() throws Exception {
         assertEquals(allRecords, partitionReader.readFully());
     }

+    @Test
+    public void simpleTestWithSnapshotMode() throws Exception {
+        // Prepare
+        Path directory = Paths.get(TEMP_DIR.toString(), "simpleTestWithSnapshotMode");
+        Files.createDirectories(directory);
+        Map<String, String> props = new HashMap<>(DEFAULT_PROPS);
+        props.put(BackupSinkConfig.TARGET_DIR_CONFIG, directory.toString());
+        props.put(BackupSinkConfig.SNAPSHOT, "true");
+
+        List<Record> records = new ArrayList<>();
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(new TopicPartition(TOPIC1, 0), 13L);
+
+        records.add(new Record(TOPIC1, 0, KEY_BYTES, VALUE_BYTES, 0));
+        records.add(new Record(TOPIC1, 0, null, null, 1));
+        records.add(new Record(TOPIC1, 0, KEY_BYTES, VALUE_BYTES, 10));
+
+        List<TopicPartition> partitions = new ArrayList<>();
+        partitions.add(new TopicPartition(TOPIC1, 0));
+
+        AtomicBoolean endConditionCheck = new AtomicBoolean();
+        // Start Task
+        BackupSinkTask task = new BackupSinkTask();
+        task.initialize(new MockSinkTaskContext());
+        task.start(props, new MockOffsetSink(null, null), new MockEndOffsetReader(endOffsets), (n) -> endConditionCheck.set(true));
+
+        task.open(partitions);
+        task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList()));
+
+        assertFalse(endConditionCheck.get());
+
+        // Write again
+        List<Record> records2 = new ArrayList<>();
+        records2.add(new Record(TOPIC1, 0, KEY_BYTES, VALUE_BYTES, 11));
+        records2.add(new Record(TOPIC1, 0, null, null, 12));
+        records2.add(new Record(TOPIC1, 0, KEY_BYTES, VALUE_BYTES, 13));
+        task.put(records2.stream().map(Record::toSinkRecord).collect(Collectors.toList()));
+
+        task.close(partitions);
+
+        // Check backed up data
+        List<Record> allRecords = new ArrayList<>(records);
+        allRecords.addAll(records2);
+        PartitionReader partitionReader = new PartitionReader(TOPIC1, 0, Paths.get(directory.toString(), TOPIC1));
+        assertEquals(allRecords, partitionReader.readFully());
+        assertTrue(endConditionCheck.get());
+    }
+
     @Test
     public void multipleTopicPartitionsTest() throws Exception {
         // Prepare
@@ -109,7 +158,7 @@ public void multipleTopicPartitionsTest() throws Exception {

         // Start Task
         BackupSinkTask task = new BackupSinkTask();
-        task.start(props, new MockOffsetSink(null, null));
+        task.start(props, new MockOffsetSink(null, null), null, (n) -> {});
         task.open(partitions);
         task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList()));
         task.close(partitions);
@@ -173,7 +222,7 @@ public void invalidOffsetsTest() throws Exception {

         // Start Task
         BackupSinkTask task = new BackupSinkTask();
-        task.start(props, new MockOffsetSink(null, null));
+        task.start(props, new MockOffsetSink(null, null), null, (n) -> {});
         task.open(partitions);
         assertThrows(RuntimeException.class, () -> task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList())));
         assertDoesNotThrow(() -> task.put(records2.stream().map(Record::toSinkRecord).collect(Collectors.toList())));
@@ -220,7 +269,7 @@ public void writeToExistingData() throws Exception {
         // Start Task
         BackupSinkTask task = new BackupSinkTask();
         task.initialize(new MockSinkTaskContext());
-        task.start(props, new MockOffsetSink(null, null));
+        task.start(props, new MockOffsetSink(null, null), null, (n) -> {});
         task.open(partitions);
         task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList()));
         task.close(partitions);
diff --git a/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java b/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java
new file mode 100644
index 0000000..a1611da
--- /dev/null
+++ b/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java
@@ -0,0 +1,20 @@
+package de.azapps.kafkabackup.sink;
+
+import de.azapps.kafkabackup.common.offset.EndOffsetReader;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockEndOffsetReader extends EndOffsetReader {
+    private Map<TopicPartition, Long> offsets;
+    public MockEndOffsetReader(Map<TopicPartition, Long> offsets) {
+        super(new HashMap<>());
+        this.offsets = offsets;
+    }
+    @Override
+    public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> partitions) {
+        return offsets;
+    }
+}
diff --git a/src/test/java/de/azapps/kafkabackup/sink/MockSinkTaskContext.java b/src/test/java/de/azapps/kafkabackup/sink/MockSinkTaskContext.java
index a13da04..72f8578 100644
--- a/src/test/java/de/azapps/kafkabackup/sink/MockSinkTaskContext.java
+++ b/src/test/java/de/azapps/kafkabackup/sink/MockSinkTaskContext.java
@@ -3,6 +3,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkTaskContext;

+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;

@@ -10,7 +11,7 @@ public class MockSinkTaskContext implements SinkTaskContext {

     @Override
     public Map<String, String> configs() {
-        return null;
+        return new HashMap<>();
     }

     @Override
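For reference, a snapshot run of the standalone script could look like the sketch below. This assumes backup-standalone.sh wires the new --snapshot flag through to the connector's snapshot=true setting (that part of the script is not shown in the hunk above); the broker address, target directory, and topic regex are placeholders.

# Hypothetical one-shot backup that exits once all assigned partitions reach their end offsets
backup-standalone.sh \
    --bootstrap-server localhost:9092 \
    --target-dir /tmp/kafka-backup \
    --topics-regex '.*' \
    --snapshot

With snapshot=true, the sink task reads the end offsets when partitions are assigned, tracks the offsets it has written in put(), and calls the configured exit function (System::exit by default) once every tracked partition has caught up, so the process terminates instead of running continuously.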