From a14cc2f2cdb05bf8d1af110d5c810501fd5e0869 Mon Sep 17 00:00:00 2001
From: Wessel van Staal
Date: Mon, 22 Jun 2020 13:21:00 +0200
Subject: [PATCH 1/5] Implement point in time backups (snapshots)

---
 bin/backup-standalone.sh                      |  11 +-
 .../common/offset/EndOffsetReader.java        |  41 +++++++
 .../kafkabackup/sink/BackupSinkConfig.java    |   9 +-
 .../kafkabackup/sink/BackupSinkTask.java      | 101 ++++++++++++++++--
 .../kafkabackup/sink/BackupSinkTaskTest.java  |  56 +++++++++-
 .../kafkabackup/sink/MockEndOffsetReader.java |  19 ++++
 .../kafkabackup/sink/MockSinkTaskContext.java |   3 +-
 7 files changed, 223 insertions(+), 17 deletions(-)
 create mode 100644 src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
 create mode 100644 src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java

diff --git a/bin/backup-standalone.sh b/bin/backup-standalone.sh
index 3bed9d2..c785ece 100755
--- a/bin/backup-standalone.sh
+++ b/bin/backup-standalone.sh
@@ -23,7 +23,7 @@ trap _term INT
 
 ##################################### Parse arguments
 OPTIONS="h"
-LONGOPTS=bootstrap-server:,target-dir:,topics:,topics-regex:,max-segment-size:,command-config:,help,debug
+LONGOPTS=bootstrap-server:,target-dir:,topics:,topics-regex:,max-segment-size:,command-config:,help,debug,snapshot-mode
 HELP=$(
   cat <<EOF

diff --git a/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
new file mode 100644
index 0000000..3c898b5
--- /dev/null
+++ b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
@@ -0,0 +1,41 @@
+package de.azapps.kafkabackup.common.offset;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.*;
+
+public class EndOffsetReader {
+    private String bootstrapServers;
+
+    public EndOffsetReader(String bootstrapServers) {
+        this.bootstrapServers = bootstrapServers;
+    }
+
+    /**
+     * Obtain end offsets for each given partition
+     */
+    public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> partitions) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", bootstrapServers);
+        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
+        consumer.assign(partitions);
+        Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
+        List<TopicPartition> toRemove = new ArrayList<>();
+
+        for (TopicPartition partition : offsets.keySet()) {
+            if (offsets.get(partition) == 0L) {
+                toRemove.add(partition); // don't store empty offsets
+            }
+        }
+
+        for (TopicPartition partition : toRemove) {
+            offsets.remove(partition);
+        }
+        consumer.close();
+        return offsets;
+    }
+}
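
The new class leans on a detail of the Kafka consumer API that the later offset
arithmetic depends on: endOffsets() returns, per partition, the offset the next
produced record would receive, not the offset of the last stored record; an
empty partition therefore reports 0, which is why such entries are filtered out
above. A minimal usage sketch — the bootstrap address and topic name are
placeholders, not part of the patch:

    import de.azapps.kafkabackup.common.offset.EndOffsetReader;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Collections;
    import java.util.Map;

    public class EndOffsetReaderExample {
        public static void main(String[] args) {
            // Placeholder bootstrap address and topic; substitute your own.
            EndOffsetReader reader = new EndOffsetReader("localhost:9092");
            TopicPartition tp = new TopicPartition("backup-demo", 0);

            // For a partition holding records at offsets 0..12, endOffsets()
            // reports 13: the offset of the *next* record to be written.
            Map<TopicPartition, Long> end =
                    reader.getEndOffsets(Collections.singletonList(tp));
            end.forEach((partition, offset) ->
                    System.out.println(partition + " ends at " + offset));
        }
    }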
diff --git a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
index ab5ed5e..c954e70 100644
--- a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
+++ b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
@@ -12,15 +12,18 @@ class BackupSinkConfig extends AbstractConfig {
     static final String ADMIN_CLIENT_PREFIX = "admin.";
     static final String TARGET_DIR_CONFIG = "target.dir";
     static final String MAX_SEGMENT_SIZE = "max.segment.size.bytes";
+    static final String SNAPSHOT_MODE = "snapshot.mode";
 
     static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(TARGET_DIR_CONFIG, ConfigDef.Type.STRING,
                     ConfigDef.Importance.HIGH, "TargetDir")
             .define(MAX_SEGMENT_SIZE, ConfigDef.Type.INT, 1024 ^ 3, // 1 GiB
-                    ConfigDef.Importance.LOW, "Maximum segment size");
+                    ConfigDef.Importance.LOW, "Maximum segment size")
+            .define(SNAPSHOT_MODE, ConfigDef.Type.BOOLEAN, false,
+                    ConfigDef.Importance.LOW, "Snapshot mode. Terminates connector when end of all partitions has been reached.");
 
     BackupSinkConfig(Map<String, String> props) {
-        super(CONFIG_DEF, props);
+        super(CONFIG_DEF, props, true);
         if (!props.containsKey(TARGET_DIR_CONFIG)) {
             throw new RuntimeException("Missing Configuration Variable: " + TARGET_DIR_CONFIG);
         }
@@ -44,5 +47,7 @@ Integer maxSegmentSizeBytes() {
         return getInt(MAX_SEGMENT_SIZE);
     }
 
+    Boolean snapShotMode() { return getBoolean(SNAPSHOT_MODE); }
+
 }
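
For context, this is roughly how the new flag would be set in a Kafka Connect
properties file. The target.dir, max.segment.size.bytes and snapshot.mode keys
come from the config definition above; the connector class name and the
cluster.-prefixed bootstrap entry are assumptions inferred from the prefixes
referenced elsewhere in this series:

    name=backup-sink
    connector.class=de.azapps.kafkabackup.sink.BackupSinkConnector
    topics=my-topic
    cluster.bootstrap.servers=localhost:9092
    target.dir=/var/backup/kafka
    max.segment.size.bytes=1073741824
    snapshot.mode=true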
diff --git a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
index d78d6aa..9dd8182 100644
--- a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
+++ b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
@@ -1,5 +1,6 @@
 package de.azapps.kafkabackup.sink;
 
+import de.azapps.kafkabackup.common.offset.EndOffsetReader;
 import de.azapps.kafkabackup.common.offset.OffsetSink;
 import de.azapps.kafkabackup.common.partition.PartitionIndex;
 import de.azapps.kafkabackup.common.partition.PartitionWriter;
@@ -11,6 +12,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.sink.SinkTaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,9 +20,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 public class BackupSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(BackupSinkTask.class);
@@ -28,6 +28,14 @@ public class BackupSinkTask extends SinkTask {
     private Map<TopicPartition, PartitionWriter> partitionWriters = new HashMap<>();
     private long maxSegmentSizeBytes;
     private OffsetSink offsetSink;
+    private SinkTaskContext sinkTaskContext;
+    private BackupSinkConfig config;
+    private AdminClient adminClient;
+    private Map<TopicPartition, Long> endOffsets;
+    private Map<TopicPartition, Long> currentOffsets = new HashMap<>();
+    private EndOffsetReader endOffsetReader;
+    private String bootstrapServers;
+    private java.util.function.Consumer<Integer> exitFunction;
 
     @Override
     public String version() {
@@ -36,11 +44,24 @@
     @Override
     public void start(Map<String, String> props) {
-        start(props, null);
+        start(props, null, null, null);
     }
 
-    public void start(Map<String, String> props, OffsetSink overrideOffsetSink) {
-        BackupSinkConfig config = new BackupSinkConfig(props);
+    @Override
+    public void initialize(SinkTaskContext context) {
+        super.initialize(context);
+        this.sinkTaskContext = context;
+        this.bootstrapServers = context.configs().get(BackupSinkConfig.CLUSTER_BOOTSTRAP_SERVERS);
+    }
+
+    public void start(
+        Map<String, String> props,
+        OffsetSink overrideOffsetSink,
+        EndOffsetReader overrideEndOffsetReader,
+        java.util.function.Consumer<Integer> overrideExitFunction
+    ) {
+        this.config = new BackupSinkConfig(props);
+
         try {
             maxSegmentSizeBytes = config.maxSegmentSizeBytes();
             targetDir = Paths.get(config.targetDir());
@@ -50,15 +71,49 @@
             if(overrideOffsetSink != null) {
                 offsetSink = overrideOffsetSink;
             } else {
-                AdminClient adminClient = AdminClient.create(config.adminConfig());
+                adminClient = AdminClient.create(config.adminConfig());
                 offsetSink = new OffsetSink(adminClient, targetDir);
             }
+
+            if (overrideEndOffsetReader != null) {
+                this.endOffsetReader = overrideEndOffsetReader;
+            } else {
+                endOffsetReader = new EndOffsetReader(bootstrapServers);
+            }
+
+            if (overrideExitFunction != null) {
+                this.exitFunction = overrideExitFunction;
+            } else {
+                this.exitFunction = System::exit;
+            }
+
             log.debug("Initialized BackupSinkTask with target dir {}", targetDir);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
 
+    /**
+     * Check for end-offsets. Terminate if all offsets >= end-offsets
+     */
+    private void terminateIfCompleted() {
+        boolean terminate = true;
+        for (TopicPartition partition : endOffsets.keySet()) {
+            Long endOffset = endOffsets.get(partition);
+            Long currentOffset = currentOffsets.getOrDefault(partition, -1L);
+
+            if (currentOffset < endOffset - 1) {
+                terminate = false;
+                break;
+            }
+        }
+        if (terminate) {
+            log.debug("Snapshot complete. Terminating kafka connect.");
+            stop(); // seems that this is not called when using System.exit()
+            exitFunction.accept(0);
+        }
+    }
+
     @Override
     public void put(Collection<SinkRecord> records) {
         try {
@@ -69,15 +124,26 @@
                 if (sinkRecord.kafkaOffset() % 100 == 0) {
                     log.debug("Backed up Topic {}, Partition {}, up to offset {}", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
                 }
+                if (config.snapShotMode()) {
+                    currentOffsets.put(topicPartition, sinkRecord.kafkaOffset());
+                    log.debug("Update offset for topic {}, partition {}, offset {}", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
+                }
             }
+
             // Todo: refactor to own worker. E.g. using the scheduler of MM2
             offsetSink.syncConsumerGroups();
             offsetSink.syncOffsets();
+
+            if (config.snapShotMode()) {
+                terminateIfCompleted();
+            }
         } catch (IOException | SegmentIndex.IndexException | PartitionIndex.IndexException | SegmentWriter.SegmentException e) {
             throw new RuntimeException(e);
         }
     }
+
+
     public void open(Collection<TopicPartition> partitions) {
         super.open(partitions);
         try {
@@ -93,6 +159,7 @@
             // were written to disk. To protect against this, even if we
             // just want to start at offset 0 or reset to the earliest offset, we specify that
             // explicitly to forcibly override any committed offsets.
+
             if (lastWrittenOffset > 0) {
                 context.offset(topicPartition, lastWrittenOffset + 1);
                 log.debug("Initialized Topic {}, Partition {}. Last written offset: {}"
@@ -101,10 +168,23 @@
                 // The offset was not found, so rather than forcibly set the offset to 0 we let the
                 // consumer decide where to start based upon standard consumer offsets (if available)
                 // or the consumer's `auto.offset.reset` configuration
+
+                // if we are in snapshot mode, then just start at zero.
+                if (config.snapShotMode()) {
+                    context.offset(topicPartition, 0);
+                }
+
                 log.info("Resetting offset for {} based upon existing consumer group offsets or, if "
                         + "there are none, the consumer's 'auto.offset.reset' value.", topicPartition);
             }
+
             this.partitionWriters.put(topicPartition, partitionWriter);
+            this.currentOffsets.put(topicPartition, lastWrittenOffset);
+            log.debug("last written offset " + lastWrittenOffset);
+        }
+        if ( config.snapShotMode() ) {
+            this.endOffsets = endOffsetReader.getEndOffsets(partitions);
+            this.terminateIfCompleted();
         }
         if (partitions.isEmpty()) {
             log.info("No partitions assigned to BackupSinkTask");
@@ -119,7 +199,9 @@
         try {
             for (TopicPartition topicPartition : partitions) {
                 PartitionWriter partitionWriter = partitionWriters.get(topicPartition);
-                partitionWriter.close();
+                if (partitionWriter != null) {
+                    partitionWriter.close();
+                }
                 partitionWriters.remove(topicPartition);
                 log.debug("Closed BackupSinkTask for Topic {}, Partition {}"
                         , topicPartition.topic(), topicPartition.partition());
@@ -135,6 +217,9 @@
             for (PartitionWriter partition : partitionWriters.values()) {
                 partition.close();
             }
+            if (adminClient != null) {
+                adminClient.close();
+            }
             offsetSink.close();
             log.info("Stopped BackupSinkTask");
         } catch (IOException e) {
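
The arithmetic in terminateIfCompleted() follows from the end-offset
convention: a partition whose records occupy offsets 0..12 reports an end
offset of 13, so that partition's snapshot is complete once the last consumed
offset reaches 13 - 1 = 12. A standalone restatement of the check, for
illustration only:

    import org.apache.kafka.common.TopicPartition;

    import java.util.Collections;
    import java.util.Map;

    final class SnapshotCheck {
        // A partition is "done" when its last seen offset is endOffset - 1,
        // because endOffsets() names the offset of the *next* record.
        static boolean complete(Map<TopicPartition, Long> endOffsets,
                                Map<TopicPartition, Long> currentOffsets) {
            for (Map.Entry<TopicPartition, Long> e : endOffsets.entrySet()) {
                long current = currentOffsets.getOrDefault(e.getKey(), -1L);
                if (current < e.getValue() - 1) {
                    return false; // this partition still has unread records
                }
            }
            return true;
        }

        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("demo", 0);
            // false: offsets 11 and 12 have not been consumed yet
            System.out.println(complete(Collections.singletonMap(tp, 13L),
                                        Collections.singletonMap(tp, 10L)));
            // true: offset 12 is the last record below the end offset 13
            System.out.println(complete(Collections.singletonMap(tp, 13L),
                                        Collections.singletonMap(tp, 12L)));
        }
    }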
diff --git a/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java b/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
index f47b985..8eadef8 100644
--- a/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
+++ b/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
@@ -19,6 +19,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.*;
@@ -60,7 +61,7 @@ public void simpleTest() throws Exception {
 
         // Start Task
         BackupSinkTask task = new BackupSinkTask();
-        task.start(props, new MockOffsetSink(null, null));
+        task.start(props, new MockOffsetSink(null, null), null, (n) -> {});
         task.open(partitions);
         task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList()));
 
@@ -80,6 +81,53 @@
         assertEquals(allRecords, partitionReader.readFully());
     }
 
+    @Test
+    public void simpleTestWithSnapshotMode() throws Exception {
+        // Prepare
+        Path directory = Paths.get(TEMP_DIR.toString(), "simpleTestWithSnapshotMode");
+        Files.createDirectories(directory);
+        Map<String, String> props = new HashMap<>(DEFAULT_PROPS);
+        props.put(BackupSinkConfig.TARGET_DIR_CONFIG, directory.toString());
+        props.put(BackupSinkConfig.SNAPSHOT_MODE, "true");
+
+        List<Record> records = new ArrayList<>();
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(new TopicPartition(TOPIC1, 0), 13L);
+
+        records.add(new Record(TOPIC1, 0, KEY_BYTES, VALUE_BYTES, 0));
+        records.add(new Record(TOPIC1, 0, null, null, 1));
+        records.add(new Record(TOPIC1, 0, KEY_BYTES, VALUE_BYTES, 10));
+
+        List<TopicPartition> partitions = new ArrayList<>();
+        partitions.add(new TopicPartition(TOPIC1, 0));
+
+        AtomicBoolean endConditionCheck = new AtomicBoolean();
+        // Start Task
+        BackupSinkTask task = new BackupSinkTask();
+        task.start(props, new MockOffsetSink(null, null), new MockEndOffsetReader("", endOffsets), (n) -> endConditionCheck.set(true));
+
+        task.open(partitions);
+        task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList()));
+
+        assertFalse(endConditionCheck.get());
+
+        // Write again
+        List<Record> records2 = new ArrayList<>();
+        records2.add(new Record(TOPIC1, 0, KEY_BYTES, VALUE_BYTES, 11));
+        records2.add(new Record(TOPIC1, 0, null, null, 12));
+        records2.add(new Record(TOPIC1, 0, KEY_BYTES, VALUE_BYTES, 13));
+        task.put(records2.stream().map(Record::toSinkRecord).collect(Collectors.toList()));
+
+        task.close(partitions);
+
+        // Check backed up data
+        List<Record> allRecords = new ArrayList<>(records);
+        allRecords.addAll(records2);
+        PartitionReader partitionReader = new PartitionReader(TOPIC1, 0, Paths.get(directory.toString(), TOPIC1));
+        assertEquals(allRecords, partitionReader.readFully());
+        assertTrue(endConditionCheck.get());
+    }
+
     @Test
     public void multipleTopicPartitionsTest() throws Exception {
         // Prepare
@@ -109,7 +157,7 @@ public void multipleTopicPartitionsTest() throws Exception {
 
         // Start Task
         BackupSinkTask task = new BackupSinkTask();
-        task.start(props, new MockOffsetSink(null, null));
+        task.start(props, new MockOffsetSink(null, null), null, (n) -> {});
         task.open(partitions);
         task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList()));
         task.close(partitions);
@@ -173,7 +221,7 @@ public void invalidOffsetsTest() throws Exception {
 
         // Start Task
         BackupSinkTask task = new BackupSinkTask();
-        task.start(props, new MockOffsetSink(null, null));
+        task.start(props, new MockOffsetSink(null, null), null, (n) -> {});
         task.open(partitions);
         assertThrows(RuntimeException.class, () -> task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList())));
         assertDoesNotThrow(() -> task.put(records2.stream().map(Record::toSinkRecord).collect(Collectors.toList())));
@@ -220,7 +268,7 @@ public void writeToExistingData() throws Exception {
         // Start Task
         BackupSinkTask task = new BackupSinkTask();
         task.initialize(new MockSinkTaskContext());
-        task.start(props, new MockOffsetSink(null, null));
+        task.start(props, new MockOffsetSink(null, null), null, (n) -> {});
         task.open(partitions);
         task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList()));
         task.close(partitions);
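
The extra start() parameters exist so the tests above can substitute the
pieces that would otherwise reach outside the JVM: the offset sink, the live
end-offset lookup, and above all System.exit. A minimal sketch of the exit
seam (the names are illustrative, not from the patch):

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Consumer;

    public class ExitSeamExample {
        public static void main(String[] args) {
            AtomicInteger lastExitCode = new AtomicInteger(Integer.MIN_VALUE);
            Consumer<Integer> fakeExit = lastExitCode::set; // test double
            Consumer<Integer> realExit = System::exit;      // production wiring

            fakeExit.accept(0); // safe to call inside a test
            System.out.println("recorded exit code: " + lastExitCode.get());
            // realExit.accept(0) would terminate the JVM, as the connector does.
        }
    }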
diff --git a/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java b/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java
new file mode 100644
index 0000000..5ca005d
--- /dev/null
+++ b/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java
@@ -0,0 +1,19 @@
+package de.azapps.kafkabackup.sink;
+
+import de.azapps.kafkabackup.common.offset.EndOffsetReader;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class MockEndOffsetReader extends EndOffsetReader {
+    private Map<TopicPartition, Long> offsets;
+    public MockEndOffsetReader(String bootstrapServers, Map<TopicPartition, Long> offsets) {
+        super(bootstrapServers);
+        this.offsets = offsets;
+    }
+    @Override
+    public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> partitions) {
+        return offsets;
+    }
+}
diff --git a/src/test/java/de/azapps/kafkabackup/sink/MockSinkTaskContext.java b/src/test/java/de/azapps/kafkabackup/sink/MockSinkTaskContext.java
index a13da04..72f8578 100644
--- a/src/test/java/de/azapps/kafkabackup/sink/MockSinkTaskContext.java
+++ b/src/test/java/de/azapps/kafkabackup/sink/MockSinkTaskContext.java
@@ -3,6 +3,7 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkTaskContext;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -10,7 +11,7 @@ public class MockSinkTaskContext implements SinkTaskContext {
 
     @Override
     public Map<String, String> configs() {
-        return null;
+        return new HashMap<>();
     }
 
     @Override
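
With the new long option in place, a snapshot run of the standalone helper
might be invoked as below. This is a guess at usage rather than a quote from
the (truncated) help text: the other flags come from the existing LONGOPTS
definition, and note that PATCH 4 later renames --snapshot-mode to --snapshot.

    backup-standalone.sh --bootstrap-server localhost:9092 \
        --target-dir /var/backup/kafka \
        --topics my-topic \
        --snapshot-mode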
From 3da6b298e619fd86d41100f78997bb942972c3fa Mon Sep 17 00:00:00 2001
From: Wessel van Staal
Date: Mon, 22 Jun 2020 16:20:35 +0200
Subject: [PATCH 2/5] fix test

---
 src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java b/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
index 8eadef8..d896b55 100644
--- a/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
+++ b/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
@@ -104,6 +104,7 @@ public void simpleTestWithSnapshotMode() throws Exception {
         AtomicBoolean endConditionCheck = new AtomicBoolean();
         // Start Task
         BackupSinkTask task = new BackupSinkTask();
+        task.initialize(new MockSinkTaskContext());
         task.start(props, new MockOffsetSink(null, null), new MockEndOffsetReader("", endOffsets), (n) -> endConditionCheck.set(true));
 
         task.open(partitions);

From 4911896459189cc0cc0351afdb289b841fbcdd5c Mon Sep 17 00:00:00 2001
From: Wessel van Staal
Date: Mon, 22 Jun 2020 16:25:48 +0200
Subject: [PATCH 3/5] fix spotbugs

---
 .../azapps/kafkabackup/common/offset/EndOffsetReader.java  | 6 +++---
 .../java/de/azapps/kafkabackup/sink/BackupSinkTask.java    | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
index 3c898b5..9f7a689 100644
--- a/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
+++ b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
@@ -26,9 +26,9 @@ public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> partitions) {
         Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
         List<TopicPartition> toRemove = new ArrayList<>();
 
-        for (TopicPartition partition: offsets.keySet()) {
-            if (offsets.get(partition) == 0L) {
-                toRemove.add(partition); // don't store empty offsets
+        for (Map.Entry<TopicPartition, Long> partitionOffset: offsets.entrySet()) {
+            if (partitionOffset.getValue() == 0L) {
+                toRemove.add(partitionOffset.getKey()); // don't store empty offsets
             }
         }
 
diff --git a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
index 9dd8182..d6611d3 100644
--- a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
+++ b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
@@ -98,9 +98,9 @@ private void terminateIfCompleted() {
         boolean terminate = true;
-        for (TopicPartition partition : endOffsets.keySet()) {
-            Long endOffset = endOffsets.get(partition);
-            Long currentOffset = currentOffsets.getOrDefault(partition, -1L);
+        for (Map.Entry<TopicPartition, Long> partitionOffset : endOffsets.entrySet()) {
+            Long endOffset = partitionOffset.getValue();
+            Long currentOffset = currentOffsets.getOrDefault(partitionOffset.getKey(), -1L);
 
             if (currentOffset < endOffset - 1) {
                 terminate = false;
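
The SpotBugs finding behind this patch is the classic inefficient-map-iteration
pattern (typically reported as WMI_WRONG_MAP_ITERATOR; the exact rule name is
not stated in the commit): iterating keySet() and calling get() per key pays
for a second lookup that entrySet() avoids. In isolation:

    import java.util.HashMap;
    import java.util.Map;

    public class EntrySetExample {
        public static void main(String[] args) {
            Map<String, Long> offsets = new HashMap<>();
            offsets.put("topic-0", 0L);
            offsets.put("topic-1", 42L);

            // Flagged pattern: each iteration performs an extra hash lookup.
            for (String key : offsets.keySet()) {
                Long value = offsets.get(key); // redundant lookup per entry
                System.out.println(key + " -> " + value);
            }

            // Preferred: key and value come from the same entry, one pass total.
            for (Map.Entry<String, Long> e : offsets.entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
        }
    }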

From 7d1be68dca9503b5590c653101c446788c7d5bd1 Mon Sep 17 00:00:00 2001
From: Wessel van Staal
Date: Tue, 23 Jun 2020 06:02:11 +0200
Subject: [PATCH 4/5] review comments

---
 Dockerfile                                    |  4 ++--
 bin/backup-standalone.sh                      | 12 +++++------
 .../common/offset/EndOffsetReader.java        | 16 +++++++-------
 .../kafkabackup/sink/BackupSinkConfig.java    | 11 ++++++----
 .../kafkabackup/sink/BackupSinkTask.java      | 21 +++----------------
 .../kafkabackup/sink/BackupSinkTaskTest.java  |  4 ++--
 .../kafkabackup/sink/MockEndOffsetReader.java |  5 +++--
 7 files changed, 31 insertions(+), 42 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 96c0039..2a665c9 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -7,7 +7,7 @@ RUN gradle --no-daemon check test shadowJar
 
 # Build Docker Image with Kafka Backup Jar
 FROM openjdk:8u212-jre-alpine
-ARG kafka_version=2.4.1
+ARG kafka_version=2.5.0
 ARG scala_version=2.12
 ARG glibc_version=2.31-r0
 
@@ -30,4 +30,4 @@ RUN apk add --no-cache bash curl \
 COPY ./bin /opt/kafka-backup/
 COPY --from=builder /opt/kafka-backup/build/libs/kafka-backup.jar /opt/kafka-backup/
 
-ENV PATH="${KAFKA_HOME}/bin:/opt/kafka-backup/:${PATH}"
\ No newline at end of file
+ENV PATH="${KAFKA_HOME}/bin:/opt/kafka-backup/:${PATH}"

diff --git a/bin/backup-standalone.sh b/bin/backup-standalone.sh
index c785ece..ca62f4d 100755
--- a/bin/backup-standalone.sh
+++ b/bin/backup-standalone.sh
@@ -23,7 +23,7 @@ trap _term INT
 
 ##################################### Parse arguments
 OPTIONS="h"
-LONGOPTS=bootstrap-server:,target-dir:,topics:,topics-regex:,max-segment-size:,command-config:,help,debug,snapshot-mode
+LONGOPTS=bootstrap-server:,target-dir:,topics:,topics-regex:,max-segment-size:,command-config:,help,debug,snapshot
 HELP=$(
   cat <<EOF

diff --git a/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
index 9f7a689..88247e6 100644
--- a/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
+++ b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
@@ -9,18 +9,18 @@
 public class EndOffsetReader {
-    private String bootstrapServers;
+    private Map<String, Object> consumerConfig;
 
-    public EndOffsetReader(String bootstrapServers) {
-        this.bootstrapServers = bootstrapServers;
+    public EndOffsetReader(Map<String, Object> consumerConfig) {
+        this.consumerConfig = consumerConfig;
     }
 
     /**
      * Obtain end offsets for each given partition
      */
     public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> partitions) {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", bootstrapServers);
-        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
-        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
-        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
+        Map<String, Object> serializerConfig = new HashMap<>(consumerConfig);
+        serializerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
+        serializerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
+        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(serializerConfig);
         consumer.assign(partitions);
+
         Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
         List<TopicPartition> toRemove = new ArrayList<>();
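
After this refactoring the reader no longer builds its configuration from a
bare bootstrap string: the caller hands over a complete consumer config map,
so security settings and other client properties pass through unchanged. A
sketch with placeholder values (the property values are not from the patch):

    import de.azapps.kafkabackup.common.offset.EndOffsetReader;

    import java.util.HashMap;
    import java.util.Map;

    public class ConsumerConfigExample {
        public static void main(String[] args) {
            // Any consumer property (e.g. SSL/SASL settings) can ride along
            // in the same map; the reader only appends the deserializers.
            Map<String, Object> consumerConfig = new HashMap<>();
            consumerConfig.put("bootstrap.servers", "localhost:9092");
            consumerConfig.put("security.protocol", "PLAINTEXT");

            EndOffsetReader reader = new EndOffsetReader(consumerConfig);
            // reader.getEndOffsets(...) now builds its KafkaConsumer from this map.
        }
    }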

diff --git a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
index c954e70..13583d9 100644
--- a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
+++ b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkConfig.java
@@ -12,15 +12,15 @@ class BackupSinkConfig extends AbstractConfig {
     static final String ADMIN_CLIENT_PREFIX = "admin.";
     static final String TARGET_DIR_CONFIG = "target.dir";
     static final String MAX_SEGMENT_SIZE = "max.segment.size.bytes";
-    static final String SNAPSHOT_MODE = "snapshot.mode";
+    static final String SNAPSHOT = "snapshot";
 
     static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(TARGET_DIR_CONFIG, ConfigDef.Type.STRING,
                     ConfigDef.Importance.HIGH, "TargetDir")
             .define(MAX_SEGMENT_SIZE, ConfigDef.Type.INT, 1024 ^ 3, // 1 GiB
                     ConfigDef.Importance.LOW, "Maximum segment size")
-            .define(SNAPSHOT_MODE, ConfigDef.Type.BOOLEAN, false,
-                    ConfigDef.Importance.LOW, "Snapshot mode. Terminates connector when end of all partitions has been reached.");
+            .define(SNAPSHOT, ConfigDef.Type.BOOLEAN, false,
+                    ConfigDef.Importance.LOW, "Creates a snapshot. Terminates connector when end of all partitions has been reached.");
 
     BackupSinkConfig(Map<String, String> props) {
         super(CONFIG_DEF, props, true);
@@ -47,7 +47,10 @@ Integer maxSegmentSizeBytes() {
         return getInt(MAX_SEGMENT_SIZE);
     }
 
-    Boolean snapShotMode() { return getBoolean(SNAPSHOT_MODE); }
+    Boolean snapShotMode() { return getBoolean(SNAPSHOT); }
 
+    Map<String, Object> consumerConfig() {
+        return new HashMap<>(originalsWithPrefix(CLUSTER_PREFIX));
+    }
 }

diff --git a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
index d6611d3..9da0eb4 100644
--- a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
+++ b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
@@ -28,13 +28,10 @@ public class BackupSinkTask extends SinkTask {
     private Map<TopicPartition, PartitionWriter> partitionWriters = new HashMap<>();
     private long maxSegmentSizeBytes;
     private OffsetSink offsetSink;
-    private SinkTaskContext sinkTaskContext;
     private BackupSinkConfig config;
-    private AdminClient adminClient;
     private Map<TopicPartition, Long> endOffsets;
     private Map<TopicPartition, Long> currentOffsets = new HashMap<>();
     private EndOffsetReader endOffsetReader;
-    private String bootstrapServers;
     private java.util.function.Consumer<Integer> exitFunction;
 
     @Override
@@ -47,13 +44,6 @@ public void start(Map<String, String> props) {
         start(props, null, null, null);
     }
 
-    @Override
-    public void initialize(SinkTaskContext context) {
-        super.initialize(context);
-        this.sinkTaskContext = context;
-        this.bootstrapServers = context.configs().get(BackupSinkConfig.CLUSTER_BOOTSTRAP_SERVERS);
-    }
-
     public void start(
         Map<String, String> props,
         OffsetSink overrideOffsetSink,
@@ -71,14 +61,14 @@
             if(overrideOffsetSink != null) {
                 offsetSink = overrideOffsetSink;
             } else {
-                adminClient = AdminClient.create(config.adminConfig());
+                AdminClient adminClient = AdminClient.create(config.adminConfig());
                 offsetSink = new OffsetSink(adminClient, targetDir);
             }
 
             if (overrideEndOffsetReader != null) {
                 this.endOffsetReader = overrideEndOffsetReader;
             } else {
-                endOffsetReader = new EndOffsetReader(bootstrapServers);
+                endOffsetReader = new EndOffsetReader(config.consumerConfig());
             }
 
             if (overrideExitFunction != null) {
@@ -103,8 +93,7 @@ private void terminateIfCompleted() {
             Long currentOffset = currentOffsets.getOrDefault(partitionOffset.getKey(), -1L);
 
             if (currentOffset < endOffset - 1) {
-                terminate = false;
-                break;
+                return;
             }
         }
         if (terminate) {
@@ -126,7 +115,6 @@
             }
             if (config.snapShotMode()) {
                 currentOffsets.put(topicPartition, sinkRecord.kafkaOffset());
-                log.debug("Update offset for topic {}, partition {}, offset {}", sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset());
             }
         }
 
@@ -217,9 +205,6 @@ public void stop() {
             for (PartitionWriter partition : partitionWriters.values()) {
                 partition.close();
             }
-            if (adminClient != null) {
-                adminClient.close();
-            }
             offsetSink.close();
             log.info("Stopped BackupSinkTask");
         } catch (IOException e) {
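
The new consumerConfig() builds on AbstractConfig.originalsWithPrefix(), which
selects every original property whose key starts with the given prefix and
strips that prefix. A simplified stand-in showing the effect (this is not the
Kafka implementation):

    import java.util.HashMap;
    import java.util.Map;

    public class PrefixConfigExample {
        // Mimics originalsWithPrefix("cluster."): keep entries whose key
        // starts with the prefix, minus the prefix itself.
        static Map<String, Object> withPrefix(Map<String, Object> originals, String prefix) {
            Map<String, Object> result = new HashMap<>();
            originals.forEach((key, value) -> {
                if (key.startsWith(prefix) && key.length() > prefix.length()) {
                    result.put(key.substring(prefix.length()), value);
                }
            });
            return result;
        }

        public static void main(String[] args) {
            Map<String, Object> originals = new HashMap<>();
            originals.put("cluster.bootstrap.servers", "localhost:9092");
            originals.put("target.dir", "/var/backup/kafka");

            // Prints {bootstrap.servers=localhost:9092}: ready for a consumer.
            System.out.println(withPrefix(originals, "cluster."));
        }
    }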
diff --git a/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java b/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
index d896b55..9c4982a 100644
--- a/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
+++ b/src/test/java/de/azapps/kafkabackup/sink/BackupSinkTaskTest.java
@@ -88,7 +88,7 @@ public void simpleTestWithSnapshotMode() throws Exception {
         Files.createDirectories(directory);
         Map<String, String> props = new HashMap<>(DEFAULT_PROPS);
         props.put(BackupSinkConfig.TARGET_DIR_CONFIG, directory.toString());
-        props.put(BackupSinkConfig.SNAPSHOT_MODE, "true");
+        props.put(BackupSinkConfig.SNAPSHOT, "true");
 
         List<Record> records = new ArrayList<>();
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
@@ -105,7 +105,7 @@
         // Start Task
         BackupSinkTask task = new BackupSinkTask();
         task.initialize(new MockSinkTaskContext());
-        task.start(props, new MockOffsetSink(null, null), new MockEndOffsetReader("", endOffsets), (n) -> endConditionCheck.set(true));
+        task.start(props, new MockOffsetSink(null, null), new MockEndOffsetReader(endOffsets), (n) -> endConditionCheck.set(true));
 
         task.open(partitions);
         task.put(records.stream().map(Record::toSinkRecord).collect(Collectors.toList()));

diff --git a/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java b/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java
index 5ca005d..a1611da 100644
--- a/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java
+++ b/src/test/java/de/azapps/kafkabackup/sink/MockEndOffsetReader.java
@@ -4,12 +4,13 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
 
 public class MockEndOffsetReader extends EndOffsetReader {
     private Map<TopicPartition, Long> offsets;
-    public MockEndOffsetReader(String bootstrapServers, Map<TopicPartition, Long> offsets) {
-        super(bootstrapServers);
+    public MockEndOffsetReader(Map<TopicPartition, Long> offsets) {
+        super(new HashMap<>());
         this.offsets = offsets;
     }
     @Override

From dbe6178c5be328480f2c16f363a744bfb68882f7 Mon Sep 17 00:00:00 2001
From: Wessel van Staal
Date: Tue, 23 Jun 2020 12:18:03 +0200
Subject: [PATCH 5/5] review comments

---
 .../common/offset/EndOffsetReader.java             | 25 ++++++++++---------
 .../de/azapps/kafkabackup/sink/BackupSinkTask.java |  1 -
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
index 88247e6..592c0cb 100644
--- a/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
+++ b/src/main/java/de/azapps/kafkabackup/common/offset/EndOffsetReader.java
@@ -20,22 +20,23 @@ public Map<TopicPartition, Long> getEndOffsets(Collection<TopicPartition> partitions) {
         Map<String, Object> serializerConfig = new HashMap<>(consumerConfig);
         serializerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
         serializerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
-        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(serializerConfig);
-        consumer.assign(partitions);
+        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(serializerConfig)) {
+            consumer.assign(partitions);
 
-        Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
-        List<TopicPartition> toRemove = new ArrayList<>();
+            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
+            List<TopicPartition> toRemove = new ArrayList<>();
 
-        for (Map.Entry<TopicPartition, Long> partitionOffset: offsets.entrySet()) {
-            if (partitionOffset.getValue() == 0L) {
-                toRemove.add(partitionOffset.getKey()); // don't store empty offsets
+            for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
+                if (partitionOffset.getValue() == 0L) {
+                    toRemove.add(partitionOffset.getKey()); // don't store empty offsets
+                }
+            }
+
+            for (TopicPartition partition : toRemove) {
+                offsets.remove(partition);
             }
-        }
 
-        for (TopicPartition partition: toRemove) {
-            offsets.remove(partition);
+            return offsets;
         }
-        consumer.close();
-        return offsets;
     }
 }
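
The try-with-resources rewrite fixes a resource leak: previously
consumer.close() was only reached on the successful path, so an exception from
endOffsets() (an unreachable broker, say) left the consumer open.
try-with-resources closes it on every exit path. In isolation:

    import java.io.Closeable;

    public class TryWithResourcesExample {
        static class Resource implements Closeable {
            @Override public void close() { System.out.println("closed"); }
            long query() { throw new IllegalStateException("broker unreachable"); }
        }

        public static void main(String[] args) {
            // "closed" is printed even though query() throws: close() runs on
            // every exit path, which a trailing manual close() cannot guarantee.
            try (Resource r = new Resource()) {
                r.query();
            } catch (IllegalStateException e) {
                System.out.println("caught: " + e.getMessage());
            }
        }
    }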
diff --git a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
index 9da0eb4..d0034fb 100644
--- a/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
+++ b/src/main/java/de/azapps/kafkabackup/sink/BackupSinkTask.java
@@ -168,7 +168,6 @@ public void open(Collection<TopicPartition> partitions) {
 
             this.partitionWriters.put(topicPartition, partitionWriter);
             this.currentOffsets.put(topicPartition, lastWrittenOffset);
-            log.debug("last written offset " + lastWrittenOffset);
         }
         if ( config.snapShotMode() ) {
             this.endOffsets = endOffsetReader.getEndOffsets(partitions);