Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[consul] have a dedicated client for blocking queries #6

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ The watcher can be configured using
consul:
watch:
disabled: false # to disable the watcher, during test for instance
block-timeout: 10m # Sets the read timeout. Default value (10 minutes).
```
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation(mn.micronaut.serde.jackson)
implementation(mn.micronaut.discovery.client)
implementation(mn.micronaut.reactor)
implementation(mn.micronaut.retry)
implementation(mn.guava)

// ----------- TESTS -----------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.frogdevelopment.micronaut.consul.watch;

import java.time.Duration;
import java.util.Optional;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.discovery.consul.condition.RequiresConsul;
import io.micronaut.http.client.HttpClientConfiguration;

@RequiresConsul
@ConfigurationProperties(WatchConfiguration.PREFIX)
public class WatchConfiguration extends HttpClientConfiguration {

public static final String EXPR_CONSUL_WATCH_RETRY_COUNT = "${" + WatchConfiguration.PREFIX + ".retry-count:3}";
public static final String EXPR_CONSUL_WATCH_RETRY_DELAY = "${" + WatchConfiguration.PREFIX + ".retry-delay:1s}";

/**
* The default block timeout in minutes.
*/
public static final long DEFAULT_BLOCK_TIMEOUT_MINUTES = 10;

public static final long DEFAULT_WATCH_DELAY_MILLISECONDS = 500;

/**
* The prefix to use for all Consul settings.
*/
public static final String PREFIX = "consul.watch";

private Duration readTimeout = Duration.ofMinutes(DEFAULT_BLOCK_TIMEOUT_MINUTES);
private Duration watchDelay = Duration.ofSeconds(DEFAULT_WATCH_DELAY_MILLISECONDS);

private final ConsulConfiguration consulConfiguration;

public WatchConfiguration(final ConsulConfiguration consulConfiguration) {
super(consulConfiguration);
this.consulConfiguration = consulConfiguration;
}

@Override
public ConnectionPoolConfiguration getConnectionPoolConfiguration() {
return consulConfiguration.getConnectionPoolConfiguration();
}

@Override
public Optional<Duration> getReadTimeout() {
return Optional.ofNullable(readTimeout);
}

@Override
public void setReadTimeout(@Nullable Duration readTimeout) {
this.readTimeout = readTimeout;
}

public Duration getWatchDelay() {
return watchDelay;
}

/**
* Sets the watch delay before each call to avoid flooding. Default value ({@value #DEFAULT_WATCH_DELAY_MILLISECONDS} milliseconds).
*
* @param watchDelay The read timeout
*/
public void setWatchDelay(Duration watchDelay) {
this.watchDelay = watchDelay;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@

import java.util.ArrayList;
import java.util.List;
import jakarta.inject.Named;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.client.WatchConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;
import com.frogdevelopment.micronaut.consul.watch.watcher.ConfigurationsWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.NativeWatcher;
Expand All @@ -25,8 +24,6 @@
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.jackson.core.env.JsonPropertySourceLoader;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.TaskScheduler;

/**
* Create all needed {@link Watcher} based on configuration.
Expand All @@ -36,14 +33,13 @@
*/
@Factory
@RequiredArgsConstructor
public class WatcherFactory {
public class WatchFactory {

private static final String CONSUL_PATH_SEPARATOR = "/";

private final Environment environment;
@Named(TaskExecutors.SCHEDULED)
private final TaskScheduler taskScheduler;
private final IndexConsulClient consulClient;
private final WatchConsulClient consulClient;
private final WatchConfiguration watchConfiguration;
private final PropertiesChangeHandler propertiesChangeHandler;

@Context
Expand Down Expand Up @@ -106,11 +102,11 @@ private static String toProfiledPath(final String resource, final String activeN
private Watcher watchNative(final List<String> keyPaths) {
// adding '/' at the end of the kvPath to distinct 'kvPath/' from 'kvPath,profile/'
final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList();
return new NativeWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
return new NativeWatcher(kvPaths, consulClient, watchConfiguration, propertiesChangeHandler);
}

private Watcher watchConfigurations(final List<String> kvPaths, final PropertySourceLoader propertySourceLoader) {
return new ConfigurationsWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler, propertySourceLoader);
return new ConfigurationsWatcher(kvPaths, consulClient, watchConfiguration, propertiesChangeHandler, propertySourceLoader);
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import java.util.List;

import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;

import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.discovery.consul.client.v1.ConsulClient;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.retry.annotation.Retryable;
import reactor.core.publisher.Mono;

@Requires(beans = WatchConfiguration.class)
@Client(id = ConsulClient.SERVICE_ID, path = "/v1", configuration = WatchConfiguration.class)
public interface WatchConsulClient {

@Get(uri = "/kv/{+key}?{&recurse}{&index}", single = true)
@Retryable(
attempts = WatchConfiguration.EXPR_CONSUL_WATCH_RETRY_COUNT,
delay = WatchConfiguration.EXPR_CONSUL_WATCH_RETRY_DELAY
)
Mono<List<KeyValue>> watchValues(String key, @Nullable @QueryValue Boolean recurse, @Nullable @QueryValue Integer index);

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@Configuration
@RequiresConsul
@Requires(property = "consul.watch.disabled", notEquals = "true", defaultValue = "false")
@Requires(property = WatchConfiguration.PREFIX + "disabled", notEquals = "true", defaultValue = "false")
package com.frogdevelopment.micronaut.consul.watch;

import io.micronaut.context.annotation.Configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;
import com.frogdevelopment.micronaut.consul.watch.client.KeyValue;
import com.frogdevelopment.micronaut.consul.watch.client.WatchConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;

import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.exceptions.ReadTimeoutException;
import io.micronaut.http.client.exceptions.ResponseClosedException;
import io.micronaut.scheduling.TaskScheduler;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

Expand All @@ -30,11 +28,10 @@
abstract sealed class AbstractWatcher<V> implements Watcher permits ConfigurationsWatcher, NativeWatcher {

protected static final Integer NO_INDEX = null;
private static final int WATCH_DELAY = 1000;

private final List<String> kvPaths;
private final TaskScheduler taskScheduler;
protected final IndexConsulClient consulClient;
protected final WatchConsulClient consulClient;
private final WatchConfiguration watchConfiguration;
private final PropertiesChangeHandler propertiesChangeHandler;

protected final Map<String, V> kvHolder = new ConcurrentHashMap<>();
Expand All @@ -52,7 +49,8 @@ public void start() {
try {
log.debug("Starting KVs watcher");
watching = true;
kvPaths.forEach(kvPath -> watchKvPath(kvPath, 0));
kvPaths.parallelStream()
.forEach(kvPath -> watchKvPath(kvPath, 0));
} catch (final Exception e) {
log.error("Error watching configurations", e);
stop();
Expand Down Expand Up @@ -81,16 +79,16 @@ public void stop() {
}

private void watchKvPath(final String kvPath, final int nbFailures) {
taskScheduler.schedule(Duration.ofMillis(WATCH_DELAY), () -> {
if (!watching) {
log.warn("Watcher is not started");
return;
}
final var disposable = watchValue(kvPath)
// delaying to avoid flood caused by multiple consecutive calls
final var disposable = Mono.delay(watchConfiguration.getWatchDelay())
.then(watchValue(kvPath))
.subscribe(next -> onNext(kvPath, next), throwable -> onError(kvPath, throwable, nbFailures));

listeners.put(kvPath, disposable);
});
}

protected abstract Mono<V> watchValue(String kvPath);
Expand Down Expand Up @@ -118,9 +116,9 @@ private void onNext(String kvPath, final V next) {

private void onError(String kvPath, Throwable throwable, int nbFailures) {
if (throwable instanceof final HttpClientResponseException e && e.getStatus() == HttpStatus.NOT_FOUND) {
log.debug("No KV found with kvPath={}", kvPath);
log.trace("No KV found with kvPath={}", kvPath);
listeners.remove(kvPath);
} else if (throwable instanceof ReadTimeoutException || throwable instanceof ResponseClosedException) {
} else if (throwable instanceof ReadTimeoutException) {
log.debug("Exception [{}] for kvPath={}", throwable, kvPath);
watchKvPath(kvPath, 0);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
import java.util.Map;
import java.util.Optional;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;
import com.frogdevelopment.micronaut.consul.watch.client.KeyValue;
import com.frogdevelopment.micronaut.consul.watch.client.WatchConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;

import io.micronaut.context.env.PropertySourceReader;
import io.micronaut.core.util.StringUtils;
import io.micronaut.scheduling.TaskScheduler;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -34,11 +34,11 @@ public final class ConfigurationsWatcher extends AbstractWatcher<KeyValue> {
* Default constructor
*/
public ConfigurationsWatcher(final List<String> kvPaths,
final TaskScheduler taskScheduler,
final IndexConsulClient consulClient,
final WatchConsulClient consulClient,
final WatchConfiguration watchConfiguration,
final PropertiesChangeHandler propertiesChangeHandler,
final PropertySourceReader propertySourceReader) {
super(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
super(kvPaths, consulClient, watchConfiguration, propertiesChangeHandler);
this.propertySourceReader = propertySourceReader;
}

Expand All @@ -48,7 +48,7 @@ protected Mono<KeyValue> watchValue(String kvPath) {
.map(KeyValue::getModifyIndex)
.orElse(NO_INDEX);
log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex);
return Mono.from(consulClient.readValues(kvPath, false, modifiedIndex))
return consulClient.watchValues(kvPath, false, modifiedIndex)
.flatMapMany(Flux::fromIterable)
.filter(kv -> kvPath.equals(kv.getKey()))
.singleOrEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import java.util.Optional;
import java.util.stream.Collectors;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.WatchConfiguration;
import com.frogdevelopment.micronaut.consul.watch.client.KeyValue;
import com.frogdevelopment.micronaut.consul.watch.client.WatchConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;

import io.micronaut.core.util.StringUtils;
import io.micronaut.scheduling.TaskScheduler;
import reactor.core.publisher.Mono;

/**
Expand All @@ -35,10 +35,10 @@ public final class NativeWatcher extends AbstractWatcher<List<KeyValue>> {
* Default constructor
*/
public NativeWatcher(final List<String> kvPaths,
final TaskScheduler taskScheduler,
final IndexConsulClient consulClient,
final WatchConsulClient consulClient,
final WatchConfiguration watchConfiguration,
final PropertiesChangeHandler propertiesChangeHandler) {
super(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
super(kvPaths, consulClient, watchConfiguration, propertiesChangeHandler);
}

@Override
Expand All @@ -50,7 +50,7 @@ protected Mono<List<KeyValue>> watchValue(String kvPath) {
.max(Integer::compareTo)
.orElse(NO_INDEX);
log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex);
return Mono.from(consulClient.readValues(kvPath, true, modifiedIndex));
return consulClient.watchValues(kvPath, true, modifiedIndex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

import java.util.Map;

/**
* @param kvPath Path of the KV
* @param previous Previous value of the KV
* @param next New value of the KV
* @author LE GALL Benoît
* @since 1.0.0
*/
public record WatchResult(
String kvPath,
Map<String, Object> previous,
Expand Down
Loading
Loading