Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use blocking queries #5

Merged
merged 2 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,4 @@ The watcher can be configured using
consul:
watch:
disabled: false # to disable the watcher, during test for instance
initial-delay: 1ms # duration before running 1st poll
period: 30s # duration between each poll
```
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ dependencies {

compileOnly(mn.lombok)
compileOnly(mn.snakeyaml)
compileOnly(mn.micronaut.jackson.core)

implementation(mn.micronaut.jackson.databind)
implementation(mn.micronaut.serde.jackson)
implementation(mn.micronaut.discovery.client)
implementation(mn.micronaut.reactor)
implementation(mn.guava)
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
package com.frogdevelopment.micronaut.consul.watch;

import static io.micronaut.context.env.Environment.DEFAULT_NAME;
import static io.micronaut.discovery.config.ConfigDiscoveryConfiguration.DEFAULT_PATH;

import lombok.RequiredArgsConstructor;

import java.util.ArrayList;
import java.util.List;
import jakarta.inject.Named;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;
import com.frogdevelopment.micronaut.consul.watch.watcher.ConfigurationsWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.NativeWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.Watcher;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.env.Environment;
import io.micronaut.context.env.PropertiesPropertySourceLoader;
import io.micronaut.context.env.PropertySourceLoader;
import io.micronaut.context.env.yaml.YamlPropertySourceLoader;
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.discovery.config.ConfigDiscoveryConfiguration.Format;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.discovery.consul.client.v1.ConsulClient;
import io.micronaut.jackson.core.env.JsonPropertySourceLoader;
import lombok.RequiredArgsConstructor;

import java.util.ArrayList;
import java.util.List;

import static io.micronaut.context.env.Environment.DEFAULT_NAME;
import static io.micronaut.discovery.config.ConfigDiscoveryConfiguration.DEFAULT_PATH;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.TaskScheduler;

/**
* Create all needed {@link Watcher} based on configuration.
Expand All @@ -36,19 +41,27 @@ public class WatcherFactory {
private static final String CONSUL_PATH_SEPARATOR = "/";

private final Environment environment;
private final ConsulClient consulClient;
@Named(TaskExecutors.SCHEDULED)
private final TaskScheduler taskScheduler;
private final IndexConsulClient consulClient;
private final PropertiesChangeHandler propertiesChangeHandler;

@Bean
@Context
@Bean(preDestroy = "stop")
Watcher createWatcher(final ConsulConfiguration consulConfiguration) {
final var kvPaths = computeKvPaths(consulConfiguration);

final var format = consulConfiguration.getConfiguration().getFormat();
return switch (format) {
final var watcher = switch (format) {
case NATIVE -> watchNative(kvPaths);
case JSON, YAML, PROPERTIES -> watchConfigurations(kvPaths, resolveLoader(format));
case JSON -> watchConfigurations(kvPaths, new JsonPropertySourceLoader());
case YAML -> watchConfigurations(kvPaths, new YamlPropertySourceLoader());
case PROPERTIES -> watchConfigurations(kvPaths, new PropertiesPropertySourceLoader());
default -> throw new ConfigurationException("Unhandled configuration format: " + format);
};

watcher.start();
return watcher;
}

private List<String> computeKvPaths(final ConsulConfiguration consulConfiguration) {
Expand Down Expand Up @@ -91,22 +104,13 @@ private static String toProfiledPath(final String resource, final String activeN
}

private Watcher watchNative(final List<String> keyPaths) {
// adding '/' at the end of the kvPath to distinct 'kvPath/value' from 'kvPath,profile/value'
// adding '/' at the end of the kvPath to distinct 'kvPath/' from 'kvPath,profile/'
final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList();
return new NativeWatcher(kvPaths, consulClient, propertiesChangeHandler);
return new NativeWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler);
}

private Watcher watchConfigurations(final List<String> kvPaths, final PropertySourceLoader propertySourceLoader) {
return new ConfigurationsWatcher(kvPaths, consulClient, propertiesChangeHandler, propertySourceLoader);
}

private static PropertySourceLoader resolveLoader(final Format format) {
return switch (format) {
case JSON -> new JsonPropertySourceLoader();
case PROPERTIES -> new PropertiesPropertySourceLoader();
case YAML -> new YamlPropertySourceLoader();
default -> throw new ConfigurationException("Unsupported properties file format: " + format);
};
return new ConfigurationsWatcher(kvPaths, taskScheduler, consulClient, propertiesChangeHandler, propertySourceLoader);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import java.io.Closeable;
import java.util.List;

import org.reactivestreams.Publisher;

import io.micronaut.context.annotation.Requires;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.client.annotation.Client;

@Client(id = io.micronaut.discovery.consul.client.v1.ConsulClient.SERVICE_ID, path = "/v1",
configuration = ConsulConfiguration.class)
@Requires(beans = ConsulConfiguration.class)
public interface IndexConsulClient extends Closeable, AutoCloseable {

default Publisher<List<KeyValue>> readValues(String key) {
return readValues(key, true);
}

default Publisher<List<KeyValue>> readValues(String key, boolean recurse) {
return readValues(key, recurse, null);
}

@Get(uri = "/kv/{+key}?{&recurse}{&index}", single = true)
Publisher<List<KeyValue>> readValues(String key, @QueryValue boolean recurse, @Nullable @QueryValue Integer index);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;

import io.micronaut.core.annotation.ReflectiveAccess;
import io.micronaut.serde.annotation.Serdeable;

@Serdeable
@ReflectiveAccess
@JsonNaming(PropertyNamingStrategies.UpperCamelCaseStrategy.class)
public class KeyValue {

private final Integer modifyIndex;
private final String key;
private final String value;

/**
* @param key The key
* @param value The value
*/
@JsonCreator
public KeyValue(@JsonProperty("ModifyIndex") Integer modifyIndex, @JsonProperty("Key") String key, @JsonProperty("Value") String value) {
this.modifyIndex = modifyIndex;
this.key = key;
this.value = value;
}

/**
* @return The modifyIndex
*/
public Integer getModifyIndex() {
return modifyIndex;
}

/**
* @return The key
*/
public String getKey() {
return key;
}

/**
* @return The value
*/
public String getValue() {
return value;
}
}
Loading
Loading