Skip to content

Commit

Permalink
Merge branch 'refs/heads/main' into improve-wait-logic-to-a-more-eleg…
Browse files Browse the repository at this point in the history
…ant-solution-open-feature#1160

# Conflicts:
#	providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java
#	providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java
#	providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java
  • Loading branch information
chrfwow committed Jan 21, 2025
2 parents 4402e66 + b4fe2f4 commit 99dda5c
Show file tree
Hide file tree
Showing 58 changed files with 1,130 additions and 1,762 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[submodule "providers/flagd/test-harness"]
path = providers/flagd/test-harness
url = https://github.com/open-feature/test-harness.git
branch = v0.5.21
branch = v1.1.1
[submodule "providers/flagd/spec"]
path = providers/flagd/spec
url = https://github.com/open-feature/spec.git
2 changes: 1 addition & 1 deletion .release-please-manifest.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"providers/flagd": "0.10.5",
"hooks/open-telemetry": "3.2.1",
"providers/go-feature-flag": "0.4.0",
"providers/go-feature-flag": "0.4.1",
"providers/flagsmith": "0.0.9",
"providers/env-var": "0.0.7",
"providers/jsonlogic-eval-provider": "1.1.1",
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.cucumber</groupId>
<artifactId>cucumber-picocontainer</artifactId>
<version>7.20.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions providers/flagd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,20 @@
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>toxiproxy</artifactId>
<version>1.20.4</version>
<scope>test</scope>
</dependency>
<!-- uncomment for logoutput during test runs -->

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package dev.openfeature.contrib.providers.flagd;

