From 586bd80cd6ca22f9d043c02eae7fc9fdfab2435b Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 16 Feb 2024 09:03:25 +0100 Subject: [PATCH 1/7] Validate offset topic --- .../bakdata/kafka/KafkaConnectorSourceResetter.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index 6d09874..25b1dbe 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -24,6 +24,7 @@ package com.bakdata.kafka; +import jakarta.validation.constraints.NotBlank; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.LocalDateTime; @@ -124,6 +125,9 @@ private static Converter createConverter(final Map kafkaConfig) @Override public void run() { + if(this.offsetTopic.isBlank()) { + throw new IllegalArgumentException("--offset-topic should be set and cannot be blank."); + } final String id = this.createId(); final Map kafkaConfig = this.sharedOptions.createKafkaConfig(); kafkaConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, id); @@ -180,6 +184,12 @@ private Consumer createConsumer(final Map kafkaC final Consumer consumer = new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer); final List partitions = consumer.partitionsFor(this.offsetTopic); + if(partitions.isEmpty()) { + final String message = String.format( + "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set correctly.", + this.offsetTopic); + throw new IllegalStateException(message); + } final List topicPartitions = partitions.stream() .map(KafkaConnectorSourceResetter::toTopicPartition) .collect(Collectors.toList()); From 7b012d5d9fc3aafd645679b1002450e5b906c244 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 16 Feb 2024 09:04:44 +0100 Subject: [PATCH 2/7] Update files --- .../java/com/bakdata/kafka/KafkaConnectorSourceResetter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index 25b1dbe..d8ef0f8 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -24,7 +24,6 @@ package com.bakdata.kafka; -import jakarta.validation.constraints.NotBlank; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.LocalDateTime; From 3f6f9c8886270863d613382c901f996b88984544 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 16 Feb 2024 09:21:17 +0100 Subject: [PATCH 3/7] formatting --- .../kafka/KafkaConnectorSourceResetter.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index d8ef0f8..bf8ece3 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -80,9 +80,9 @@ * } * * Kafka Connect stores offsets for source connectors in a dedicated topic. The key of such an offset consists of the - * connector name and a connector specific partition name, e.g., {@code ["connector-name", { some-source-specific - * -data... }] }. This tool finds all partitions belonging to the connector that should be reset and deletes the - * corresponding offsets. + * connector name and a connector specific partition name, e.g., + * {@code ["connector-name", { some-source-specific -data... }] }. This tool finds all partitions belonging to the + * connector that should be reset and deletes the corresponding offsets. */ @Slf4j @@ -124,7 +124,7 @@ private static Converter createConverter(final Map kafkaConfig) @Override public void run() { - if(this.offsetTopic.isBlank()) { + if (this.offsetTopic.isBlank()) { throw new IllegalArgumentException("--offset-topic should be set and cannot be blank."); } final String id = this.createId(); @@ -183,11 +183,12 @@ private Consumer createConsumer(final Map kafkaC final Consumer consumer = new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer); final List partitions = consumer.partitionsFor(this.offsetTopic); - if(partitions.isEmpty()) { + if (partitions.isEmpty()) { final String message = String.format( - "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set correctly.", + "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set " + + "correctly.", this.offsetTopic); - throw new IllegalStateException(message); + throw new IllegalArgumentException(message); } final List topicPartitions = partitions.stream() .map(KafkaConnectorSourceResetter::toTopicPartition) From 3c8d5fc425bc3e93c87879538cc8ea5533a566c4 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 16 Feb 2024 10:22:29 +0100 Subject: [PATCH 4/7] add test --- .../kafka/KafkaConnectorSourceResetter.java | 24 +++++++------- ...onnectorSourceResetterApplicationTest.java | 32 +++++++++++++++++++ 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index bf8ece3..6913811 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -124,9 +124,6 @@ private static Converter createConverter(final Map kafkaConfig) @Override public void run() { - if (this.offsetTopic.isBlank()) { - throw new IllegalArgumentException("--offset-topic should be set and cannot be blank."); - } final String id = this.createId(); final Map kafkaConfig = this.sharedOptions.createKafkaConfig(); kafkaConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, id); @@ -182,14 +179,7 @@ private Consumer createConsumer(final Map kafkaC final Deserializer byteArrayDeserializer = new ByteArrayDeserializer(); final Consumer consumer = new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer); - final List partitions = consumer.partitionsFor(this.offsetTopic); - if (partitions.isEmpty()) { - final String message = String.format( - "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set " - + "correctly.", - this.offsetTopic); - throw new IllegalArgumentException(message); - } + final List partitions = partitionsFor(consumer, this.offsetTopic); final List topicPartitions = partitions.stream() .map(KafkaConnectorSourceResetter::toTopicPartition) .collect(Collectors.toList()); @@ -198,4 +188,16 @@ private Consumer createConsumer(final Map kafkaC return consumer; } + private static List partitionsFor(final Consumer consumer, final String topic) { + final Map> topicsWithPartition = consumer.listTopics(); + if (!topicsWithPartition.containsKey(topic)) { + final String message = String.format( + "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set " + + "correctly.", + topic); + throw new IllegalArgumentException(message); + } + return topicsWithPartition.get(topic); + } + } diff --git a/src/test/java/com/bakdata/kafka/KafkaConnectorSourceResetterApplicationTest.java b/src/test/java/com/bakdata/kafka/KafkaConnectorSourceResetterApplicationTest.java index d02f8ad..677e3d6 100644 --- a/src/test/java/com/bakdata/kafka/KafkaConnectorSourceResetterApplicationTest.java +++ b/src/test/java/com/bakdata/kafka/KafkaConnectorSourceResetterApplicationTest.java @@ -123,6 +123,38 @@ void test() throws InterruptedException { this.softly.assertThat(valuesAfterReset).hasSize(6); } + @Test + void shouldExitOneWhenOffsetTopicIsSetIncorrectly() throws InterruptedException { + this.createConnectCluster(); + delay(10, TimeUnit.SECONDS); + final List values = this.kafkaCluster.readValues(ReadKeyValues.from(TOPIC) + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .build()); + this.softly.assertThat(values) + .hasSize(3); + this.connectCluster.stop(); + delay(10, TimeUnit.SECONDS); + final KafkaConnectResetterApplication app = new KafkaConnectResetterApplication(); + + final CommandLine commandLine = getCLI(app); + final int exitCode = commandLine.execute("source", + CONNECTOR_NAME, + "--brokers", this.kafkaCluster.getBrokerList(), + "--offset-topic", "wrong-offset-topic" + ); + this.softly.assertThat(exitCode) + .isEqualTo(1); + this.createConnectCluster(); + delay(10, TimeUnit.SECONDS); + final List valuesAfterReset = this.kafkaCluster.readValues(ReadKeyValues.from(TOPIC) + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .build()); + this.softly.assertThat(valuesAfterReset) + .hasSize(3); + } + private void createConnectCluster() { this.connectCluster = new EmbeddedConnect(EmbeddedConnectConfig.kafkaConnect() .with(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, OFFSETS) From 24a3092e42585c660fd1fa1f0adfbadfb98d2fb1 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 16 Feb 2024 10:23:21 +0100 Subject: [PATCH 5/7] Update files --- .../kafka/KafkaConnectorSourceResetter.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index 6913811..e694bc4 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -122,6 +122,18 @@ private static Converter createConverter(final Map kafkaConfig) return converter; } + private static List partitionsFor(final Consumer consumer, final String topic) { + final Map> topicsWithPartition = consumer.listTopics(); + if (!topicsWithPartition.containsKey(topic)) { + final String message = String.format( + "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set " + + "correctly.", + topic); + throw new IllegalArgumentException(message); + } + return topicsWithPartition.get(topic); + } + @Override public void run() { final String id = this.createId(); @@ -188,16 +200,4 @@ private Consumer createConsumer(final Map kafkaC return consumer; } - private static List partitionsFor(final Consumer consumer, final String topic) { - final Map> topicsWithPartition = consumer.listTopics(); - if (!topicsWithPartition.containsKey(topic)) { - final String message = String.format( - "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set " - + "correctly.", - topic); - throw new IllegalArgumentException(message); - } - return topicsWithPartition.get(topic); - } - } From 590aec311efb6f100f19cef6e10a63845e7867bf Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 16 Feb 2024 10:39:10 +0100 Subject: [PATCH 6/7] review --- .../kafka/KafkaConnectorSourceResetter.java | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index e694bc4..5b8aeac 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -122,18 +122,6 @@ private static Converter createConverter(final Map kafkaConfig) return converter; } - private static List partitionsFor(final Consumer consumer, final String topic) { - final Map> topicsWithPartition = consumer.listTopics(); - if (!topicsWithPartition.containsKey(topic)) { - final String message = String.format( - "Could not fetch partitions from the offset topic '%s'. Check if the offset topic name is set " - + "correctly.", - topic); - throw new IllegalArgumentException(message); - } - return topicsWithPartition.get(topic); - } - @Override public void run() { final String id = this.createId(); @@ -147,6 +135,7 @@ public void run() { log.info("Finished resetting {}", this.sharedOptions.getConnectorName()); } + private void resetPartitions(final Iterable partitions, final Map kafkaConfig) { try (final Producer producer = createProducer(kafkaConfig)) { producer.initTransactions(); @@ -191,7 +180,7 @@ private Consumer createConsumer(final Map kafkaC final Deserializer byteArrayDeserializer = new ByteArrayDeserializer(); final Consumer consumer = new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer); - final List partitions = partitionsFor(consumer, this.offsetTopic); + final List partitions = this.partitionsForOffsetTopic(consumer); final List topicPartitions = partitions.stream() .map(KafkaConnectorSourceResetter::toTopicPartition) .collect(Collectors.toList()); @@ -200,4 +189,13 @@ private Consumer createConsumer(final Map kafkaC return consumer; } + private List partitionsForOffsetTopic(final Consumer consumer) { + final Map> topicsWithPartition = consumer.listTopics(); + if (!topicsWithPartition.containsKey(this.offsetTopic)) { + final String message = String.format("Topic %s does not exist.", this.offsetTopic); + throw new IllegalArgumentException(message); + } + return topicsWithPartition.get(this.offsetTopic); + } + } From 5e7a9641f0aaffcc40519b3650c5120e830166e9 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 16 Feb 2024 10:41:13 +0100 Subject: [PATCH 7/7] Update files --- .../java/com/bakdata/kafka/KafkaConnectorSourceResetter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index 5b8aeac..30da329 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -192,7 +192,7 @@ private Consumer createConsumer(final Map kafkaC private List partitionsForOffsetTopic(final Consumer consumer) { final Map> topicsWithPartition = consumer.listTopics(); if (!topicsWithPartition.containsKey(this.offsetTopic)) { - final String message = String.format("Topic %s does not exist.", this.offsetTopic); + final String message = String.format("Topic '%s' does not exist.", this.offsetTopic); throw new IllegalArgumentException(message); } return topicsWithPartition.get(this.offsetTopic);