diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index e694bc4..5b8aeac 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -122,18 +122,6 @@ private static Converter createConverter(final Map kafkaConfig) return converter; } - private static List partitionsFor(final Consumer consumer, final String topic) { - final Map> topicsWithPartition = consumer.listTopics(); - if (!topicsWithPartition.containsKey(topic)) { - final String message = String.format( - "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set " - + "correctly.", - topic); - throw new IllegalArgumentException(message); - } - return topicsWithPartition.get(topic); - } - @Override public void run() { final String id = this.createId(); @@ -147,6 +135,7 @@ public void run() { log.info("Finished resetting {}", this.sharedOptions.getConnectorName()); } + private void resetPartitions(final Iterable partitions, final Map kafkaConfig) { try (final Producer producer = createProducer(kafkaConfig)) { producer.initTransactions(); @@ -191,7 +180,7 @@ private Consumer createConsumer(final Map kafkaC final Deserializer byteArrayDeserializer = new ByteArrayDeserializer(); final Consumer consumer = new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer); - final List partitions = partitionsFor(consumer, this.offsetTopic); + final List partitions = this.partitionsForOffsetTopic(consumer); final List topicPartitions = partitions.stream() .map(KafkaConnectorSourceResetter::toTopicPartition) .collect(Collectors.toList()); @@ -200,4 +189,13 @@ private Consumer createConsumer(final Map kafkaC return consumer; } + private List partitionsForOffsetTopic(final Consumer consumer) { + final Map> topicsWithPartition = consumer.listTopics(); + if (!topicsWithPartition.containsKey(this.offsetTopic)) { + final String message = String.format("Topic %s does not exist.", this.offsetTopic); + throw new IllegalArgumentException(message); + } + return topicsWithPartition.get(this.offsetTopic); + } + }