import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
import dev.openfeature.contrib.providers.flagd.resolver.common.Wait;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
Expand All @@ -13,12 +15,17 @@
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Metadata;
import dev.openfeature.sdk.ProviderEvaluation;
import dev.openfeature.sdk.ProviderEvent;
import dev.openfeature.sdk.ProviderEventDetails;
import dev.openfeature.sdk.Structure;
import dev.openfeature.sdk.Value;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -31,12 +38,32 @@ public class FlagdProvider extends EventProvider {
private Function<Structure, EvaluationContext> contextEnricher;
private static final String FLAGD_PROVIDER = "flagd";
private final Resolver flagResolver;
private volatile boolean initialized = false;
private volatile boolean connected = false;
private volatile Structure syncMetadata = new ImmutableStructure();
private volatile EvaluationContext enrichedContext = new ImmutableContext();
private final List<Hook> hooks = new ArrayList<>();
private final Wait connectionWait = new Wait();
private final EventsLock eventsLock = new EventsLock();

/**
* An executor service responsible for emitting
* {@link ProviderEvent#PROVIDER_ERROR} after the provider went
* {@link ProviderEvent#PROVIDER_STALE} for {@link #gracePeriod} seconds.
*/
private final ScheduledExecutorService errorExecutor;

/**
* A scheduled task for emitting {@link ProviderEvent#PROVIDER_ERROR}.
*/
private ScheduledFuture<?> errorTask;

/**
* The grace period in milliseconds to wait after
* {@link ProviderEvent#PROVIDER_STALE} before emitting a
* {@link ProviderEvent#PROVIDER_ERROR}.
*/
private final long gracePeriod;
/**
* The deadline in milliseconds for GRPC operations.
*/
private final long deadline;

protected final void finalize() {
// DO NOT REMOVE, spotbugs: CT_CONSTRUCTOR_THROW
Expand All @@ -57,18 +84,34 @@ public FlagdProvider() {
public FlagdProvider(final FlagdOptions options) {
switch (options.getResolverType().asString()) {
case Config.RESOLVER_IN_PROCESS:
this.flagResolver = new InProcessResolver(options, connectionWait, this::onConnectionEvent);
this.flagResolver = new InProcessResolver(options, this::onProviderEvent);
break;
case Config.RESOLVER_RPC:
this.flagResolver = new GrpcResolver(
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onConnectionEvent);
options, new Cache(options.getCacheType(), options.getMaxCacheSize()), this::onProviderEvent);
break;
default:
throw new IllegalStateException(
String.format("Requested unsupported resolver type of %s", options.getResolverType()));
}
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
contextEnricher = options.getContextEnricher();
errorExecutor = Executors.newSingleThreadScheduledExecutor();
gracePeriod = options.getRetryGracePeriod();
deadline = options.getDeadline();
}

/**
* Internal constructor for test cases.
* DO NOT MAKE PUBLIC
*/
FlagdProvider(Resolver resolver, boolean initialized) {
this.flagResolver = resolver;
deadline = Config.DEFAULT_DEADLINE;
gracePeriod = Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD;
hooks.add(new SyncMetadataHook(this::getEnrichedContext));
errorExecutor = Executors.newSingleThreadScheduledExecutor();
this.eventsLock.initialized = initialized;
}

@Override
Expand All @@ -77,28 +120,40 @@ public List<Hook> getProviderHooks() {
}

@Override
public synchronized void initialize(EvaluationContext evaluationContext) throws Exception {
if (this.initialized) {
return;
}
public void initialize(EvaluationContext evaluationContext) throws Exception {
synchronized (eventsLock) {
if (eventsLock.initialized) {
return;
}

this.flagResolver.init();
this.initialized = this.connected = true;
flagResolver.init();
}
// block till ready - this works with deadline fine for rpc, but with in_process
// we also need to take parsing into the equation
// TODO: evaluate where we are losing time, so we can remove this magic number -
// follow up
// wait outside of the synchonrization or we'll deadlock
Util.busyWaitAndCheck(this.deadline * 2, () -> eventsLock.initialized);
connectionWait.onFinished();
}

@Override
public synchronized void shutdown() {
if (!this.initialized) {
return;
}

try {
this.flagResolver.shutdown();
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
this.initialized = false;
public void shutdown() {
synchronized (eventsLock) {
if (!eventsLock.initialized) {
return;
}
try {
this.flagResolver.shutdown();
if (errorExecutor != null) {
errorExecutor.shutdownNow();
errorExecutor.awaitTermination(deadline, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
log.error("Error during shutdown {}", FLAGD_PROVIDER, e);
} finally {
eventsLock.initialized = false;
}
}
}

Expand All @@ -109,27 +164,27 @@ public Metadata getMetadata() {

@Override
public ProviderEvaluation<Boolean> getBooleanEvaluation(String key, Boolean defaultValue, EvaluationContext ctx) {
return this.flagResolver.booleanEvaluation(key, defaultValue, ctx);
return flagResolver.booleanEvaluation(key, defaultValue, ctx);
}

@Override
public ProviderEvaluation<String> getStringEvaluation(String key, String defaultValue, EvaluationContext ctx) {
return this.flagResolver.stringEvaluation(key, defaultValue, ctx);
return flagResolver.stringEvaluation(key, defaultValue, ctx);
}

@Override
public ProviderEvaluation<Double> getDoubleEvaluation(String key, Double defaultValue, EvaluationContext ctx) {
return this.flagResolver.doubleEvaluation(key, defaultValue, ctx);
return flagResolver.doubleEvaluation(key, defaultValue, ctx);
}

@Override
public ProviderEvaluation<Integer> getIntegerEvaluation(String key, Integer defaultValue, EvaluationContext ctx) {
return this.flagResolver.integerEvaluation(key, defaultValue, ctx);
return flagResolver.integerEvaluation(key, defaultValue, ctx);
}

@Override
public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultValue, EvaluationContext ctx) {
return this.flagResolver.objectEvaluation(key, defaultValue, ctx);
return flagResolver.objectEvaluation(key, defaultValue, ctx);
}

/**
Expand All @@ -142,7 +197,7 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
* @return Object map representing sync metadata
*/
protected Structure getSyncMetadata() {
return new ImmutableStructure(syncMetadata.asMap());
return new ImmutableStructure(eventsLock.syncMetadata.asMap());
}

/**
Expand All @@ -151,49 +206,110 @@ protected Structure getSyncMetadata() {
* @return context
*/
EvaluationContext getEnrichedContext() {
return enrichedContext;
return eventsLock.enrichedContext;
}

private void onConnectionEvent(ConnectionEvent connectionEvent) {
final boolean wasConnected = connected;
final boolean isConnected = connected = connectionEvent.isConnected();
if (isConnected) {
connectionWait.onFinished();
}
@SuppressWarnings("checkstyle:fallthrough")
private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) {

syncMetadata = connectionEvent.getSyncMetadata();
enrichedContext = contextEnricher.apply(connectionEvent.getSyncMetadata());
synchronized (eventsLock) {
log.info("FlagdProviderEvent: {}", flagdProviderEvent);
eventsLock.syncMetadata = flagdProviderEvent.getSyncMetadata();
if (flagdProviderEvent.getSyncMetadata() != null) {
eventsLock.enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata());
}

if (!initialized) {
return;
/*
* We only use Error and Ready as previous states.
* As error will first be emitted as Stale, and only turns after a while into an
* emitted Error.
* Ready is needed, as the InProcessResolver does not have a dedicated ready
* event, hence we need to
* forward a configuration changed to the ready, if we are not in the ready
* state.
*/
switch (flagdProviderEvent.getEvent()) {
case PROVIDER_CONFIGURATION_CHANGED:
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_READY) {
onConfigurationChanged(flagdProviderEvent);
break;
}
// intentional fall through, a not-ready change will trigger a ready.
case PROVIDER_READY:
onReady();
eventsLock.previousEvent = ProviderEvent.PROVIDER_READY;
break;

case PROVIDER_ERROR:
if (eventsLock.previousEvent != ProviderEvent.PROVIDER_ERROR) {
onError();
}
eventsLock.previousEvent = ProviderEvent.PROVIDER_ERROR;
break;
default:
log.info("Unknown event {}", flagdProviderEvent.getEvent());
}
}
}

private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) {
this.emitProviderConfigurationChanged(ProviderEventDetails.builder()
.flagsChanged(flagdProviderEvent.getFlagsChanged())
.message("configuration changed")
.build());
}

if (!wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("connected to flagd")
.build();
this.emitProviderReady(details);
return;
private void onReady() {
if (!eventsLock.initialized) {
eventsLock.initialized = true;
connectionWait.onFinished();
log.info("initialized FlagdProvider");
}
if (errorTask != null && !errorTask.isCancelled()) {
errorTask.cancel(false);
log.debug("Reconnection task cancelled as connection became READY.");
}
this.emitProviderReady(
ProviderEventDetails.builder().message("connected to flagd").build());
}

private void onError() {
log.info("Connection lost. Emit STALE event...");
log.debug("Waiting {}s for connection to become available...", gracePeriod);
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());

if (wasConnected && isConnected) {
ProviderEventDetails details = ProviderEventDetails.builder()
.flagsChanged(connectionEvent.getFlagsChanged())
.message("configuration changed")
.build();
this.emitProviderConfigurationChanged(details);
return;
if (errorTask != null && !errorTask.isCancelled()) {
errorTask.cancel(false);
}

if (connectionEvent.isStale()) {
this.emitProviderStale(ProviderEventDetails.builder()
.message("there has been an error")
.build());
} else {
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
if (!errorExecutor.isShutdown()) {
errorTask = errorExecutor.schedule(
() -> {
if (eventsLock.previousEvent == ProviderEvent.PROVIDER_ERROR) {
log.debug(
"Provider did not reconnect successfully within {}s. Emit ERROR event...",
gracePeriod);
flagResolver.onError();
this.emitProviderError(ProviderEventDetails.builder()
.message("there has been an error")
.build());
}
},
gracePeriod,
TimeUnit.SECONDS);
}
}

/**
* Contains all fields we need to worry about locking, used as intrinsic lock
* for sync blocks.
*/
static class EventsLock {
volatile ProviderEvent previousEvent = null;
volatile Structure syncMetadata = new ImmutableStructure();
volatile boolean initialized = false;
volatile EvaluationContext enrichedContext = new ImmutableContext();
}
}
Loading

0 comments on commit 99dda5c

Please sign in to comment.