diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatchConfiguration.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatchConfiguration.java deleted file mode 100644 index 5d250b3..0000000 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatchConfiguration.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.frogdevelopment.micronaut.consul.watch; - -import lombok.Data; - -import java.time.Duration; - -import io.micronaut.context.annotation.ConfigurationProperties; - -/** - * @author LE GALL Benoît - * @since 1.0.0 - */ -@Data -@ConfigurationProperties(WatchConfiguration.PREFIX) -public class WatchConfiguration { - - public static final String PREFIX = "consul.watch"; - - /** - * Initial Delay before starting the polling task. Default to 5ms - */ - private Duration initialDelay = Duration.ofMillis(5); - /** - * Period between each configuration poll. Default to 30s. - */ - private Duration period = Duration.ofSeconds(30); -} diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatcherFactory.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatcherFactory.java index 7cbbf98..e7958f3 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatcherFactory.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/WatcherFactory.java @@ -7,23 +7,22 @@ import java.util.ArrayList; import java.util.List; - import jakarta.inject.Named; -import jakarta.inject.Singleton; import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient; import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler; import com.frogdevelopment.micronaut.consul.watch.watcher.ConfigurationsWatcher; +import com.frogdevelopment.micronaut.consul.watch.watcher.NativeWatcher; import com.frogdevelopment.micronaut.consul.watch.watcher.Watcher; import io.micronaut.context.annotation.Bean; +import io.micronaut.context.annotation.Context; import io.micronaut.context.annotation.Factory; import io.micronaut.context.env.Environment; import io.micronaut.context.env.PropertiesPropertySourceLoader; import io.micronaut.context.env.PropertySourceLoader; import io.micronaut.context.env.yaml.YamlPropertySourceLoader; import io.micronaut.context.exceptions.ConfigurationException; -import io.micronaut.discovery.config.ConfigDiscoveryConfiguration.Format; import io.micronaut.discovery.consul.ConsulConfiguration; import io.micronaut.jackson.core.env.JsonPropertySourceLoader; import io.micronaut.scheduling.TaskExecutors; @@ -47,15 +46,17 @@ public class WatcherFactory { private final IndexConsulClient consulClient; private final PropertiesChangeHandler propertiesChangeHandler; - @Singleton - @Bean(preDestroy = "close") + @Context + @Bean(preDestroy = "stop") Watcher createWatcher(final ConsulConfiguration consulConfiguration) { final var kvPaths = computeKvPaths(consulConfiguration); final var format = consulConfiguration.getConfiguration().getFormat(); final var watcher = switch (format) { -// case NATIVE -> watchNative(kvPaths); - case JSON, YAML, PROPERTIES -> watchConfigurations(kvPaths, resolveLoader(format)); + case NATIVE -> watchNative(kvPaths); + case JSON -> watchConfigurations(kvPaths, new JsonPropertySourceLoader()); + case YAML -> watchConfigurations(kvPaths, new YamlPropertySourceLoader()); + case PROPERTIES -> watchConfigurations(kvPaths, new PropertiesPropertySourceLoader()); default -> throw new ConfigurationException("Unhandled configuration format: " + format); }; @@ -102,23 +103,14 @@ private static String toProfiledPath(final String resource, final String activeN return resource + "," + activeName; } -// private Watcher watchNative(final List keyPaths) { -// // adding '/' at the end of the kvPath to distinct 'kvPath/value' from 'kvPath,profile/value' -// final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList(); -// return new NativeWatcher(kvPaths, consulClient, propertiesChangeHandler); -// } + private Watcher watchNative(final List keyPaths) { + // adding '/' at the end of the kvPath to distinct 'kvPath/' from 'kvPath,profile/' + final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList(); + return new NativeWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler); + } private Watcher watchConfigurations(final List kvPaths, final PropertySourceLoader propertySourceLoader) { return new ConfigurationsWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler, propertySourceLoader); } - private static PropertySourceLoader resolveLoader(final Format format) { - return switch (format) { - case JSON -> new JsonPropertySourceLoader(); - case PROPERTIES -> new PropertiesPropertySourceLoader(); - case YAML -> new YamlPropertySourceLoader(); - default -> throw new ConfigurationException("Unsupported properties file format: " + format); - }; - } - } diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/IndexConsulClient.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/IndexConsulClient.java index 940ce87..7a60b99 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/IndexConsulClient.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/IndexConsulClient.java @@ -6,6 +6,7 @@ import org.reactivestreams.Publisher; import io.micronaut.context.annotation.Requires; +import io.micronaut.core.annotation.Nullable; import io.micronaut.discovery.consul.ConsulConfiguration; import io.micronaut.http.annotation.Get; import io.micronaut.http.annotation.QueryValue; @@ -16,6 +17,15 @@ @Requires(beans = ConsulConfiguration.class) public interface IndexConsulClient extends Closeable, AutoCloseable { + default Publisher> readValues(String key) { + return readValues(key, true); + } + + default Publisher> readValues(String key, boolean recurse) { + return readValues(key, recurse, null); + } + @Get(uri = "/kv/{+key}?{&recurse}{&index}", single = true) - Publisher> watchValue(String key, @QueryValue boolean recurse, @QueryValue int index); + Publisher> readValues(String key, @QueryValue boolean recurse, @Nullable @QueryValue Integer index); + } diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/KeyValue.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/KeyValue.java index 90378d1..406e7dc 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/KeyValue.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/client/KeyValue.java @@ -1,5 +1,6 @@ package com.frogdevelopment.micronaut.consul.watch.client; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.fasterxml.jackson.databind.annotation.JsonNaming; @@ -10,7 +11,41 @@ @Serdeable @ReflectiveAccess @JsonNaming(PropertyNamingStrategies.UpperCamelCaseStrategy.class) -public record KeyValue(@JsonProperty("ModifyIndex") int modifyIndex, - @JsonProperty("Key") String key, - @JsonProperty("Value") String value) { +public class KeyValue { + + private final Integer modifyIndex; + private final String key; + private final String value; + + /** + * @param key The key + * @param value The value + */ + @JsonCreator + public KeyValue(@JsonProperty("ModifyIndex") Integer modifyIndex, @JsonProperty("Key") String key, @JsonProperty("Value") String value) { + this.modifyIndex = modifyIndex; + this.key = key; + this.value = value; + } + + /** + * @return The modifyIndex + */ + public Integer getModifyIndex() { + return modifyIndex; + } + + /** + * @return The key + */ + public String getKey() { + return key; + } + + /** + * @return The value + */ + public String getValue() { + return value; + } } diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/package-info.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/package-info.java index 03e6675..572f65d 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/package-info.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/package-info.java @@ -1,6 +1,6 @@ @Configuration @RequiresConsul -@Requires(property = WatchConfiguration.PREFIX + ".disabled", notEquals = "true", defaultValue = "false") +@Requires(property = "consul.watch.disabled", notEquals = "true", defaultValue = "false") package com.frogdevelopment.micronaut.consul.watch; import io.micronaut.context.annotation.Configuration; diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/AbstractWatcher.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/AbstractWatcher.java index 9c14c42..8c83bf8 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/AbstractWatcher.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/AbstractWatcher.java @@ -8,7 +8,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.random.RandomGenerator; import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient; import com.frogdevelopment.micronaut.consul.watch.client.KeyValue; @@ -17,6 +16,7 @@ import io.micronaut.http.HttpStatus; import io.micronaut.http.client.exceptions.HttpClientResponseException; import io.micronaut.http.client.exceptions.ReadTimeoutException; +import io.micronaut.http.client.exceptions.ResponseClosedException; import io.micronaut.scheduling.TaskScheduler; import reactor.core.Disposable; import reactor.core.publisher.Mono; @@ -29,9 +29,12 @@ @RequiredArgsConstructor abstract sealed class AbstractWatcher implements Watcher permits ConfigurationsWatcher, NativeWatcher { + protected static final Integer NO_INDEX = null; + private static final int WATCH_DELAY = 1000; + private final List kvPaths; private final TaskScheduler taskScheduler; - private final IndexConsulClient consulClient; + protected final IndexConsulClient consulClient; private final PropertiesChangeHandler propertiesChangeHandler; protected final Map kvHolder = new ConcurrentHashMap<>(); @@ -47,98 +50,97 @@ public void start() { } try { - log.info("Starting KVs watchers"); - kvPaths.forEach(this::watchKvPath); + log.debug("Starting KVs watcher"); + watching = true; + kvPaths.forEach(kvPath -> watchKvPath(kvPath, 0)); } catch (final Exception e) { - log.error("Error reading configurations from Consul", e); + log.error("Error watching configurations", e); + stop(); } - watching = true; } @Override - public void close() { + public void stop() { if (!watching) { log.warn("You tried to stop an unstarted Watcher"); return; } - log.warn("Stopping KVs watchers"); + log.debug("Stopping KVs watchers"); listeners.forEach((key, value) -> { try { + log.debug("Stopping watch for kvPath={}", key); value.dispose(); } catch (final Exception e) { log.error("Error stopping configurations watcher for kvPath={}", key, e); } }); listeners.clear(); + kvHolder.clear(); watching = false; } - private void watchKvPath(final String kvPath) { - final var modifiedIndex = getModifiedIndex(kvPath); - final var delay = RandomGenerator.getDefault().nextInt(100, 500); - taskScheduler.schedule(Duration.ofMillis(delay), () -> { - log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex); - final var disposable = Mono.from(consulClient.watchValue(kvPath, false, modifiedIndex)) - .flatMap(kvs -> mapToData(kvPath, kvs)) - .subscribe(kv -> onNext(kvPath, kv), throwable -> onError(kvPath, throwable)); - - listeners.put(kvPath, disposable); - } - ); - } + private void watchKvPath(final String kvPath, final int nbFailures) { + taskScheduler.schedule(Duration.ofMillis(WATCH_DELAY), () -> { + if (!watching) { + log.warn("Watcher is not started"); + return; + } + final var disposable = watchValue(kvPath) + .subscribe(next -> onNext(kvPath, next), throwable -> onError(kvPath, throwable, nbFailures)); - protected abstract int getModifiedIndex(final String kvPath); + listeners.put(kvPath, disposable); + }); + } - protected abstract Mono mapToData(String kvPath, final List kvs); + protected abstract Mono watchValue(String kvPath); private void onNext(String kvPath, final V next) { final var previous = kvHolder.put(kvPath, next); if (previous == null) { - handleInit(); + handleInit(kvPath); } else if (areEqual(previous, next)) { - handleNoChange(); + handleNoChange(kvPath); } else { final var previousValue = readValue(previous); final var nextValue = readValue(next); - handleSuccess(new WatchResult(kvPath, previousValue, nextValue)); + propertiesChangeHandler.handleChanges(new WatchResult(kvPath, previousValue, nextValue)); } - watchKvPath(kvPath); + watchKvPath(kvPath, 0); } protected abstract boolean areEqual(final V previous, final V next); protected abstract Map readValue(final V keyValue); - protected final void onError(String kvPath, Throwable throwable) { + private void onError(String kvPath, Throwable throwable, int nbFailures) { if (throwable instanceof final HttpClientResponseException e && e.getStatus() == HttpStatus.NOT_FOUND) { log.debug("No KV found with kvPath={}", kvPath); - } else if (throwable instanceof ReadTimeoutException) { - log.debug("Read timed out for kvPath={}", kvPath); - watchKvPath(kvPath); + listeners.remove(kvPath); + } else if (throwable instanceof ReadTimeoutException || throwable instanceof ResponseClosedException) { + log.debug("Exception [{}] for kvPath={}", throwable, kvPath); + watchKvPath(kvPath, 0); } else { log.error("Watching kvPath={} failed", kvPath, throwable); + if (nbFailures <= 3) { + watchKvPath(kvPath, nbFailures + 1); + } } } - protected final void handleInit() { - log.debug("Init watcher"); + private void handleInit(final String kvPath) { + log.debug("Init watcher for kvPath={}", kvPath); } - protected final void handleNoChange() { - log.debug("Nothing changed"); + private void handleNoChange(final String kvPath) { + log.debug("Nothing changed for kvPath={}", kvPath); } protected final byte[] decodeValue(final KeyValue keyValue) { - return base64Decoder.decode(keyValue.value()); - } - - protected final void handleSuccess(final WatchResult result) { - log.info("Consul poll successful"); - propertiesChangeHandler.handleChanges(result); + return base64Decoder.decode(keyValue.getValue()); } } diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/ConfigurationsWatcher.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/ConfigurationsWatcher.java index b4755a5..2dd58f5 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/ConfigurationsWatcher.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/ConfigurationsWatcher.java @@ -43,16 +43,14 @@ public ConfigurationsWatcher(final List kvPaths, } @Override - protected int getModifiedIndex(final String kvPath) { - return Optional.ofNullable(kvHolder.get(kvPath)) - .map(KeyValue::modifyIndex) - .orElse(0); - } - - @Override - protected Mono mapToData(String kvPath, final List kvs) { - return Flux.fromIterable(kvs) - .filter(kv -> kvPath.equals(kv.key())) + protected Mono watchValue(String kvPath) { + final var modifiedIndex = Optional.ofNullable(kvHolder.get(kvPath)) + .map(KeyValue::getModifyIndex) + .orElse(NO_INDEX); + log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex); + return Mono.from(consulClient.readValues(kvPath, false, modifiedIndex)) + .flatMapMany(Flux::fromIterable) + .filter(kv -> kvPath.equals(kv.getKey())) .singleOrEmpty(); } @@ -63,11 +61,11 @@ protected boolean areEqual(final KeyValue previous, final KeyValue next) { @Override protected Map readValue(final KeyValue keyValue) { - if (keyValue == null || StringUtils.isEmpty(keyValue.value())) { + if (keyValue == null || StringUtils.isEmpty(keyValue.getValue())) { return Collections.emptyMap(); } - return propertySourceReader.read(keyValue.key(), decodeValue(keyValue)); + return propertySourceReader.read(keyValue.getKey(), decodeValue(keyValue)); } } diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/KvUtils.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/KvUtils.java index 2c5245b..38f5622 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/KvUtils.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/KvUtils.java @@ -20,7 +20,7 @@ private KvUtils() { } /** - * Compare 2 {@link KeyValue} + * Compare 2 {@link KeyValue} by key and value * * @param left 1st {@link KeyValue} to compare * @param right 2d {@link KeyValue} to compare @@ -35,7 +35,7 @@ static boolean areEqual(final KeyValue left, final KeyValue right) { return false; } - return left.key().equals(right.key()) && left.value().equals(right.value()); + return left.getKey().equals(right.getKey()) && left.getValue().equals(right.getValue()); } /** @@ -51,13 +51,13 @@ static boolean areEqual(final List left, final List right) { return false; } - left.sort(Comparator.comparing(KeyValue::key)); - right.sort(Comparator.comparing(KeyValue::key)); + left.sort(Comparator.comparing(KeyValue::getKey)); + right.sort(Comparator.comparing(KeyValue::getKey)); for (int i = 0; i < left.size(); i++) { - final var nextKV = left.get(i); - final var previousKV = right.get(i); - if (!areEqual(previousKV, nextKV)) { + final var leftKV = left.get(i); + final var rightKV = right.get(i); + if (!areEqual(rightKV, leftKV)) { return false; } } diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/NativeWatcher.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/NativeWatcher.java index 5ffe2d2..2303727 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/NativeWatcher.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/NativeWatcher.java @@ -31,7 +31,6 @@ public final class NativeWatcher extends AbstractWatcher> { private final Map keysMap = new HashMap<>(); - /** * Default constructor */ @@ -43,17 +42,15 @@ public NativeWatcher(final List kvPaths, } @Override - protected int getModifiedIndex(final String kvPath) { - return Optional.ofNullable(kvHolder.get(kvPath)) - .flatMap(keyValues -> keyValues.stream() - .map(KeyValue::modifyIndex) - .max(Integer::compareTo)) - .orElse(0); - } - - @Override - protected Mono> mapToData(String kvPath, List kvs) { - return Mono.just(kvs); + protected Mono> watchValue(String kvPath) { + final var modifiedIndex = Optional.ofNullable(kvHolder.get(kvPath)) + .stream() + .flatMap(List::stream) + .map(KeyValue::getModifyIndex) + .max(Integer::compareTo) + .orElse(NO_INDEX); + log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex); + return Mono.from(consulClient.readValues(kvPath, true, modifiedIndex)); } @Override @@ -69,12 +66,12 @@ protected Map readValue(final List keyValues) { return keyValues.stream() .filter(Objects::nonNull) - .filter(kv -> StringUtils.isNotEmpty(kv.value())) + .filter(kv -> StringUtils.isNotEmpty(kv.getValue())) .collect(Collectors.toMap(this::pathToPropertyKey, keyValue -> new String(decodeValue(keyValue)))); } private String pathToPropertyKey(final KeyValue kv) { - return keysMap.computeIfAbsent(kv.key(), key -> List.of(key.split("/")).getLast()); + return keysMap.computeIfAbsent(kv.getKey(), key -> List.of(key.split("/")).getLast()); } } diff --git a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/Watcher.java b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/Watcher.java index 0a920f3..774c591 100644 --- a/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/Watcher.java +++ b/src/main/java/com/frogdevelopment/micronaut/consul/watch/watcher/Watcher.java @@ -1,16 +1,19 @@ package com.frogdevelopment.micronaut.consul.watch.watcher; -import java.io.Closeable; - /** * @author LE GALL Benoît * @since 1.0.0 */ -public interface Watcher extends Closeable { +public interface Watcher { /** * Start the watching. */ void start(); + /** + * Stop the watching. + */ + void stop(); + } diff --git a/src/test/java/com/frogdevelopment/micronaut/consul/watch/WatchTriggerTest.java b/src/test/java/com/frogdevelopment/micronaut/consul/watch/WatchTriggerTest.java index a3fd9ea..8e54d62 100644 --- a/src/test/java/com/frogdevelopment/micronaut/consul/watch/WatchTriggerTest.java +++ b/src/test/java/com/frogdevelopment/micronaut/consul/watch/WatchTriggerTest.java @@ -20,8 +20,6 @@ class WatchTriggerTest { @Mock private TaskScheduler taskScheduler; - @Spy - private final WatchConfiguration watchConfiguration = new WatchConfiguration(); @Mock private Watcher watcher;