Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf committed Feb 16, 2024
1 parent 7b012d5 commit 3f6f9c8
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@
* }</pre>
*
* Kafka Connect stores offsets for source connectors in a dedicated topic. The key of such an offset consists of the
* connector name and a connector specific partition name, e.g., {@code ["connector-name", { some-source-specific
* -data... }] }. This tool finds all partitions belonging to the connector that should be reset and deletes the
* corresponding offsets.
* connector name and a connector specific partition name, e.g.,
* {@code ["connector-name", { some-source-specific -data... }] }. This tool finds all partitions belonging to the
* connector that should be reset and deletes the corresponding offsets.
*/

@Slf4j
Expand Down Expand Up @@ -124,7 +124,7 @@ private static Converter createConverter(final Map<String, Object> kafkaConfig)

@Override
public void run() {
if(this.offsetTopic.isBlank()) {
if (this.offsetTopic.isBlank()) {
throw new IllegalArgumentException("--offset-topic should be set and cannot be blank.");
}
final String id = this.createId();
Expand Down Expand Up @@ -183,11 +183,12 @@ private Consumer<byte[], byte[]> createConsumer(final Map<String, Object> kafkaC
final Consumer<byte[], byte[]> consumer =
new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer);
final List<PartitionInfo> partitions = consumer.partitionsFor(this.offsetTopic);
if(partitions.isEmpty()) {
if (partitions.isEmpty()) {
final String message = String.format(
"Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set correctly.",
"Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set "
+ "correctly.",
this.offsetTopic);
throw new IllegalStateException(message);
throw new IllegalArgumentException(message);
}
final List<TopicPartition> topicPartitions = partitions.stream()
.map(KafkaConnectorSourceResetter::toTopicPartition)
Expand Down

0 comments on commit 3f6f9c8

Please sign in to comment.