Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf committed Feb 16, 2024
1 parent 24a3092 commit 590aec3
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,6 @@ private static Converter createConverter(final Map<String, Object> kafkaConfig)
return converter;
}

private static <K, V> List<PartitionInfo> partitionsFor(final Consumer<K, V> consumer, final String topic) {
final Map<String, List<PartitionInfo>> topicsWithPartition = consumer.listTopics();
if (!topicsWithPartition.containsKey(topic)) {
final String message = String.format(
"Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set "
+ "correctly.",
topic);
throw new IllegalArgumentException(message);
}
return topicsWithPartition.get(topic);
}

@Override
public void run() {
final String id = this.createId();
Expand All @@ -147,6 +135,7 @@ public void run() {
log.info("Finished resetting {}", this.sharedOptions.getConnectorName());
}


private void resetPartitions(final Iterable<byte[]> partitions, final Map<String, Object> kafkaConfig) {
try (final Producer<byte[], byte[]> producer = createProducer(kafkaConfig)) {
producer.initTransactions();
Expand Down Expand Up @@ -191,7 +180,7 @@ private Consumer<byte[], byte[]> createConsumer(final Map<String, Object> kafkaC
final Deserializer<byte[]> byteArrayDeserializer = new ByteArrayDeserializer();
final Consumer<byte[], byte[]> consumer =
new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer);
final List<PartitionInfo> partitions = partitionsFor(consumer, this.offsetTopic);
final List<PartitionInfo> partitions = this.partitionsForOffsetTopic(consumer);
final List<TopicPartition> topicPartitions = partitions.stream()
.map(KafkaConnectorSourceResetter::toTopicPartition)
.collect(Collectors.toList());
Expand All @@ -200,4 +189,13 @@ private Consumer<byte[], byte[]> createConsumer(final Map<String, Object> kafkaC
return consumer;
}

private <K, V> List<PartitionInfo> partitionsForOffsetTopic(final Consumer<K, V> consumer) {
final Map<String, List<PartitionInfo>> topicsWithPartition = consumer.listTopics();
if (!topicsWithPartition.containsKey(this.offsetTopic)) {
final String message = String.format("Topic %s does not exist.", this.offsetTopic);
throw new IllegalArgumentException(message);
}
return topicsWithPartition.get(this.offsetTopic);
}

}

0 comments on commit 590aec3

Please sign in to comment.