Skip to content

Commit

Permalink
[consul] using Consul Blocking Queries
Browse files Browse the repository at this point in the history
  • Loading branch information
FrogDevelopper committed Dec 4, 2024
1 parent 3464eb6 commit ee574a1
Show file tree
Hide file tree
Showing 15 changed files with 347 additions and 371 deletions.
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ dependencies {

compileOnly(mn.lombok)
compileOnly(mn.snakeyaml)
compileOnly(mn.micronaut.jackson.core)

implementation(mn.micronaut.jackson.databind)
implementation(mn.micronaut.serde.jackson)
implementation(mn.micronaut.discovery.client)
implementation(mn.micronaut.reactor)
implementation(mn.guava)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@
import io.micronaut.context.event.ShutdownEvent;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.runtime.event.annotation.EventListener;
import io.micronaut.scheduling.TaskExecutors;
import io.micronaut.scheduling.TaskScheduler;

import jakarta.inject.Named;
import jakarta.inject.Singleton;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ScheduledFuture;
import jakarta.inject.Singleton;

/**
* Schedule the polling for all watcher in 1 unique Runnable.
Expand All @@ -25,37 +20,25 @@
@Singleton
final class WatchScheduler {

private final TaskScheduler taskScheduler;
private final WatchConfiguration watchConfiguration;
private final Watcher watcher;

private volatile boolean started = false;
private volatile boolean stopped = false;
private ScheduledFuture<?> scheduledFuture;

/**
* @param taskScheduler taskScheduler
* @param watchConfiguration configuration for the scheduled task
* @param watcher {@link Watcher} to schedule
*/
WatchScheduler(@Named(TaskExecutors.SCHEDULED) final TaskScheduler taskScheduler,
final WatchConfiguration watchConfiguration,
final Watcher watcher) {
this.taskScheduler = taskScheduler;
this.watchConfiguration = watchConfiguration;
WatchScheduler(final Watcher watcher) {
this.watcher = watcher;
}

/**
* Schedule the polling task
*/
@EventListener
public synchronized void start(final StartupEvent ignored) {
public void start(final StartupEvent ignored) {
if (!started) {
scheduledFuture = taskScheduler.scheduleWithFixedDelay(
watchConfiguration.getInitialDelay(),
watchConfiguration.getPeriod(),
watcher::watchKVs);
watcher.watchKVs();
started = true;
} else {
throw new IllegalStateException("Watcher scheduler already started");
Expand All @@ -67,7 +50,7 @@ public synchronized void start(final StartupEvent ignored) {
* Cancel the scheduled task
*/
@EventListener
public synchronized void stop(final ShutdownEvent ignored) {
public void stop(final ShutdownEvent ignored) {
if (!started) {
log.warn("You tried to stop an unstarted Watcher scheduler");
return;
Expand All @@ -77,7 +60,7 @@ public synchronized void stop(final ShutdownEvent ignored) {
return;
}

stopped = scheduledFuture.cancel(true);
stopped = true;
log.info("Stopped watch: {}", stopped);
if (Boolean.FALSE.equals(stopped)) {
log.warn("Watcher scheduler could not be stopped");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package com.frogdevelopment.micronaut.consul.watch;

import static io.micronaut.context.env.Environment.DEFAULT_NAME;
import static io.micronaut.discovery.config.ConfigDiscoveryConfiguration.DEFAULT_PATH;

import lombok.RequiredArgsConstructor;

import java.util.ArrayList;
import java.util.List;

import com.frogdevelopment.micronaut.consul.watch.client.IndexConsulClient;
import com.frogdevelopment.micronaut.consul.watch.context.PropertiesChangeHandler;
import com.frogdevelopment.micronaut.consul.watch.watcher.ConfigurationsWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.NativeWatcher;
import com.frogdevelopment.micronaut.consul.watch.watcher.Watcher;

import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.env.Environment;
Expand All @@ -13,15 +22,7 @@
import io.micronaut.context.exceptions.ConfigurationException;
import io.micronaut.discovery.config.ConfigDiscoveryConfiguration.Format;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.discovery.consul.client.v1.ConsulClient;
import io.micronaut.jackson.core.env.JsonPropertySourceLoader;
import lombok.RequiredArgsConstructor;

import java.util.ArrayList;
import java.util.List;

import static io.micronaut.context.env.Environment.DEFAULT_NAME;
import static io.micronaut.discovery.config.ConfigDiscoveryConfiguration.DEFAULT_PATH;

/**
* Create all needed {@link Watcher} based on configuration.
Expand All @@ -36,7 +37,7 @@ public class WatcherFactory {
private static final String CONSUL_PATH_SEPARATOR = "/";

private final Environment environment;
private final ConsulClient consulClient;
private final IndexConsulClient consulClient;
private final PropertiesChangeHandler propertiesChangeHandler;

@Bean
Expand All @@ -45,7 +46,7 @@ Watcher createWatcher(final ConsulConfiguration consulConfiguration) {

final var format = consulConfiguration.getConfiguration().getFormat();
return switch (format) {
case NATIVE -> watchNative(kvPaths);
// case NATIVE -> watchNative(kvPaths);
case JSON, YAML, PROPERTIES -> watchConfigurations(kvPaths, resolveLoader(format));
default -> throw new ConfigurationException("Unhandled configuration format: " + format);
};
Expand Down Expand Up @@ -90,11 +91,11 @@ private static String toProfiledPath(final String resource, final String activeN
return resource + "," + activeName;
}

private Watcher watchNative(final List<String> keyPaths) {
// adding '/' at the end of the kvPath to distinct 'kvPath/value' from 'kvPath,profile/value'
final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList();
return new NativeWatcher(kvPaths, consulClient, propertiesChangeHandler);
}
// private Watcher watchNative(final List<String> keyPaths) {
// // adding '/' at the end of the kvPath to distinct 'kvPath/value' from 'kvPath,profile/value'
// final var kvPaths = keyPaths.stream().map(path -> path + CONSUL_PATH_SEPARATOR).toList();
// return new NativeWatcher(kvPaths, consulClient, propertiesChangeHandler);
// }

private Watcher watchConfigurations(final List<String> kvPaths, final PropertySourceLoader propertySourceLoader) {
return new ConfigurationsWatcher(kvPaths, consulClient, propertiesChangeHandler, propertySourceLoader);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import java.io.Closeable;
import java.util.List;

import org.reactivestreams.Publisher;

import io.micronaut.context.annotation.Requires;
import io.micronaut.discovery.consul.ConsulConfiguration;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.annotation.QueryValue;
import io.micronaut.http.client.annotation.Client;

@Client(id = io.micronaut.discovery.consul.client.v1.ConsulClient.SERVICE_ID, path = "/v1",
configuration = ConsulConfiguration.class)
@Requires(beans = ConsulConfiguration.class)
public interface IndexConsulClient extends Closeable, AutoCloseable {

@Get(uri = "/kv/{+key}?{&index}", single = true)
Publisher<List<KeyValue>> watchValue(String key, @QueryValue int index);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.frogdevelopment.micronaut.consul.watch.client;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;

import io.micronaut.core.annotation.ReflectiveAccess;
import io.micronaut.serde.annotation.Serdeable;

@Serdeable
@ReflectiveAccess
@JsonNaming(PropertyNamingStrategies.UpperCamelCaseStrategy.class)
public record KeyValue(@JsonProperty("ModifyIndex") int modifyIndex,
@JsonProperty("Key") String key,
@JsonProperty("Value") String value) {
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package com.frogdevelopment.micronaut.consul.watch.context;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import jakarta.inject.Singleton;

import com.frogdevelopment.micronaut.consul.watch.watcher.WatchResult;
import com.google.common.collect.MapDifference.ValueDifference;
import com.google.common.collect.Maps;

import io.micronaut.context.env.Environment;
import io.micronaut.context.env.PropertySource;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.discovery.consul.client.v1.ConsulClient;
import io.micronaut.runtime.context.scope.refresh.RefreshEvent;
import jakarta.inject.Singleton;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* Handle properties' configuration changes to be notified into the Micronaut context.
Expand All @@ -33,41 +39,34 @@ public class PropertiesChangeHandler {

/**
* Update Micronaut context with new properties, then notify the changes.
* @param results Last Consul poll results
*
* @param result Last Consul poll results
*/
public void handleChanges(final List<WatchResult> results) {
if (results.isEmpty()) {
log.debug("Nothing to do");
return;
}

public void handleChanges(final WatchResult result) {
try {
// to accept null values, don't use the stream.collect()
final var allChanges = new HashMap<String, Object>();
final var newProperties = new HashMap<String, Map<String, Object>>();
for (final var result : results) {
final var difference = Maps.difference(result.previous(), result.next());
if (!difference.areEqual()) {
newProperties.put(toPropertySourceName(result), result.next());
// updated properties
final var differing = checkClassesTypeOnDifference(difference.entriesDiffering());
differing.forEach((key, value) -> allChanges.put(key, value.leftValue()));
// deleted properties
allChanges.putAll(difference.entriesOnlyOnLeft());
// added properties
difference.entriesOnlyOnRight().forEach((key, value) -> allChanges.put(key, null));
}
final var difference = Maps.difference(result.previous(), result.next());
if (!difference.areEqual()) {
final var differing = difference.entriesDiffering();
checkClassesTypeOnDifference(differing);

final var allChanges = new HashMap<String, Object>();
// updated properties
differing.forEach((key, value) -> allChanges.put(key, value.leftValue()));
// deleted properties
allChanges.putAll(difference.entriesOnlyOnLeft());
// added properties
difference.entriesOnlyOnRight().forEach((key, value) -> allChanges.put(key, null));

updatePropertySources(result);

publishDifferences(allChanges);
}

updatePropertySources(newProperties);

publishDifferences(allChanges);
} catch (final Exception e) {
log.error("Unable to apply configuration changes", e);
}
}

private Map<String, ValueDifference<Object>> checkClassesTypeOnDifference(Map<String, ValueDifference<Object>> differing) {
private void checkClassesTypeOnDifference(final Map<String, ValueDifference<Object>> differing) {
for (final var entry : differing.entrySet()) {
final var leftValue = entry.getValue().leftValue();
final var rightValue = entry.getValue().rightValue();
Expand All @@ -79,8 +78,6 @@ private Map<String, ValueDifference<Object>> checkClassesTypeOnDifference(Map<St
}
}
}

return differing;
}

private boolean areClassesTypeIncompatible(final Class<?> leftClass, final Class<?> rightClass) {
Expand All @@ -98,20 +95,15 @@ private static boolean isNotNumber(final Class<?> clazz) {
return !Number.class.isAssignableFrom(clazz);
}

private void updatePropertySources(final Map<String, Map<String, Object>> mapNewProperties) {
if (mapNewProperties.isEmpty()) {
return;
}

log.debug("Updating context with new configurations");
private void updatePropertySources(final WatchResult watchResult) {
final var propertySourceName = toPropertySourceName(watchResult);
log.debug("Updating context with new configurations for {}", propertySourceName);

final var updatedPropertySources = new ArrayList<PropertySource>();
for (final var propertySource : environment.getPropertySources()) {
final var propertySourceName = propertySource.getName();
if (mapNewProperties.containsKey(propertySourceName)) {
final var newProperties = mapNewProperties.get(propertySourceName);
if (propertySource.getName().equals(propertySourceName)) {
// creating a new PropertySource with new values but keeping the order
updatedPropertySources.add(PropertySource.of(propertySourceName, newProperties, propertySource.getOrder()));
updatedPropertySources.add(PropertySource.of(propertySourceName, watchResult.next(), propertySource.getOrder()));
} else {
updatedPropertySources.add(propertySource);
}
Expand Down
Loading

0 comments on commit ee574a1

Please sign in to comment.