Skip to content

Commit

Permalink
Merge pull request #6 from FrogDevelopment/feature/improvements
Browse files Browse the repository at this point in the history
[consul] have a dedicated client for blocking queries
  • Loading branch information
FrogDevelopper authored Dec 11, 2024
2 parents d194683 + 2ba7057 commit be79b53
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 222 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ The watcher can be configured using
consul:
watch:
disabled: false # to disable the watcher, during test for instance
block-timeout: 10m # Sets the read timeout. Default value (10 minutes).
```
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation(mn.micronaut.serde.jackson)
implementation(mn.micronaut.discovery.client)
implementation(mn.micronaut.reactor)
implementation(mn.micronaut.retry)
implementation(mn.guava)

// ----------- TESTS -----------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.frogdevelopment.micronaut.consul.watch;

import java.time.Duration;
import java.util.Optional;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.discovery.consul.condition.RequiresConsul;
import io.micronaut.http.client.HttpClientConfiguration;

@RequiresConsul
@ConfigurationProperties(WatchConfiguration.PREFIX)
public class WatchConfiguration extends HttpClientConfiguration {

public static final String EXPR_CONSUL_WATCH_RETRY_COUNT = "${" + WatchConfiguration.PREFIX + ".retry-count:3}";
public static final String EXPR_CONSUL_WATCH_RETRY_DELAY = "${" + WatchConfiguration.PREFIX + ".retry-delay:1s}";

/**
* The default block timeout in minutes.
*/
public static final long DEFAULT_BLOCK_TIMEOUT_MINUTES = 10;

public static final long DEFAULT_WATCH_DELAY_MILLISECONDS = 500;

/**
* The prefix to use for all Consul settings.
*/
public static final String PREFIX = "consul.watch";

private Duration readTimeout = Duration.ofMinutes(DEFAULT_BLOCK_TIMEOUT_MINUTES);
private Duration watchDelay = Duration.ofSeconds(DEFAULT_WATCH_DELAY_MILLISECONDS);

private final ConsulConfiguration consulConfiguration;

public WatchConfiguration(final ConsulConfiguration consulConfiguration) {
super(consulConfiguration);
this.consulConfiguration = consulConfiguration;
}

@Override
public ConnectionPoolConfiguration getConnectionPoolConfiguration() {
return consulConfiguration.getConnectionPoolConfiguration();
}

@Override
public Optional<Duration> getReadTimeout() {
return Optional.ofNullable(readTimeout);
}

@Override
public void setReadTimeout(@Nullable Duration readTimeout) {
this.readTimeout = readTimeout;
}

public Duration getWatchDelay() {
return watchDelay;
}

/**
* Sets the watch delay before each call to avoid flooding. Default value ({@value #DEFAULT_WATCH_DELAY_MILLISECONDS} milliseconds).
*
* @param watchDelay The read timeout
*/
public void setWatchDelay(Duration watchDelay) {
this.watchDelay = watchDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@

import java.util.ArrayList;
import java.util.List;
import jakarta.inject.Named;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.client.WatchConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;
import com.frogdevelopment.micronaut.consul.watch.watcher.ConfigurationsWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.NativeWatcher;
Expand All @@ -25,8 +24,6 @@
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.jackson.core.env.JsonPropertySourceLoader;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.TaskScheduler;

/**
* Create all needed {@link Watcher} based on configuration.
Expand All @@ -36,14 +33,13 @@
*/
@Factory
@RequiredArgsConstructor
public class WatcherFactory {
public class WatchFactory {

private static final String CONSUL_PATH_SEPARATOR = "/";

private final Environment environment;
@Named(TaskExecutors.SCHEDULED)
private final TaskScheduler taskScheduler;
private final IndexConsulClient consulClient;
private final WatchConsulClient consulClient;
private final WatchConfiguration watchConfiguration;
private final PropertiesChangeHandler propertiesChangeHandler;

@Context
Expand Down Expand Up @@ -106,11 +102,11 @@ private static String toProfiledPath(final String resource, final String activeN
private Watcher watchNative(final List<String> keyPaths) {
// adding '/' at the end of the kvPath to distinct 'kvPath/' from 'kvPath,profile/'
final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList();
return new NativeWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
return new NativeWatcher(kvPaths, consulClient, watchConfiguration, propertiesChangeHandler);
}

private Watcher watchConfigurations(final List<String> kvPaths, final PropertySourceLoader propertySourceLoader) {
return new ConfigurationsWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler, propertySourceLoader);
return new ConfigurationsWatcher(kvPaths, consulClient, watchConfiguration, propertiesChangeHandler, propertySourceLoader);
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import java.util.List;

import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;

import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.discovery.consul.client.v1.ConsulClient;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.retry.annotation.Retryable;
import reactor.core.publisher.Mono;

@Requires(beans = WatchConfiguration.class)
@Client(id = ConsulClient.SERVICE_ID, path = "/v1", configuration = WatchConfiguration.class)
public interface WatchConsulClient {

@Get(uri = "/kv/{+key}?{&recurse}{&index}", single = true)
@Retryable(
attempts = WatchConfiguration.EXPR_CONSUL_WATCH_RETRY_COUNT,
delay = WatchConfiguration.EXPR_CONSUL_WATCH_RETRY_DELAY
)
Mono<List<KeyValue>> watchValues(String key, @Nullable @QueryValue Boolean recurse, @Nullable @QueryValue Integer index);

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@Configuration
@RequiresConsul
@Requires(property = "consul.watch.disabled", notEquals = "true", defaultValue = "false")
@Requires(property = WatchConfiguration.PREFIX + "disabled", notEquals = "true", defaultValue = "false")
package com.frogdevelopment.micronaut.consul.watch;

import io.micronaut.context.annotation.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;
import com.frogdevelopment.micronaut.consul.watch.client.KeyValue;
import com.frogdevelopment.micronaut.consul.watch.client.WatchConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;

import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.exceptions.ReadTimeoutException;
import io.micronaut.http.client.exceptions.ResponseClosedException;
import io.micronaut.scheduling.TaskScheduler;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

Expand All @@ -30,11 +28,10 @@
abstract sealed class AbstractWatcher<V> implements Watcher permits ConfigurationsWatcher, NativeWatcher {

protected static final Integer NO_INDEX = null;
private static final int WATCH_DELAY = 1000;

private final List<String> kvPaths;
private final TaskScheduler taskScheduler;
protected final IndexConsulClient consulClient;
protected final WatchConsulClient consulClient;
private final WatchConfiguration watchConfiguration;
private final PropertiesChangeHandler propertiesChangeHandler;

protected final Map<String, V> kvHolder = new ConcurrentHashMap<>();
Expand All @@ -52,7 +49,8 @@ public void start() {
try {
log.debug("Starting KVs watcher");
watching = true;
kvPaths.forEach(kvPath -> watchKvPath(kvPath, 0));
kvPaths.parallelStream()
.forEach(kvPath -> watchKvPath(kvPath, 0));
} catch (final Exception e) {
log.error("Error watching configurations", e);
stop();
Expand Down Expand Up @@ -81,16 +79,16 @@ public void stop() {
}

private void watchKvPath(final String kvPath, final int nbFailures) {
taskScheduler.schedule(Duration.ofMillis(WATCH_DELAY), () -> {
if (!watching) {
log.warn("Watcher is not started");
return;
}
final var disposable = watchValue(kvPath)
// delaying to avoid flood caused by multiple consecutive calls
final var disposable = Mono.delay(watchConfiguration.getWatchDelay())
.then(watchValue(kvPath))
.subscribe(next -> onNext(kvPath, next), throwable -> onError(kvPath, throwable, nbFailures));

listeners.put(kvPath, disposable);
});
}

protected abstract Mono<V> watchValue(String kvPath);
Expand Down Expand Up @@ -118,9 +116,9 @@ private void onNext(String kvPath, final V next) {

private void onError(String kvPath, Throwable throwable, int nbFailures) {
if (throwable instanceof final HttpClientResponseException e && e.getStatus() == HttpStatus.NOT_FOUND) {
log.debug("No KV found with kvPath={}", kvPath);
log.trace("No KV found with kvPath={}", kvPath);
listeners.remove(kvPath);
} else if (throwable instanceof ReadTimeoutException || throwable instanceof ResponseClosedException) {
} else if (throwable instanceof ReadTimeoutException) {
log.debug("Exception [{}] for kvPath={}", throwable, kvPath);
watchKvPath(kvPath, 0);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import java.util.Map;
import java.util.Optional;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;
import com.frogdevelopment.micronaut.consul.watch.client.KeyValue;
import com.frogdevelopment.micronaut.consul.watch.client.WatchConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;

import io.micronaut.context.env.PropertySourceReader;
import io.micronaut.core.util.StringUtils;
import io.micronaut.scheduling.TaskScheduler;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -34,11 +34,11 @@ public final class ConfigurationsWatcher extends AbstractWatcher<KeyValue> {
* Default constructor
*/
public ConfigurationsWatcher(final List<String> kvPaths,
final TaskScheduler taskScheduler,
final IndexConsulClient consulClient,
final WatchConsulClient consulClient,
final WatchConfiguration watchConfiguration,
final PropertiesChangeHandler propertiesChangeHandler,
final PropertySourceReader propertySourceReader) {
super(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
super(kvPaths, consulClient, watchConfiguration, propertiesChangeHandler);
this.propertySourceReader = propertySourceReader;
}

Expand All @@ -48,7 +48,7 @@ protected Mono<KeyValue> watchValue(String kvPath) {
.map(KeyValue::getModifyIndex)
.orElse(NO_INDEX);
log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex);
return Mono.from(consulClient.readValues(kvPath, false, modifiedIndex))
return consulClient.watchValues(kvPath, false, modifiedIndex)
.flatMapMany(Flux::fromIterable)
.filter(kv -> kvPath.equals(kv.getKey()))
.singleOrEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import java.util.Optional;
import java.util.stream.Collectors;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;
import com.frogdevelopment.micronaut.consul.watch.client.KeyValue;
import com.frogdevelopment.micronaut.consul.watch.client.WatchConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;

import io.micronaut.core.util.StringUtils;
import io.micronaut.scheduling.TaskScheduler;
import reactor.core.publisher.Mono;

/**
Expand All @@ -35,10 +35,10 @@ public final class NativeWatcher extends AbstractWatcher<List<KeyValue>> {
* Default constructor
*/
public NativeWatcher(final List<String> kvPaths,
final TaskScheduler taskScheduler,
final IndexConsulClient consulClient,
final WatchConsulClient consulClient,
final WatchConfiguration watchConfiguration,
final PropertiesChangeHandler propertiesChangeHandler) {
super(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
super(kvPaths, consulClient, watchConfiguration, propertiesChangeHandler);
}

@Override
Expand All @@ -50,7 +50,7 @@ protected Mono<List<KeyValue>> watchValue(String kvPath) {
.max(Integer::compareTo)
.orElse(NO_INDEX);
log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex);
return Mono.from(consulClient.readValues(kvPath, true, modifiedIndex));
return consulClient.watchValues(kvPath, true, modifiedIndex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

import java.util.Map;

/**
* @param kvPath Path of the KV
* @param previous Previous value of the KV
* @param next New value of the KV
* @author LE GALL Benoît
* @since 1.0.0
*/
public record WatchResult(
String kvPath,
Map<String, Object> previous,
Expand Down
Loading

0 comments on commit be79b53

Please sign in to comment.