diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index d8ef0f8..bf8ece3 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -80,9 +80,9 @@ * } * * Kafka Connect stores offsets for source connectors in a dedicated topic. The key of such an offset consists of the - * connector name and a connector specific partition name, e.g., {@code ["connector-name", { some-source-specific - * -data... }] }. This tool finds all partitions belonging to the connector that should be reset and deletes the - * corresponding offsets. + * connector name and a connector specific partition name, e.g., + * {@code ["connector-name", { some-source-specific -data... }] }. This tool finds all partitions belonging to the + * connector that should be reset and deletes the corresponding offsets. */ @Slf4j @@ -124,7 +124,7 @@ private static Converter createConverter(final Map kafkaConfig) @Override public void run() { - if(this.offsetTopic.isBlank()) { + if (this.offsetTopic.isBlank()) { throw new IllegalArgumentException("--offset-topic should be set and cannot be blank."); } final String id = this.createId(); @@ -183,11 +183,12 @@ private Consumer createConsumer(final Map kafkaC final Consumer consumer = new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer); final List partitions = consumer.partitionsFor(this.offsetTopic); - if(partitions.isEmpty()) { + if (partitions.isEmpty()) { final String message = String.format( - "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set correctly.", + "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set " + + "correctly.", this.offsetTopic); - throw new IllegalStateException(message); + throw new IllegalArgumentException(message); } final List topicPartitions = partitions.stream() .map(KafkaConnectorSourceResetter::toTopicPartition)