Skip to content

Commit

Permalink
[consul] using Consul Blocking Queries
Browse files Browse the repository at this point in the history
  • Loading branch information
FrogDevelopper committed Dec 8, 2024
1 parent 496e630 commit d385a1e
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 132 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,22 @@

import java.util.ArrayList;
import java.util.List;

import jakarta.inject.Named;
import jakarta.inject.Singleton;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;
import com.frogdevelopment.micronaut.consul.watch.watcher.ConfigurationsWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.NativeWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.Watcher;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.env.Environment;
import io.micronaut.context.env.PropertiesPropertySourceLoader;
import io.micronaut.context.env.PropertySourceLoader;
import io.micronaut.context.env.yaml.YamlPropertySourceLoader;
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.discovery.config.ConfigDiscoveryConfiguration.Format;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.jackson.core.env.JsonPropertySourceLoader;
import io.micronaut.scheduling.TaskExecutors;
Expand All @@ -47,15 +46,17 @@ public class WatcherFactory {
private final IndexConsulClient consulClient;
private final PropertiesChangeHandler propertiesChangeHandler;

@Singleton
@Bean(preDestroy = "close")
@Context
@Bean(preDestroy = "stop")
Watcher createWatcher(final ConsulConfiguration consulConfiguration) {
final var kvPaths = computeKvPaths(consulConfiguration);

final var format = consulConfiguration.getConfiguration().getFormat();
final var watcher = switch (format) {
// case NATIVE -> watchNative(kvPaths);
case JSON, YAML, PROPERTIES -> watchConfigurations(kvPaths, resolveLoader(format));
case NATIVE -> watchNative(kvPaths);
case JSON -> watchConfigurations(kvPaths, new JsonPropertySourceLoader());
case YAML -> watchConfigurations(kvPaths, new YamlPropertySourceLoader());
case PROPERTIES -> watchConfigurations(kvPaths, new PropertiesPropertySourceLoader());
default -> throw new ConfigurationException("Unhandled configuration format: " + format);
};

Expand Down Expand Up @@ -102,23 +103,14 @@ private static String toProfiledPath(final String resource, final String activeN
return resource + "," + activeName;
}

// private Watcher watchNative(final List<String> keyPaths) {
// // adding '/' at the end of the kvPath to distinct 'kvPath/value' from 'kvPath,profile/value'
// final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList();
// return new NativeWatcher(kvPaths, consulClient, propertiesChangeHandler);
// }
private Watcher watchNative(final List<String> keyPaths) {
// adding '/' at the end of the kvPath to distinct 'kvPath/' from 'kvPath,profile/'
final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList();
return new NativeWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
}

private Watcher watchConfigurations(final List<String> kvPaths, final PropertySourceLoader propertySourceLoader) {
return new ConfigurationsWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler, propertySourceLoader);
}

private static PropertySourceLoader resolveLoader(final Format format) {
return switch (format) {
case JSON -> new JsonPropertySourceLoader();
case PROPERTIES -> new PropertiesPropertySourceLoader();
case YAML -> new YamlPropertySourceLoader();
default -> throw new ConfigurationException("Unsupported properties file format: " + format);
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.reactivestreams.Publisher;

import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
Expand All @@ -16,6 +17,15 @@
@Requires(beans = ConsulConfiguration.class)
public interface IndexConsulClient extends Closeable, AutoCloseable {

default Publisher<List<KeyValue>> readValues(String key) {
return readValues(key, true);
}

default Publisher<List<KeyValue>> readValues(String key, boolean recurse) {
return readValues(key, recurse, null);
}

@Get(uri = "/kv/{+key}?{&recurse}{&index}", single = true)
Publisher<List<KeyValue>> watchValue(String key, @QueryValue boolean recurse, @QueryValue int index);
Publisher<List<KeyValue>> readValues(String key, @QueryValue boolean recurse, @Nullable @QueryValue Integer index);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
Expand All @@ -10,7 +11,41 @@
@Serdeable
@ReflectiveAccess
@JsonNaming(PropertyNamingStrategies.UpperCamelCaseStrategy.class)
public record KeyValue(@JsonProperty("ModifyIndex") int modifyIndex,
@JsonProperty("Key") String key,
@JsonProperty("Value") String value) {
public class KeyValue {

private final Integer modifyIndex;
private final String key;
private final String value;

/**
* @param key The key
* @param value The value
*/
@JsonCreator
public KeyValue(@JsonProperty("ModifyIndex") Integer modifyIndex, @JsonProperty("Key") String key, @JsonProperty("Value") String value) {
this.modifyIndex = modifyIndex;
this.key = key;
this.value = value;
}

/**
* @return The modifyIndex
*/
public Integer getModifyIndex() {
return modifyIndex;
}

/**
* @return The key
*/
public String getKey() {
return key;
}

/**
* @return The value
*/
public String getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@Configuration
@RequiresConsul
@Requires(property = WatchConfiguration.PREFIX + ".disabled", notEquals = "true", defaultValue = "false")
@Requires(property = "consul.watch.disabled", notEquals = "true", defaultValue = "false")
package com.frogdevelopment.micronaut.consul.watch;

import io.micronaut.context.annotation.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.random.RandomGenerator;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.client.KeyValue;
Expand All @@ -17,6 +16,7 @@
import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.exceptions.ReadTimeoutException;
import io.micronaut.http.client.exceptions.ResponseClosedException;
import io.micronaut.scheduling.TaskScheduler;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
Expand All @@ -29,9 +29,12 @@
@RequiredArgsConstructor
abstract sealed class AbstractWatcher<V> implements Watcher permits ConfigurationsWatcher, NativeWatcher {

protected static final Integer NO_INDEX = null;
private static final int WATCH_DELAY = 1000;

private final List<String> kvPaths;
private final TaskScheduler taskScheduler;
private final IndexConsulClient consulClient;
protected final IndexConsulClient consulClient;
private final PropertiesChangeHandler propertiesChangeHandler;

protected final Map<String, V> kvHolder = new ConcurrentHashMap<>();
Expand All @@ -47,98 +50,97 @@ public void start() {
}

try {
log.info("Starting KVs watchers");
kvPaths.forEach(this::watchKvPath);
log.debug("Starting KVs watcher");
watching = true;
kvPaths.forEach(kvPath -> watchKvPath(kvPath, 0));
} catch (final Exception e) {
log.error("Error reading configurations from Consul", e);
log.error("Error watching configurations", e);
stop();
}
watching = true;
}

@Override
public void close() {
public void stop() {
if (!watching) {
log.warn("You tried to stop an unstarted Watcher");
return;
}

log.warn("Stopping KVs watchers");
log.debug("Stopping KVs watchers");
listeners.forEach((key, value) -> {
try {
log.debug("Stopping watch for kvPath={}", key);
value.dispose();
} catch (final Exception e) {
log.error("Error stopping configurations watcher for kvPath={}", key, e);
}
});
listeners.clear();
kvHolder.clear();
watching = false;
}

private void watchKvPath(final String kvPath) {
final var modifiedIndex = getModifiedIndex(kvPath);
final var delay = RandomGenerator.getDefault().nextInt(100, 500);
taskScheduler.schedule(Duration.ofMillis(delay), () -> {
log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex);
final var disposable = Mono.from(consulClient.watchValue(kvPath, false, modifiedIndex))
.flatMap(kvs -> mapToData(kvPath, kvs))
.subscribe(kv -> onNext(kvPath, kv), throwable -> onError(kvPath, throwable));

listeners.put(kvPath, disposable);
}
);
}
private void watchKvPath(final String kvPath, final int nbFailures) {
taskScheduler.schedule(Duration.ofMillis(WATCH_DELAY), () -> {
if (!watching) {
log.warn("Watcher is not started");
return;
}
final var disposable = watchValue(kvPath)
.subscribe(next -> onNext(kvPath, next), throwable -> onError(kvPath, throwable, nbFailures));

protected abstract int getModifiedIndex(final String kvPath);
listeners.put(kvPath, disposable);
});
}

protected abstract Mono<V> mapToData(String kvPath, final List<KeyValue> kvs);
protected abstract Mono<V> watchValue(String kvPath);

private void onNext(String kvPath, final V next) {
final var previous = kvHolder.put(kvPath, next);

if (previous == null) {
handleInit();
handleInit(kvPath);
} else if (areEqual(previous, next)) {
handleNoChange();
handleNoChange(kvPath);
} else {
final var previousValue = readValue(previous);
final var nextValue = readValue(next);

handleSuccess(new WatchResult(kvPath, previousValue, nextValue));
propertiesChangeHandler.handleChanges(new WatchResult(kvPath, previousValue, nextValue));
}

watchKvPath(kvPath);
watchKvPath(kvPath, 0);
}

protected abstract boolean areEqual(final V previous, final V next);

protected abstract Map<String, Object> readValue(final V keyValue);

protected final void onError(String kvPath, Throwable throwable) {
private void onError(String kvPath, Throwable throwable, int nbFailures) {
if (throwable instanceof final HttpClientResponseException e && e.getStatus() == HttpStatus.NOT_FOUND) {
log.debug("No KV found with kvPath={}", kvPath);
} else if (throwable instanceof ReadTimeoutException) {
log.debug("Read timed out for kvPath={}", kvPath);
watchKvPath(kvPath);
listeners.remove(kvPath);
} else if (throwable instanceof ReadTimeoutException || throwable instanceof ResponseClosedException) {
log.debug("Exception [{}] for kvPath={}", throwable, kvPath);
watchKvPath(kvPath, 0);
} else {
log.error("Watching kvPath={} failed", kvPath, throwable);
if (nbFailures <= 3) {
watchKvPath(kvPath, nbFailures + 1);
}
}
}

protected final void handleInit() {
log.debug("Init watcher");
private void handleInit(final String kvPath) {
log.debug("Init watcher for kvPath={}", kvPath);
}

protected final void handleNoChange() {
log.debug("Nothing changed");
private void handleNoChange(final String kvPath) {
log.debug("Nothing changed for kvPath={}", kvPath);
}

protected final byte[] decodeValue(final KeyValue keyValue) {
return base64Decoder.decode(keyValue.value());
}

protected final void handleSuccess(final WatchResult result) {
log.info("Consul poll successful");
propertiesChangeHandler.handleChanges(result);
return base64Decoder.decode(keyValue.getValue());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,14 @@ public ConfigurationsWatcher(final List<String> kvPaths,
}

@Override
protected int getModifiedIndex(final String kvPath) {
return Optional.ofNullable(kvHolder.get(kvPath))
.map(KeyValue::modifyIndex)
.orElse(0);
}

@Override
protected Mono<KeyValue> mapToData(String kvPath, final List<KeyValue> kvs) {
return Flux.fromIterable(kvs)
.filter(kv -> kvPath.equals(kv.key()))
protected Mono<KeyValue> watchValue(String kvPath) {
final var modifiedIndex = Optional.ofNullable(kvHolder.get(kvPath))
.map(KeyValue::getModifyIndex)
.orElse(NO_INDEX);
log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex);
return Mono.from(consulClient.readValues(kvPath, false, modifiedIndex))
.flatMapMany(Flux::fromIterable)
.filter(kv -> kvPath.equals(kv.getKey()))
.singleOrEmpty();
}

Expand All @@ -63,11 +61,11 @@ protected boolean areEqual(final KeyValue previous, final KeyValue next) {

@Override
protected Map<String, Object> readValue(final KeyValue keyValue) {
if (keyValue == null || StringUtils.isEmpty(keyValue.value())) {
if (keyValue == null || StringUtils.isEmpty(keyValue.getValue())) {
return Collections.emptyMap();
}

return propertySourceReader.read(keyValue.key(), decodeValue(keyValue));
return propertySourceReader.read(keyValue.getKey(), decodeValue(keyValue));
}

}
Loading

0 comments on commit d385a1e

Please sign in to comment.