From 86665101407d7084689a4c70fcaa1b4fbe4da8a4 Mon Sep 17 00:00:00 2001 From: Anton Kurako Date: Tue, 19 Dec 2023 00:43:05 +0300 Subject: [PATCH 1/4] [0.9.4-SNAPSHOT] KafkaConnectionImpl topic reset fix --- cassandra/README.md | 4 +-- cockroachdb/README.md | 4 +-- gradle.properties | 2 +- kafka/README.md | 4 +-- .../extensions/kafka/KafkaConnectionImpl.java | 28 +++++++++++++------ mariadb/README.md | 4 +-- mockserver/README.md | 4 +-- mysql/README.md | 4 +-- oracle/README.md | 4 +-- postgres/README.md | 4 +-- redis/README.md | 4 +-- 11 files changed, 38 insertions(+), 28 deletions(-) diff --git a/cassandra/README.md b/cassandra/README.md index 123e8bc..e40ee95 100644 --- a/cassandra/README.md +++ b/cassandra/README.md @@ -18,7 +18,7 @@ Features: **Gradle** ```groovy -testImplementation "io.goodforgod:testcontainers-extensions-cassandra:0.9.2" +testImplementation "io.goodforgod:testcontainers-extensions-cassandra:0.9.4" ``` **Maven** @@ -26,7 +26,7 @@ testImplementation "io.goodforgod:testcontainers-extensions-cassandra:0.9.2" io.goodforgod testcontainers-extensions-cassandra - 0.9.2 + 0.9.4 test ``` diff --git a/cockroachdb/README.md b/cockroachdb/README.md index c39ccce..8af76d2 100644 --- a/cockroachdb/README.md +++ b/cockroachdb/README.md @@ -18,7 +18,7 @@ Features: **Gradle** ```groovy -testImplementation "io.goodforgod:testcontainers-extensions-cockroachdb:0.9.2" +testImplementation "io.goodforgod:testcontainers-extensions-cockroachdb:0.9.4" ``` **Maven** @@ -26,7 +26,7 @@ testImplementation "io.goodforgod:testcontainers-extensions-cockroachdb:0.9.2" io.goodforgod testcontainers-extensions-cockroachdb - 0.9.2 + 0.9.4 test ``` diff --git a/gradle.properties b/gradle.properties index fec1618..2554466 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ groupId=io.goodforgod artifactRootId=testcontainers-extensions -artifactVersion=0.9.3-SNAPSHOT +artifactVersion=0.9.4-SNAPSHOT ##### GRADLE ##### diff --git a/kafka/README.md b/kafka/README.md index 9027db5..8cfb763 100644 --- a/kafka/README.md +++ b/kafka/README.md @@ -18,7 +18,7 @@ Features: **Gradle** ```groovy -testImplementation "io.goodforgod:testcontainers-extensions-kafka:0.9.2" +testImplementation "io.goodforgod:testcontainers-extensions-kafka:0.9.4" ``` **Maven** @@ -26,7 +26,7 @@ testImplementation "io.goodforgod:testcontainers-extensions-kafka:0.9.2" io.goodforgod testcontainers-extensions-kafka - 0.9.2 + 0.9.4 test ``` diff --git a/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java b/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java index 465a774..ad376b4 100644 --- a/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java +++ b/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java @@ -508,10 +508,10 @@ static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set topi if (!topicsToCreate.isEmpty()) { logger.trace("Topics {} creating...", topics); var result = admin.createTopics(topicsToCreate); - result.all().get(2, TimeUnit.MINUTES); - logger.info("Required topics {} created", topics); + result.all().get(1, TimeUnit.MINUTES); + logger.info("Topics {} created", topics); } else if (reset && !topicsToReset.isEmpty()) { - logger.trace("Required topics {} already exist, but require reset, resetting...", topicsToReset); + logger.trace("Topics {} already exist, but require reset, resetting...", topicsToReset); admin.deleteTopics(topicsToReset).all().get(1, TimeUnit.MINUTES); logger.debug("Topics {} reset success", topicsToReset); @@ -519,8 +519,18 @@ static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set topi .map(topic -> new NewTopic(topic, Optional.of(1), Optional.empty())) .collect(Collectors.toSet()); + logger.trace("Topics {} reset status check...", topicsToReset); + Awaitility.await().atMost(Duration.ofSeconds(35)) + .pollInterval(Duration.ofMillis(50)) + .until(() -> admin.listTopics().names().get(10, TimeUnit.SECONDS).stream() + .filter(topicsToReset::contains) + .findFirst() + .isEmpty()); + Thread.sleep(55); // check above is not 100% + logger.debug("Topics {} reset status check success", topicsToReset); + logger.trace("Topics {} recreating...", topicsToReset); - Awaitility.await().atMost(Duration.ofSeconds(30)) + Awaitility.await().atMost(Duration.ofSeconds(35)) .pollInterval(Duration.ofMillis(50)) .until(() -> { try { @@ -528,7 +538,7 @@ static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set topi return true; } catch (ExecutionException e) { if (e.getCause() instanceof TopicExistsException) { - return false; + return true; } else { throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e); } @@ -537,18 +547,18 @@ static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set topi } }); - logger.info("Required topics {} recreated", topicsToReset); + logger.info("Topics {} recreated", topicsToReset); } else { - logger.debug("Required topics already exist: {}", topics); + logger.debug("Topics already exist: {}", topics); } } catch (ExecutionException e) { if (e.getCause() instanceof TopicExistsException) { - logger.trace("Required topics already exist exception received: {}", topics); + logger.trace("Topics already exist exception received: {}", topics); } else { throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e); } } catch (TopicExistsException e) { - logger.trace("Required topics already exist exception received: {}", topics); + logger.trace("Topics already exist exception received: {}", topics); } catch (Exception e) { throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e); } diff --git a/mariadb/README.md b/mariadb/README.md index 7590b72..5b75670 100644 --- a/mariadb/README.md +++ b/mariadb/README.md @@ -18,7 +18,7 @@ Features: **Gradle** ```groovy -testImplementation "io.goodforgod:testcontainers-extensions-mariadb:0.9.2" +testImplementation "io.goodforgod:testcontainers-extensions-mariadb:0.9.4" ``` **Maven** @@ -26,7 +26,7 @@ testImplementation "io.goodforgod:testcontainers-extensions-mariadb:0.9.2" io.goodforgod testcontainers-extensions-mariadb - 0.9.2 + 0.9.4 test ``` diff --git a/mockserver/README.md b/mockserver/README.md index d70edd8..91912fa 100644 --- a/mockserver/README.md +++ b/mockserver/README.md @@ -17,7 +17,7 @@ Features: **Gradle** ```groovy -testImplementation "io.goodforgod:testcontainers-extensions-mockserver:0.9.2" +testImplementation "io.goodforgod:testcontainers-extensions-mockserver:0.9.4" ``` **Maven** @@ -25,7 +25,7 @@ testImplementation "io.goodforgod:testcontainers-extensions-mockserver:0.9.2" io.goodforgod testcontainers-extensions-mockserver - 0.9.2 + 0.9.4 test ``` diff --git a/mysql/README.md b/mysql/README.md index 453b2be..f30c208 100644 --- a/mysql/README.md +++ b/mysql/README.md @@ -18,7 +18,7 @@ Features: **Gradle** ```groovy -testImplementation "io.goodforgod:testcontainers-extensions-mysql:0.9.2" +testImplementation "io.goodforgod:testcontainers-extensions-mysql:0.9.4" ``` **Maven** @@ -26,7 +26,7 @@ testImplementation "io.goodforgod:testcontainers-extensions-mysql:0.9.2" io.goodforgod testcontainers-extensions-mysql - 0.9.2 + 0.9.4 test ``` diff --git a/oracle/README.md b/oracle/README.md index 900fb71..5c598ab 100644 --- a/oracle/README.md +++ b/oracle/README.md @@ -18,7 +18,7 @@ Features: **Gradle** ```groovy -testImplementation "io.goodforgod:testcontainers-extensions-oracle:0.9.2" +testImplementation "io.goodforgod:testcontainers-extensions-oracle:0.9.4" ``` **Maven** @@ -26,7 +26,7 @@ testImplementation "io.goodforgod:testcontainers-extensions-oracle:0.9.2" io.goodforgod testcontainers-extensions-oracle - 0.9.2 + 0.9.4 test ``` diff --git a/postgres/README.md b/postgres/README.md index 3e6fddb..db22a89 100644 --- a/postgres/README.md +++ b/postgres/README.md @@ -18,7 +18,7 @@ Features: **Gradle** ```groovy -testImplementation "io.goodforgod:testcontainers-extensions-postgres:0.9.2" +testImplementation "io.goodforgod:testcontainers-extensions-postgres:0.9.4" ``` **Maven** @@ -26,7 +26,7 @@ testImplementation "io.goodforgod:testcontainers-extensions-postgres:0.9.2" io.goodforgod testcontainers-extensions-postgres - 0.9.2 + 0.9.4 test ``` diff --git a/redis/README.md b/redis/README.md index 8c0e126..80a071e 100644 --- a/redis/README.md +++ b/redis/README.md @@ -17,7 +17,7 @@ Features: **Gradle** ```groovy -testImplementation "io.goodforgod:testcontainers-extensions-redis:0.9.2" +testImplementation "io.goodforgod:testcontainers-extensions-redis:0.9.4" ``` **Maven** @@ -25,7 +25,7 @@ testImplementation "io.goodforgod:testcontainers-extensions-redis:0.9.2" io.goodforgod testcontainers-extensions-redis - 0.9.2 + 0.9.4 test ``` From cf577ff96be00a61f98fe544a84f7c68e4f3d7a6 Mon Sep 17 00:00:00 2001 From: Anton Kurako Date: Tue, 19 Dec 2023 02:37:54 +0300 Subject: [PATCH 2/4] [0.9.4-SNAPSHOT] Check reinforced --- build.gradle | 2 +- .../extensions/kafka/KafkaConnectionImpl.java | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/build.gradle b/build.gradle index 9aeb01c..0636c9c 100644 --- a/build.gradle +++ b/build.gradle @@ -80,7 +80,7 @@ subprojects { testLogging { events("passed", "skipped", "failed") exceptionFormat("full") - showStandardStreams(false) + showStandardStreams(true) } reports { diff --git a/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java b/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java index ad376b4..1bdb445 100644 --- a/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java +++ b/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java @@ -4,6 +4,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; @@ -124,8 +125,7 @@ static final class ConsumerImpl implements Consumer { } catch (Exception e) { return Collections.>emptyMap(); } - }, - result -> new HashSet<>(result.keySet()).containsAll(this.topics)); + }, result -> new HashSet<>(result.keySet()).containsAll(this.topics)); logger.debug("KafkaConsumer topics {} assigned", this.topics); logger.trace("KafkaConsumer topics {} poll starting", this.topics); @@ -361,8 +361,7 @@ void close() { kafkaNetworkProperties.putAll(props.properties()); kafkaNetworkProperties.putAll(properties); return kafkaNetworkProperties; - }) - .orElse(null); + }).orElse(null); return new KafkaConnectionClosableImpl(kafkaProperties, networkProperties); } @@ -529,6 +528,7 @@ static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set topi Thread.sleep(55); // check above is not 100% logger.debug("Topics {} reset status check success", topicsToReset); + final AtomicInteger counter = new AtomicInteger(0); logger.trace("Topics {} recreating...", topicsToReset); Awaitility.await().atMost(Duration.ofSeconds(35)) .pollInterval(Duration.ofMillis(50)) @@ -538,7 +538,7 @@ static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set topi return true; } catch (ExecutionException e) { if (e.getCause() instanceof TopicExistsException) { - return true; + return counter.getAndIncrement() > 2; } else { throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e); } From aeeb9ee4ba803e56fdc5425250dd7cd14cdcfaff Mon Sep 17 00:00:00 2001 From: Anton Kurako Date: Tue, 19 Dec 2023 02:56:07 +0300 Subject: [PATCH 3/4] [0.9.4-SNAPSHOT] Logging updated --- cassandra/src/test/resources/logback.xml | 3 ++- cockroachdb/src/test/resources/logback.xml | 3 ++- jdbc/src/test/resources/logback.xml | 3 ++- .../testcontainers/extensions/kafka/KafkaConnectionImpl.java | 1 + kafka/src/test/resources/logback.xml | 3 ++- mariadb/src/test/resources/logback.xml | 3 ++- mockserver/src/test/resources/logback.xml | 3 ++- mysql/src/test/resources/logback.xml | 3 ++- oracle/src/test/resources/logback.xml | 3 ++- postgres/src/test/resources/logback.xml | 3 ++- redis/src/test/resources/logback.xml | 3 ++- 11 files changed, 21 insertions(+), 10 deletions(-) diff --git a/cassandra/src/test/resources/logback.xml b/cassandra/src/test/resources/logback.xml index 4585197..572962d 100644 --- a/cassandra/src/test/resources/logback.xml +++ b/cassandra/src/test/resources/logback.xml @@ -14,6 +14,7 @@ - + + diff --git a/cockroachdb/src/test/resources/logback.xml b/cockroachdb/src/test/resources/logback.xml index 4585197..572962d 100644 --- a/cockroachdb/src/test/resources/logback.xml +++ b/cockroachdb/src/test/resources/logback.xml @@ -14,6 +14,7 @@ - + + diff --git a/jdbc/src/test/resources/logback.xml b/jdbc/src/test/resources/logback.xml index 4585197..572962d 100644 --- a/jdbc/src/test/resources/logback.xml +++ b/jdbc/src/test/resources/logback.xml @@ -14,6 +14,7 @@ - + + diff --git a/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java b/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java index 1bdb445..43e9eb1 100644 --- a/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java +++ b/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java @@ -432,6 +432,7 @@ public void send(@NotNull String topic, @NotNull List events) { try { return admin.describeTopics(topics).allTopicNames().get(10, TimeUnit.SECONDS); } catch (Exception e) { + logger.warn(e.getMessage()); return Collections.emptyMap(); } }, result -> result.values().stream().map(TopicDescription::name).collect(Collectors.toSet()) diff --git a/kafka/src/test/resources/logback.xml b/kafka/src/test/resources/logback.xml index 3de0fe0..b7e0aa9 100644 --- a/kafka/src/test/resources/logback.xml +++ b/kafka/src/test/resources/logback.xml @@ -15,6 +15,7 @@ - + + diff --git a/mariadb/src/test/resources/logback.xml b/mariadb/src/test/resources/logback.xml index 4585197..572962d 100644 --- a/mariadb/src/test/resources/logback.xml +++ b/mariadb/src/test/resources/logback.xml @@ -14,6 +14,7 @@ - + + diff --git a/mockserver/src/test/resources/logback.xml b/mockserver/src/test/resources/logback.xml index 4585197..572962d 100644 --- a/mockserver/src/test/resources/logback.xml +++ b/mockserver/src/test/resources/logback.xml @@ -14,6 +14,7 @@ - + + diff --git a/mysql/src/test/resources/logback.xml b/mysql/src/test/resources/logback.xml index 4585197..572962d 100644 --- a/mysql/src/test/resources/logback.xml +++ b/mysql/src/test/resources/logback.xml @@ -14,6 +14,7 @@ - + + diff --git a/oracle/src/test/resources/logback.xml b/oracle/src/test/resources/logback.xml index 4585197..572962d 100644 --- a/oracle/src/test/resources/logback.xml +++ b/oracle/src/test/resources/logback.xml @@ -14,6 +14,7 @@ - + + diff --git a/postgres/src/test/resources/logback.xml b/postgres/src/test/resources/logback.xml index 4585197..572962d 100644 --- a/postgres/src/test/resources/logback.xml +++ b/postgres/src/test/resources/logback.xml @@ -14,6 +14,7 @@ - + + diff --git a/redis/src/test/resources/logback.xml b/redis/src/test/resources/logback.xml index 4585197..572962d 100644 --- a/redis/src/test/resources/logback.xml +++ b/redis/src/test/resources/logback.xml @@ -14,6 +14,7 @@ - + + From 961eedc9482bbd3661b9920ea64e0db2bfcbaf7b Mon Sep 17 00:00:00 2001 From: Anton Kurako Date: Tue, 19 Dec 2023 03:06:47 +0300 Subject: [PATCH 4/4] [0.9.4-SNAPSHOT] Check --- .../extensions/kafka/KafkaConnectionImpl.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java b/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java index 43e9eb1..7214a1b 100644 --- a/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java +++ b/kafka/src/main/java/io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.java @@ -4,7 +4,6 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; @@ -520,16 +519,23 @@ static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set topi .collect(Collectors.toSet()); logger.trace("Topics {} reset status check...", topicsToReset); - Awaitility.await().atMost(Duration.ofSeconds(35)) - .pollInterval(Duration.ofMillis(50)) - .until(() -> admin.listTopics().names().get(10, TimeUnit.SECONDS).stream() + Awaitility.await() + .atMost(Duration.ofSeconds(35)) + .pollInterval(Duration.ofMillis(100)) + .until(() -> { + try { + return admin.describeTopics(topics).allTopicNames().get(10, TimeUnit.SECONDS); + } catch (Exception e) { + logger.warn(e.getMessage()); + return Collections.emptyMap(); + } + }, result -> result.values().stream() + .map(TopicDescription::name) .filter(topicsToReset::contains) .findFirst() .isEmpty()); - Thread.sleep(55); // check above is not 100% logger.debug("Topics {} reset status check success", topicsToReset); - final AtomicInteger counter = new AtomicInteger(0); logger.trace("Topics {} recreating...", topicsToReset); Awaitility.await().atMost(Duration.ofSeconds(35)) .pollInterval(Duration.ofMillis(50)) @@ -539,7 +545,8 @@ static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set topi return true; } catch (ExecutionException e) { if (e.getCause() instanceof TopicExistsException) { - return counter.getAndIncrement() > 2; + Thread.sleep(500); + return false; } else { throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e); }