Skip to content

Commit

Permalink
FalgSets improvements and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nmayorsplit committed Oct 25, 2023
1 parent 541db29 commit 2df0f97
Show file tree
Hide file tree
Showing 23 changed files with 136 additions and 87 deletions.
13 changes: 9 additions & 4 deletions client/src/main/java/io/split/client/SplitClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import io.split.client.events.EventsStorageProducer;
import io.split.client.impressions.Impression;
import io.split.client.impressions.ImpressionsManager;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.engine.SDKReadinessGates;
import io.split.engine.evaluator.Evaluator;
import io.split.engine.evaluator.EvaluatorImp;
Expand Down Expand Up @@ -332,13 +334,16 @@ private Map<String, SplitResult> getTreatmentsWithConfigInternal(String matching
_log.warn("The sets are not in flagSetsFilter config");
return new HashMap<>();
}
featureFlagNames = getAllFlags(cleanFlagSets);
featureFlagNames = new ArrayList<>();
} else if (featureFlagNames == null) {
_log.error(String.format("%s: featureFlagNames must be a non-empty array", methodEnum.getMethod()));
return new HashMap<>();
}
try {
checkSDKReady(methodEnum, featureFlagNames);
if (cleanFlagSets != null) {
featureFlagNames = getAllFlags(cleanFlagSets);
}
if (_container.isDestroyed()) {
_log.error("Client has already been destroyed - no calls possible");
return createMapControl(featureFlagNames);
Expand All @@ -354,7 +359,7 @@ private Map<String, SplitResult> getTreatmentsWithConfigInternal(String matching
}
Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluatorResult;
if (cleanFlagSets != null) {
evaluatorResult = _evaluator.evaluateFeaturesByFlagSets(matchingKey, bucketingKey, new ArrayList<>(cleanFlagSets));
evaluatorResult = _evaluator.evaluateFeaturesByFlagSets(matchingKey, bucketingKey, new ArrayList<>(cleanFlagSets), attributes);
} else {
featureFlagNames = SplitNameValidator.areValid(featureFlagNames, methodEnum.getMethod());
evaluatorResult = _evaluator.evaluateFeatures(matchingKey, bucketingKey, featureFlagNames, attributes);
Expand Down Expand Up @@ -391,10 +396,10 @@ private Map<String, SplitResult> getTreatmentsWithConfigInternal(String matching
}

private List<String> filterSetsAreInConfig(Set<String> sets) {
HashSet<String> configSets = _config.getSetsFilter();
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(_config.getSetsFilter());
List<String> setsToReturn = new ArrayList<>();
for (String set : sets) {
if (!configSets.contains(set)) {
if (!flagSetsFilter.Intersect(set)) {
_log.warn(String.format("GetTreatmentsByFlagSets: you passed %s which is not part of the configured FlagSetsFilter, " +
"ignoring Flag Set.", set));
continue;
Expand Down
17 changes: 9 additions & 8 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.split.client.impressions.strategy.ProcessImpressionStrategy;
import io.split.client.interceptors.AuthorizationInterceptorFilter;
import io.split.client.interceptors.ClientKeyInterceptorFilter;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.client.interceptors.GzipDecoderResponseInterceptor;
import io.split.client.interceptors.GzipEncoderRequestInterceptor;
Expand Down Expand Up @@ -110,7 +111,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -190,7 +190,8 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

// Cache Initialisations
SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
SplitCache splitCache = new InMemoryCacheImp(config.getSetsFilter());
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(config.getSetsFilter());
SplitCache splitCache = new InMemoryCacheImp(flagSetsFilter);
ImpressionsStorage impressionsStorage = new InMemoryImpressionsStorage(config.impressionsQueueSize());
_splitCache = splitCache;
_segmentCache = segmentCache;
Expand All @@ -202,7 +203,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn

SplitParser splitParser = new SplitParser();
// SplitFetcher
_splitFetcher = buildSplitFetcher(splitCache, splitParser, config.getSetsFilter());
_splitFetcher = buildSplitFetcher(splitCache, splitParser, flagSetsFilter);

// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher,
Expand Down Expand Up @@ -355,7 +356,8 @@ protected SplitFactoryImpl(SplitClientConfig config) {
_telemetryStorageProducer = new NoopTelemetryStorage();

SegmentCache segmentCache = new SegmentCacheInMemoryImpl();
SplitCache splitCache = new InMemoryCacheImp(config.getSetsFilter());
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(config.getSetsFilter());
SplitCache splitCache = new InMemoryCacheImp(flagSetsFilter);
_splitCache = splitCache;
_gates = new SDKReadinessGates();
_segmentCache = segmentCache;
Expand All @@ -379,7 +381,7 @@ protected SplitFactoryImpl(SplitClientConfig config) {
SplitChangeFetcher splitChangeFetcher = createSplitChangeFetcher(config);
SplitParser splitParser = new SplitParser();

_splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCache, _telemetryStorageProducer, config.getSetsFilter());
_splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCache, _telemetryStorageProducer, flagSetsFilter);

// SplitSynchronizationTask
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher, splitCache, config.featuresRefreshRate(), config.getThreadFactory());
Expand Down Expand Up @@ -561,11 +563,10 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config, Se
config.getThreadFactory());
}

private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser, HashSet<String> flagSets) throws
private SplitFetcher buildSplitFetcher(SplitCacheProducer splitCacheProducer, SplitParser splitParser, FlagSetsFilter flagSetsFilter) throws
URISyntaxException {
SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher.create(_httpclient, _rootTarget, _telemetryStorageProducer);

return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, _telemetryStorageProducer,flagSets);
return new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, _telemetryStorageProducer,flagSetsFilter);
}

private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config, ImpressionsStorageConsumer impressionsStorageConsumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
public class FeatureFlagProcessor {
private static final Logger _log = LoggerFactory.getLogger(FeatureFlagProcessor.class);

public static FeatureFlagsToUpdate processFeatureFlagChanges(SplitParser splitParser, List<Split> splits, HashSet<String> configSets) {
public static FeatureFlagsToUpdate processFeatureFlagChanges(SplitParser splitParser, List<Split> splits, FlagSetsFilter flagSetsFilter) {
List<ParsedSplit> toAdd = new ArrayList<>();
List<String> toRemove = new ArrayList<>();
Set<String> segments = new HashSet<>();
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(configSets);
for (Split split : splits) {
if (split.status != Status.ACTIVE) {
// archive.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.engine.common;

import com.google.common.annotations.VisibleForTesting;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.engine.experiments.SplitParser;
import io.split.engine.sse.AuthApiClient;
import io.split.engine.sse.AuthApiClientImp;
Expand All @@ -23,7 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -76,9 +76,9 @@ public static PushManagerImp build(Synchronizer synchronizer,
ThreadFactory threadFactory,
SplitParser splitParser,
SplitCacheProducer splitCacheProducer,
HashSet<String> flagSets) {
FlagSetsFilter flagSetsFilter) {
FeatureFlagsWorker featureFlagsWorker = new FeatureFlagWorkerImp(synchronizer, splitParser, splitCacheProducer,
telemetryRuntimeProducer, flagSets);
telemetryRuntimeProducer, flagSetsFilter);
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages, telemetryRuntimeProducer);
return new PushManagerImp(new AuthApiClientImp(authUrl, splitAPI.getHttpClient(), telemetryRuntimeProducer),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.annotations.VisibleForTesting;
import io.split.client.ApiKeyCounter;
import io.split.client.SplitClientConfig;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.engine.SDKReadinessGates;
import io.split.engine.experiments.SplitFetcher;
import io.split.engine.experiments.SplitParser;
Expand Down Expand Up @@ -108,7 +109,7 @@ public static SyncManagerImp build(SplitTasks splitTasks,
config.getThreadFactory(),
splitParser,
splitCacheProducer,
config.getSetsFilter());
new FlagSetsFilterImpl(config.getSetsFilter()));

return new SyncManagerImp(splitTasks,
config.streamingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ EvaluatorImp.TreatmentLabelAndChangeNumber evaluateFeature(String matchingKey, S
Map<String, Object> attributes);
Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluateFeatures(String matchingKey, String bucketingKey,
List<String> featureFlags, Map<String, Object> attributes);
Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluateFeaturesByFlagSets(String key, String bucketingKey, List<String> flagSets);
Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluateFeaturesByFlagSets(String key, String bucketingKey,
List<String> flagSets, Map<String, Object> attributes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public Map<String, TreatmentLabelAndChangeNumber> evaluateFeatures(String matchi

@Override
public Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluateFeaturesByFlagSets(String key, String bucketingKey,
List<String> flagSets) {
List<String> flagSets, Map<String, Object> attributes) {
List<String> flagSetsWithNames = getFeatureFlagNamesByFlagSets(flagSets);
Map<String, TreatmentLabelAndChangeNumber> evaluations = evaluateFeatures(key, bucketingKey, flagSetsWithNames, null);
Map<String, TreatmentLabelAndChangeNumber> evaluations = evaluateFeatures(key, bucketingKey, flagSetsWithNames, attributes);
return evaluations;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.split.client.dtos.SplitChange;
import io.split.client.exceptions.UriTooLongException;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.utils.FeatureFlagsToUpdate;
import io.split.storages.SplitCacheProducer;
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
Expand Down Expand Up @@ -30,7 +31,7 @@ public class SplitFetcherImp implements SplitFetcher {
private final SplitCacheProducer _splitCacheProducer;
private final Object _lock = new Object();
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
private final HashSet<String> _flagSets;
private final FlagSetsFilter _flagSetsFilter;

/**
* Contains all the traffic types that are currently being used by the splits and also the count
Expand All @@ -43,12 +44,12 @@ public class SplitFetcherImp implements SplitFetcher {
*/

public SplitFetcherImp(SplitChangeFetcher splitChangeFetcher, SplitParser parser, SplitCacheProducer splitCacheProducer,
TelemetryRuntimeProducer telemetryRuntimeProducer, HashSet<String> sets) {
TelemetryRuntimeProducer telemetryRuntimeProducer, FlagSetsFilter flagSetsFilter) {
_splitChangeFetcher = checkNotNull(splitChangeFetcher);
_parser = checkNotNull(parser);
_splitCacheProducer = checkNotNull(splitCacheProducer);
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
_flagSets = sets;
_flagSetsFilter = flagSetsFilter;
}

@Override
Expand Down Expand Up @@ -119,7 +120,7 @@ private Set<String> runWithoutExceptionHandling(FetchOptions options) throws Int
// some other thread may have updated the shared state. exit
return segments;
}
FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_parser, change.splits, _flagSets);
FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_parser, change.splits, _flagSetsFilter);
segments = featureFlagsToUpdate.getSegments();
_splitCacheProducer.update(featureFlagsToUpdate.getToAdd(), featureFlagsToUpdate.getToRemove(), change.till);
_telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SPLITS, System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.split.engine.sse.workers;

import io.split.client.dtos.Split;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.utils.FeatureFlagsToUpdate;
import io.split.engine.common.Synchronizer;
import io.split.engine.experiments.SplitParser;
Expand All @@ -13,7 +14,6 @@
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -25,16 +25,16 @@ public class FeatureFlagWorkerImp extends Worker<FeatureFlagChangeNotification>
private final SplitParser _splitParser;
private final SplitCacheProducer _splitCacheProducer;
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
private final HashSet<String> _flagSets;
private final FlagSetsFilter _flagSetsFilter;

public FeatureFlagWorkerImp(Synchronizer synchronizer, SplitParser splitParser, SplitCacheProducer splitCacheProducer,
TelemetryRuntimeProducer telemetryRuntimeProducer, HashSet<String> flagSets) {
TelemetryRuntimeProducer telemetryRuntimeProducer, FlagSetsFilter flagSetsFilter) {
super("Feature flags");
_synchronizer = checkNotNull(synchronizer);
_splitParser = splitParser;
_splitCacheProducer = splitCacheProducer;
_telemetryRuntimeProducer = telemetryRuntimeProducer;
_flagSets = flagSets;
_flagSetsFilter = flagSetsFilter;
}

@Override
Expand Down Expand Up @@ -65,7 +65,7 @@ private boolean addOrUpdateFeatureFlag(FeatureFlagChangeNotification featureFlag
featureFlagChangeNotification.getPreviousChangeNumber() == _splitCacheProducer.getChangeNumber()) {
Split featureFlag = featureFlagChangeNotification.getFeatureFlagDefinition();
FeatureFlagsToUpdate featureFlagsToUpdate = processFeatureFlagChanges(_splitParser, Collections.singletonList(featureFlag),
_flagSets);
_flagSetsFilter);
_splitCacheProducer.update(featureFlagsToUpdate.getToAdd(), featureFlagsToUpdate.getToRemove(),
featureFlagChangeNotification.getChangeNumber());
Set<String> segments = featureFlagsToUpdate.getSegments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.engine.experiments.ParsedSplit;
import io.split.storages.SplitCache;
import org.slf4j.Logger;
Expand Down Expand Up @@ -33,16 +32,16 @@ public class InMemoryCacheImp implements SplitCache {

private AtomicLong _changeNumber;

public InMemoryCacheImp(HashSet<String> flagSets) {
public InMemoryCacheImp(FlagSetsFilter flagSets) {
this(-1, flagSets);
}

public InMemoryCacheImp(long startingChangeNumber, HashSet<String> flagSets) {
public InMemoryCacheImp(long startingChangeNumber, FlagSetsFilter flagSets) {
_concurrentMap = Maps.newConcurrentMap();
_changeNumber = new AtomicLong(startingChangeNumber);
_concurrentTrafficTypeNameSet = ConcurrentHashMultiset.create();
_flagSets = Maps.newConcurrentMap();
_flagSetsFilter = new FlagSetsFilterImpl(flagSets);
_flagSetsFilter = flagSets;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.split.client;

import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.storages.memory.InMemoryCacheImp;
import io.split.storages.SplitCache;
import org.junit.Assert;
Expand All @@ -18,7 +20,8 @@ public class CacheUpdaterServiceTest {

@Test
public void testCacheUpdate() {
SplitCache splitCache = new InMemoryCacheImp(new HashSet<>());
FlagSetsFilter flagSetsFilter = new FlagSetsFilterImpl(new HashSet<>());
SplitCache splitCache = new InMemoryCacheImp(flagSetsFilter);
CacheUpdaterService cacheUpdaterService = new CacheUpdaterService(splitCache);
cacheUpdaterService.updateCache(getMap());
Assert.assertNotNull(splitCache.get(MY_FEATURE));
Expand Down
Loading

0 comments on commit 2df0f97

Please sign in to comment.