diff --git a/.github/project.yml b/.github/project.yml index 1211f6b..21c245c 100644 --- a/.github/project.yml +++ b/.github/project.yml @@ -1,4 +1,4 @@ release: - current-version: "1.0.1" + current-version: "1.0.2" next-version: "1.0.0-SNAPSHOT" diff --git a/docs/modules/ROOT/pages/includes/attributes.adoc b/docs/modules/ROOT/pages/includes/attributes.adoc index df410fb..4ee4706 100644 --- a/docs/modules/ROOT/pages/includes/attributes.adoc +++ b/docs/modules/ROOT/pages/includes/attributes.adoc @@ -1,3 +1,3 @@ -:project-version: 1.0.1 +:project-version: 1.0.2 :examples-dir: ./../examples/ \ No newline at end of file diff --git a/quarkus-asyncapi-scanner/runtime/src/main/java/io/quarkiverse/asyncapi/annotation/scanner/kafka/binding/KafkaResolver.java b/quarkus-asyncapi-scanner/runtime/src/main/java/io/quarkiverse/asyncapi/annotation/scanner/kafka/binding/KafkaResolver.java index ad449ab..e5823c0 100644 --- a/quarkus-asyncapi-scanner/runtime/src/main/java/io/quarkiverse/asyncapi/annotation/scanner/kafka/binding/KafkaResolver.java +++ b/quarkus-asyncapi-scanner/runtime/src/main/java/io/quarkiverse/asyncapi/annotation/scanner/kafka/binding/KafkaResolver.java @@ -70,29 +70,38 @@ public KafkaChannelBinding getKafkaChannelBindings(String aTopic) { } KafkaChannelTopicConfiguration getTopicConfiguration(AdminClient aClient, String aTopic) { - Map configMap = aClient - .describeConfigs(List.of(new ConfigResource(ConfigResource.Type.TOPIC, aTopic))) - .values().values().stream() - .map(f -> { - try { - return f.get(); - } catch (InterruptedException | ExecutionException interruptedException) { - return null; - } - }) - .filter(Objects::nonNull) - .map(Config::entries) - .flatMap(Collection::stream) - .collect(Collectors.toMap(ConfigEntry::name, Function.identity())); - KafkaChannelTopicCleanupPolicy cleanUpPolicy = KafkaChannelTopicCleanupPolicy - .valueOf(configMap.get(CLEANUP_POLICY).value()); - return KafkaChannelTopicConfiguration.builder() - .cleanupPolicy(List.of(cleanUpPolicy)) - .retentionMs(Integer.valueOf(configMap.get(RETENTION_MS).value())) - .retentionBytes(Integer.valueOf(configMap.get(RETENTION_BYTES).value())) - .deleteRetentionMs(Integer.valueOf(configMap.get(DELETE_RETENTION_MS).value())) - .maxMessageBytes(Integer.valueOf(configMap.get(MAX_MESSAGE_BYTES).value())) - .build(); + KafkaChannelTopicConfiguration.KafkaChannelTopicConfigurationBuilder builder = KafkaChannelTopicConfiguration.builder(); + try { + Map configMap = aClient + .describeConfigs(List.of(new ConfigResource(ConfigResource.Type.TOPIC, aTopic))) + .values().values().stream() + .map(f -> { + try { + return f.get(); + } catch (InterruptedException | ExecutionException interruptedException) { + return null; + } + }) + .filter(Objects::nonNull) + .map(Config::entries) + .flatMap(Collection::stream) + .collect(Collectors.toMap(ConfigEntry::name, Function.identity())); + String cleanUpPolicyString = configMap.get(CLEANUP_POLICY).value(); + List cleanUpPolicies = cleanUpPolicyString == null + ? null + : List.of(KafkaChannelTopicCleanupPolicy.valueOf(cleanUpPolicyString.toUpperCase())); + return builder + .cleanupPolicy(cleanUpPolicies) + .retentionMs(Integer.valueOf(configMap.get(RETENTION_MS).value())) + .retentionBytes(Integer.valueOf(configMap.get(RETENTION_BYTES).value())) + .deleteRetentionMs(Integer.valueOf(configMap.get(DELETE_RETENTION_MS).value())) + .maxMessageBytes(Integer.valueOf(configMap.get(MAX_MESSAGE_BYTES).value())) + .build(); + } catch (Exception e) { + LOGGER.warning("Unable to read kafka-config for topic " + aTopic); + LOGGER.throwing("KafkaResolver", "getTopicConfiguration", e); + return builder.build(); + } } private boolean isTopicExists(AdminClient admin, String topicName) throws InterruptedException, ExecutionException {