Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/gradle-8
Browse files Browse the repository at this point in the history
# Conflicts:
#	build.gradle.kts
  • Loading branch information
philipp94831 committed Feb 21, 2024
2 parents 1437753 + 40023cf commit afb134f
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
jobs:
build-and-publish:
name: Java Gradle Docker
uses: bakdata/ci-templates/.github/workflows/[email protected].5
uses: bakdata/ci-templates/.github/workflows/[email protected].6
with:
java-version: 17
docker-publisher: "bakdata"
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@

- no changes!

- no changes!

- no changes!


## [1.0.5](https://github.com/bakdata/kafka-connect-resetter/tree/1.0.5) (2023-04-19)
[Full Changelog](https://github.com/bakdata/kafka-connect-resetter/compare/1.0.4...1.0.5)
Expand Down
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ description = "An application to reset the state of Kafka Connect connectors"
plugins {
`java-library`
id("net.researchgate.release") version "3.0.2"
id("com.bakdata.sonar") version "1.1.11"
id("com.bakdata.sonatype") version "1.1.11"
id("com.bakdata.sonar") version "1.1.14"
id("com.bakdata.sonatype") version "1.1.14"
id("org.hildan.github.changelog") version "2.2.0"
id("com.google.cloud.tools.jib") version "3.4.0"
id("io.freefair.lombok") version "8.4"
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=1.0.9-SNAPSHOT
version=1.0.11-SNAPSHOT
org.gradle.caching=true
org.gradle.parallel=true
kafkaVersion=3.5.2
18 changes: 14 additions & 4 deletions src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@
* }</pre>
*
* Kafka Connect stores offsets for source connectors in a dedicated topic. The key of such an offset consists of the
* connector name and a connector specific partition name, e.g., {@code ["connector-name", { some-source-specific
* -data... }] }. This tool finds all partitions belonging to the connector that should be reset and deletes the
* corresponding offsets.
* connector name and a connector-specific partition name, e.g.,
* {@code ["connector-name", { some-source-specific-data... }] }. This tool finds all partitions belonging to the
* connector that should be reset and deletes the corresponding offsets.
*/

@Slf4j
Expand Down Expand Up @@ -135,6 +135,7 @@ public void run() {
log.info("Finished resetting {}", this.sharedOptions.getConnectorName());
}


private void resetPartitions(final Iterable<byte[]> partitions, final Map<String, Object> kafkaConfig) {
try (final Producer<byte[], byte[]> producer = createProducer(kafkaConfig)) {
producer.initTransactions();
Expand Down Expand Up @@ -179,7 +180,7 @@ private Consumer<byte[], byte[]> createConsumer(final Map<String, Object> kafkaC
final Deserializer<byte[]> byteArrayDeserializer = new ByteArrayDeserializer();
final Consumer<byte[], byte[]> consumer =
new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer);
final List<PartitionInfo> partitions = consumer.partitionsFor(this.offsetTopic);
final List<PartitionInfo> partitions = this.partitionsForOffsetTopic(consumer);
final List<TopicPartition> topicPartitions = partitions.stream()
.map(KafkaConnectorSourceResetter::toTopicPartition)
.collect(Collectors.toList());
Expand All @@ -188,4 +189,13 @@ private Consumer<byte[], byte[]> createConsumer(final Map<String, Object> kafkaC
return consumer;
}

/**
 * Looks up the partitions of the configured offset topic, failing fast when the topic is missing.
 *
 * @param consumer consumer used to query the cluster's topic metadata
 * @param <K> key type of the consumer (irrelevant for the metadata lookup)
 * @param <V> value type of the consumer (irrelevant for the metadata lookup)
 * @return all partitions of {@code this.offsetTopic}
 * @throws IllegalArgumentException if {@code this.offsetTopic} does not exist in the cluster
 */
private <K, V> List<PartitionInfo> partitionsForOffsetTopic(final Consumer<K, V> consumer) {
    final Map<String, List<PartitionInfo>> topicsWithPartition = consumer.listTopics();
    // Single lookup instead of containsKey + get: listTopics() never maps a topic to null,
    // so a null result unambiguously means the topic is absent.
    final List<PartitionInfo> partitions = topicsWithPartition.get(this.offsetTopic);
    if (partitions == null) {
        final String message = String.format("Topic '%s' does not exist.", this.offsetTopic);
        throw new IllegalArgumentException(message);
    }
    return partitions;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,38 @@ void test() throws InterruptedException {
this.softly.assertThat(valuesAfterReset).hasSize(6);
}

/**
 * Verifies that the source resetter exits with code 1 when {@code --offset-topic} names a
 * topic that does not exist, and that the connector's data is left untouched: the topic holds
 * the same 3 values before and after the failed reset attempt.
 */
@Test
void shouldExitOneWhenOffsetTopicIsSetIncorrectly() throws InterruptedException {
// Start the embedded Connect cluster and give the source connector time to produce.
// NOTE(review): fixed 10s delays assume the connector emits its records within that window —
// confirm this is stable in CI.
this.createConnectCluster();
delay(10, TimeUnit.SECONDS);
final List<String> values = this.kafkaCluster.readValues(ReadKeyValues.from(TOPIC)
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.build());
// Baseline: the connector produced 3 records before the reset is attempted.
this.softly.assertThat(values)
.hasSize(3);
// The resetter requires Connect to be stopped before offsets can be manipulated.
this.connectCluster.stop();
delay(10, TimeUnit.SECONDS);
final KafkaConnectResetterApplication app = new KafkaConnectResetterApplication();

// Run the "source" subcommand against a deliberately wrong offset topic.
final CommandLine commandLine = getCLI(app);
final int exitCode = commandLine.execute("source",
CONNECTOR_NAME,
"--brokers", this.kafkaCluster.getBrokerList(),
"--offset-topic", "wrong-offset-topic"
);
// A nonexistent offset topic must be reported as a failure (exit code 1), not silently ignored.
this.softly.assertThat(exitCode)
.isEqualTo(1);
// Restart Connect: since no offsets were deleted, the connector must NOT re-produce its
// records — the topic still contains exactly the original 3 values.
this.createConnectCluster();
delay(10, TimeUnit.SECONDS);
final List<String> valuesAfterReset = this.kafkaCluster.readValues(ReadKeyValues.from(TOPIC)
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.build());
this.softly.assertThat(valuesAfterReset)
.hasSize(3);
}

private void createConnectCluster() {
this.connectCluster = new EmbeddedConnect(EmbeddedConnectConfig.kafkaConnect()
.with(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, OFFSETS)
Expand Down

0 comments on commit afb134f

Please sign in to comment.