Skip to content

Commit

Permalink
[consul] handle blocked query timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
FrogDevelopper committed Dec 10, 2024
1 parent d194683 commit 10a26e0
Show file tree
Hide file tree
Showing 10 changed files with 75 additions and 21 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ The watcher can be configured using
consul:
watch:
disabled: false # to disable the watcher, during test for instance
block-timeout: 10m # Sets the read timeout. Default value (10 minutes).
```
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation(mn.micronaut.serde.jackson)
implementation(mn.micronaut.discovery.client)
implementation(mn.micronaut.reactor)
implementation(mn.micronaut.retry)
implementation(mn.guava)

// ----------- TESTS -----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.frogdevelopment.micronaut.consul.watch.watcher.ConfigurationsWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.NativeWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.Watcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.WatcherConfiguration;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Context;
Expand Down Expand Up @@ -44,6 +45,7 @@ public class WatcherFactory {
@Named(TaskExecutors.SCHEDULED)
private final TaskScheduler taskScheduler;
private final IndexConsulClient consulClient;
private final WatcherConfiguration watcherConfiguration;
private final PropertiesChangeHandler propertiesChangeHandler;

@Context
Expand Down Expand Up @@ -106,11 +108,11 @@ private static String toProfiledPath(final String resource, final String activeN
private Watcher watchNative(final List<String> keyPaths) {
// adding '/' at the end of the kvPath to distinct 'kvPath/' from 'kvPath,profile/'
final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList();
return new NativeWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
return new NativeWatcher(kvPaths, taskScheduler, consulClient, watcherConfiguration, propertiesChangeHandler);
}

private Watcher watchConfigurations(final List<String> kvPaths, final PropertySourceLoader propertySourceLoader) {
return new ConfigurationsWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler, propertySourceLoader);
return new ConfigurationsWatcher(kvPaths, taskScheduler, consulClient, watcherConfiguration, propertiesChangeHandler, propertySourceLoader);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,18 @@
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.retry.annotation.Retryable;

@Client(id = io.micronaut.discovery.consul.client.v1.ConsulClient.SERVICE_ID, path = "/v1",
configuration = ConsulConfiguration.class)
@Requires(beans = ConsulConfiguration.class)
public interface IndexConsulClient extends Closeable, AutoCloseable {

default Publisher<List<KeyValue>> readValues(String key) {
return readValues(key, true);
}

default Publisher<List<KeyValue>> readValues(String key, boolean recurse) {
return readValues(key, recurse, null);
}

@Get(uri = "/kv/{+key}?{&recurse}{&index}", single = true)
Publisher<List<KeyValue>> readValues(String key, @QueryValue boolean recurse, @Nullable @QueryValue Integer index);
@Retryable(
attempts = "${" + ConsulConfiguration.ConsulConfigDiscoveryConfiguration.PREFIX + ".retry-count:3}",
delay = "${" + ConsulConfiguration.ConsulConfigDiscoveryConfiguration.PREFIX + ".retry-delay:1s}"
)
Publisher<List<KeyValue>> watchValues(String key, @Nullable @QueryValue Boolean recurse, @Nullable @QueryValue Integer index);

}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
@Configuration
@RequiresConsul
@Requires(property = "consul.watch.disabled", notEquals = "true", defaultValue = "false")
@Requires(property = WatcherConfiguration.PREFIX + "disabled", notEquals = "true", defaultValue = "false")
package com.frogdevelopment.micronaut.consul.watch;

import com.frogdevelopment.micronaut.consul.watch.watcher.WatcherConfiguration;

import io.micronaut.context.annotation.Configuration;
import io.micronaut.context.annotation.Requires;
import io.micronaut.discovery.consul.condition.RequiresConsul;
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import io.micronaut.http.HttpStatus;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.http.client.exceptions.ReadTimeoutException;
import io.micronaut.http.client.exceptions.ResponseClosedException;
import io.micronaut.scheduling.TaskScheduler;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
Expand All @@ -35,6 +34,7 @@ abstract sealed class AbstractWatcher<V> implements Watcher permits Configuratio
private final List<String> kvPaths;
private final TaskScheduler taskScheduler;
protected final IndexConsulClient consulClient;
private final WatcherConfiguration configuration;
private final PropertiesChangeHandler propertiesChangeHandler;

protected final Map<String, V> kvHolder = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -87,6 +87,7 @@ private void watchKvPath(final String kvPath, final int nbFailures) {
return;
}
final var disposable = watchValue(kvPath)
.timeout(configuration.getBlockTimeout())
.subscribe(next -> onNext(kvPath, next), throwable -> onError(kvPath, throwable, nbFailures));

listeners.put(kvPath, disposable);
Expand Down Expand Up @@ -118,9 +119,9 @@ private void onNext(String kvPath, final V next) {

private void onError(String kvPath, Throwable throwable, int nbFailures) {
if (throwable instanceof final HttpClientResponseException e && e.getStatus() == HttpStatus.NOT_FOUND) {
log.debug("No KV found with kvPath={}", kvPath);
log.trace("No KV found with kvPath={}", kvPath);
listeners.remove(kvPath);
} else if (throwable instanceof ReadTimeoutException || throwable instanceof ResponseClosedException) {
} else if (throwable instanceof ReadTimeoutException) {
log.debug("Exception [{}] for kvPath={}", throwable, kvPath);
watchKvPath(kvPath, 0);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public final class ConfigurationsWatcher extends AbstractWatcher<KeyValue> {
public ConfigurationsWatcher(final List<String> kvPaths,
final TaskScheduler taskScheduler,
final IndexConsulClient consulClient,
final WatcherConfiguration configuration,
final PropertiesChangeHandler propertiesChangeHandler,
final PropertySourceReader propertySourceReader) {
super(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
super(kvPaths, taskScheduler, consulClient, configuration, propertiesChangeHandler);
this.propertySourceReader = propertySourceReader;
}

Expand All @@ -48,7 +49,7 @@ protected Mono<KeyValue> watchValue(String kvPath) {
.map(KeyValue::getModifyIndex)
.orElse(NO_INDEX);
log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex);
return Mono.from(consulClient.readValues(kvPath, false, modifiedIndex))
return Mono.from(consulClient.watchValues(kvPath, false, modifiedIndex))
.flatMapMany(Flux::fromIterable)
.filter(kv -> kvPath.equals(kv.getKey()))
.singleOrEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public final class NativeWatcher extends AbstractWatcher<List<KeyValue>> {
public NativeWatcher(final List<String> kvPaths,
final TaskScheduler taskScheduler,
final IndexConsulClient consulClient,
final WatcherConfiguration configuration,
final PropertiesChangeHandler propertiesChangeHandler) {
super(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
super(kvPaths, taskScheduler, consulClient, configuration, propertiesChangeHandler);
}

@Override
Expand All @@ -50,7 +51,7 @@ protected Mono<List<KeyValue>> watchValue(String kvPath) {
.max(Integer::compareTo)
.orElse(NO_INDEX);
log.debug("Watching kvPath={} with index={}", kvPath, modifiedIndex);
return Mono.from(consulClient.readValues(kvPath, true, modifiedIndex));
return Mono.from(consulClient.watchValues(kvPath, true, modifiedIndex));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.frogdevelopment.micronaut.consul.watch.watcher;

import java.time.Duration;

import io.micronaut.context.annotation.ConfigurationProperties;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.discovery.consul.condition.RequiresConsul;

/**
* Configuration for Consul Watcher
*
* @author LE GALL Benoît
* @since 1.0.0
*/
@RequiresConsul
@ConfigurationProperties(WatcherConfiguration.PREFIX)
public class WatcherConfiguration {

/**
* The default block timeout in minutes.
*/
@SuppressWarnings("WeakerAccess")
public static final long DEFAULT_BLOCK_TIMEOUT_MINUTES = 10;

/**
* The prefix to use for all Consul settings.
*/
public static final String PREFIX = "consul.watch";

private Duration blockTimeout = Duration.ofMinutes(DEFAULT_BLOCK_TIMEOUT_MINUTES);

/**
* Sets the block timeout. Default value ({@value #DEFAULT_BLOCK_TIMEOUT_MINUTES} minutes).
*
* @param blockTimeout The block timeout
*/
public void setBlockTimeout(@Nullable Duration blockTimeout) {
this.blockTimeout = blockTimeout;
}

/**
* @return The block timeout.
*/
public Duration getBlockTimeout() {
return blockTimeout;
}
}
5 changes: 3 additions & 2 deletions src/test/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
consul:
client:
read-idle-timeout: 15s
read-timeout: 30s
read-timeout: 2s
watch:
block-timeout: 5s

0 comments on commit 10a26e0

Please sign in to comment.