Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impression toggle #523

Merged
merged 10 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
4.14.0 (Dec X, 2024)
- Added support for Impression Toggle in feature flags

4.13.1 (Dec 5, 2024)
- Updated `org.apache.httpcomponents.client5` dependency to 5.4.1 to fix vulnerabilities.
- Updated `redis.clients` dependency to 4.4.8 to fix vulnerabilities.
Expand Down
4 changes: 2 additions & 2 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.13.1</version>
<version>4.14.0-rc1</version>
</parent>
<version>4.13.1</version>
<version>4.14.0-rc1</version>
<artifactId>java-client</artifactId>
<packaging>jar</packaging>
<name>Java Client</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void updateCache(Map<SplitAndKey, LocalhostSplit> map) {
String treatment = conditions.size() > 0 ? Treatments.CONTROL : localhostSplit.treatment;
configurations.put(localhostSplit.treatment, localhostSplit.config);

split = new ParsedSplit(splitName, 0, false, treatment,conditions, LOCALHOST, 0, 100, 0, 0, configurations, new HashSet<>());
split = new ParsedSplit(splitName, 0, false, treatment,conditions, LOCALHOST, 0, 100, 0, 0, configurations, new HashSet<>(), true);
parsedSplits.removeIf(parsedSplit -> parsedSplit.feature().equals(splitName));
parsedSplits.add(split);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public SplitChange fetch(long since, FetchOptions options) {
String.format("Could not retrieve splitChanges since %s; http return code %s", since, response.statusCode())
);
}

return Json.fromJson(response.body(), SplitChange.class);
} catch (Exception e) {
throw new IllegalStateException(String.format("Problem fetching splitChanges since %s: %s", since, e), e);
Expand Down
26 changes: 17 additions & 9 deletions client/src/main/java/io/split/client/SplitClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.split.client.api.Key;
import io.split.client.api.SplitResult;
import io.split.client.dtos.DecoratedImpression;
import io.split.client.dtos.Event;
import io.split.client.events.EventsStorageProducer;
import io.split.client.impressions.Impression;
Expand Down Expand Up @@ -356,7 +357,8 @@ private SplitResult getTreatmentWithConfigInternal(String matchingKey, String bu
String.format("sdk.%s", methodEnum.getMethod()),
_config.labelsEnabled() ? result.label : null,
result.changeNumber,
attributes
attributes,
result.track
);
_telemetryEvaluationProducer.recordLatency(methodEnum, System.currentTimeMillis() - initTime);
return new SplitResult(result.treatment, result.configurations);
Expand Down Expand Up @@ -435,7 +437,7 @@ private Map<String, SplitResult> getTreatmentsBySetsWithConfigInternal(String ma
private Map<String, SplitResult> processEvaluatorResult(Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluatorResult,
MethodEnum methodEnum, String matchingKey, String bucketingKey, Map<String,
Object> attributes, long initTime){
List<Impression> impressions = new ArrayList<>();
List<DecoratedImpression> decoratedImpressions = new ArrayList<>();
Map<String, SplitResult> result = new HashMap<>();
evaluatorResult.keySet().forEach(t -> {
if (evaluatorResult.get(t).treatment.equals(Treatments.CONTROL) && evaluatorResult.get(t).label.
Expand All @@ -445,13 +447,16 @@ private Map<String, SplitResult> processEvaluatorResult(Map<String, EvaluatorImp
result.put(t, SPLIT_RESULT_CONTROL);
} else {
result.put(t, new SplitResult(evaluatorResult.get(t).treatment, evaluatorResult.get(t).configurations));
impressions.add(new Impression(matchingKey, bucketingKey, t, evaluatorResult.get(t).treatment, System.currentTimeMillis(),
evaluatorResult.get(t).label, evaluatorResult.get(t).changeNumber, attributes));
decoratedImpressions.add(
new DecoratedImpression(
new Impression(matchingKey, bucketingKey, t, evaluatorResult.get(t).treatment, System.currentTimeMillis(),
evaluatorResult.get(t).label, evaluatorResult.get(t).changeNumber, attributes),
evaluatorResult.get(t).track));
}
});
_telemetryEvaluationProducer.recordLatency(methodEnum, System.currentTimeMillis() - initTime);
if (impressions.size() > 0) {
_impressionManager.track(impressions);
if (!decoratedImpressions.isEmpty()) {
_impressionManager.track(decoratedImpressions);
}
return result;
}
Expand Down Expand Up @@ -501,10 +506,13 @@ private Set<String> filterSetsAreInConfig(Set<String> sets, MethodEnum methodEnu
return setsToReturn;
}
private void recordStats(String matchingKey, String bucketingKey, String featureFlagName, long start, String result,
String operation, String label, Long changeNumber, Map<String, Object> attributes) {
String operation, String label, Long changeNumber, Map<String, Object> attributes, boolean track) {
try {
_impressionManager.track(Stream.of(new Impression(matchingKey, bucketingKey, featureFlagName, result, System.currentTimeMillis(),
label, changeNumber, attributes)).collect(Collectors.toList()));
_impressionManager.track(Stream.of(
new DecoratedImpression(
new Impression(matchingKey, bucketingKey, featureFlagName, result, System.currentTimeMillis(),
label, changeNumber, attributes),
track)).collect(Collectors.toList()));
} catch (Throwable t) {
_log.error("Exception", t);
}
Expand Down
25 changes: 11 additions & 14 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -630,13 +630,14 @@ private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config,
.collect(Collectors.toCollection(() -> impressionListeners));
}
ProcessImpressionStrategy processImpressionStrategy = null;
ImpressionCounter counter = null;
ImpressionCounter counter = new ImpressionCounter();
ImpressionListener listener = !impressionListeners.isEmpty()
? new ImpressionListener.FederatedImpressionListener(impressionListeners)
: null;
ProcessImpressionNone processImpressionNone = new ProcessImpressionNone(listener != null, _uniqueKeysTracker, counter);

switch (config.impressionsMode()) {
case OPTIMIZED:
counter = new ImpressionCounter();
ImpressionObserver impressionObserver = new ImpressionObserver(config.getLastSeenCacheSize());
processImpressionStrategy = new ProcessImpressionOptimized(listener != null, impressionObserver,
counter, _telemetryStorageProducer);
Expand All @@ -646,13 +647,12 @@ private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config,
processImpressionStrategy = new ProcessImpressionDebug(listener != null, impressionObserver);
break;
case NONE:
counter = new ImpressionCounter();
processImpressionStrategy = new ProcessImpressionNone(listener != null, _uniqueKeysTracker, counter);
processImpressionStrategy = processImpressionNone;
break;
}
return ImpressionsManagerImpl.instance(config, _telemetryStorageProducer, impressionsStorageConsumer,
impressionsStorageProducer,
_impressionsSender, processImpressionStrategy, counter, listener);
_impressionsSender, processImpressionNone, processImpressionStrategy, counter, listener);
}

private SDKMetadata createSdkMetadata(boolean ipAddressEnabled, String splitSdkVersion) {
Expand Down Expand Up @@ -690,15 +690,12 @@ private void manageSdkReady(SplitClientConfig config) {
}

private UniqueKeysTracker createUniqueKeysTracker(SplitClientConfig config) {
if (config.impressionsMode().equals(ImpressionsManager.Mode.NONE)) {
int uniqueKeysRefreshRate = config.operationMode().equals(OperationMode.STANDALONE)
? config.uniqueKeysRefreshRateInMemory()
: config.uniqueKeysRefreshRateRedis();
return new UniqueKeysTrackerImp(_telemetrySynchronizer, uniqueKeysRefreshRate,
config.filterUniqueKeysRefreshRate(),
config.getThreadFactory());
}
return null;
int uniqueKeysRefreshRate = config.operationMode().equals(OperationMode.STANDALONE)
? config.uniqueKeysRefreshRateInMemory()
: config.uniqueKeysRefreshRateRedis();
return new UniqueKeysTrackerImp(_telemetrySynchronizer, uniqueKeysRefreshRate,
config.filterUniqueKeysRefreshRate(),
config.getThreadFactory());
}

private SplitChangeFetcher createSplitChangeFetcher(SplitClientConfig splitClientConfig) {
Expand Down
2 changes: 2 additions & 0 deletions client/src/main/java/io/split/client/api/SplitView.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class SplitView {
public Map<String, String> configs;
public List<String> sets;
public String defaultTreatment;
public boolean trackImpression;

public static SplitView fromParsedSplit(ParsedSplit parsedSplit) {
SplitView splitView = new SplitView();
Expand All @@ -46,6 +47,7 @@ public static SplitView fromParsedSplit(ParsedSplit parsedSplit) {

splitView.treatments = new ArrayList<String>(treatments);
splitView.configs = parsedSplit.configurations() == null? Collections.<String, String>emptyMap() : parsedSplit.configurations() ;
splitView.trackImpression = parsedSplit.trackImpression();

return splitView;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.split.client.dtos;

import io.split.client.impressions.Impression;

public class DecoratedImpression {
private Impression impression;
private boolean track;

public DecoratedImpression(Impression impression, boolean track) {
this.impression = impression;
this.track = track;
}

public Impression impression() { return this.impression;}

public boolean track() { return this.track;}
}

1 change: 1 addition & 0 deletions client/src/main/java/io/split/client/dtos/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class Split {
public int algo;
public Map<String, String> configurations;
public HashSet<String> sets;
public Boolean trackImpression = null;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,6 @@ public void postImpressionsBulk(List<TestImpressions> impressions) {
@Override
public void postCounters(HashMap<ImpressionCounter.Key, Integer> raw) {
long initTime = System.currentTimeMillis();
if (_mode.equals(ImpressionsManager.Mode.DEBUG)) {
_logger.warn("Attempted to submit counters in impressions debugging mode. Ignoring");
return;
}

try {

Map<String, List<String>> additionalHeaders = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.split.client.impressions;

import io.split.client.dtos.DecoratedImpression;

import java.util.List;

public interface ImpressionsManager {
Expand All @@ -10,14 +12,14 @@ public enum Mode {
NONE
}

void track(List<Impression> impressions);
void track(List<DecoratedImpression> decoratedImpressions);
void start();
void close();

final class NoOpImpressionsManager implements ImpressionsManager {

@Override
public void track(List<Impression> impressions) { /* do nothing */ }
public void track(List<DecoratedImpression> decoratedImpressions) { /* do nothing */ }

@Override
public void start(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.google.common.annotations.VisibleForTesting;
import io.split.client.SplitClientConfig;
import io.split.client.dtos.DecoratedImpression;
import io.split.client.dtos.KeyImpression;
import io.split.client.dtos.TestImpressions;
import io.split.client.impressions.strategy.ProcessImpressionNone;
import io.split.client.impressions.strategy.ProcessImpressionStrategy;
import io.split.client.utils.SplitExecutorFactory;
import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
Expand All @@ -13,10 +15,13 @@

import java.io.Closeable;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -40,37 +45,42 @@ public class ImpressionsManagerImpl implements ImpressionsManager, Closeable {
private TelemetryRuntimeProducer _telemetryRuntimeProducer;
private ImpressionCounter _counter;
private ProcessImpressionStrategy _processImpressionStrategy;
private ProcessImpressionNone _processImpressionNone;

private final int _impressionsRefreshRate;

public static ImpressionsManagerImpl instance(SplitClientConfig config,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ImpressionsStorageConsumer impressionsStorageConsumer,
ImpressionsStorageProducer impressionsStorageProducer,
ImpressionsSender impressionsSender,
ProcessImpressionNone processImpressionNone,
ProcessImpressionStrategy processImpressionStrategy,
ImpressionCounter counter,
ImpressionListener listener) throws URISyntaxException {
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRuntimeProducer, impressionsStorageConsumer,
impressionsStorageProducer, processImpressionStrategy, counter, listener);
impressionsStorageProducer, processImpressionNone, processImpressionStrategy, counter, listener);
}

public static ImpressionsManagerImpl instanceForTest(SplitClientConfig config,
ImpressionsSender impressionsSender,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ImpressionsStorageConsumer impressionsStorageConsumer,
ImpressionsStorageProducer impressionsStorageProducer,
ProcessImpressionNone processImpressionNone,
ProcessImpressionStrategy processImpressionStrategy,
ImpressionCounter counter,
ImpressionListener listener) {
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRuntimeProducer, impressionsStorageConsumer,
impressionsStorageProducer, processImpressionStrategy, counter, listener);
impressionsStorageProducer, processImpressionNone, processImpressionStrategy, counter, listener);
}

private ImpressionsManagerImpl(SplitClientConfig config,
ImpressionsSender impressionsSender,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ImpressionsStorageConsumer impressionsStorageConsumer,
ImpressionsStorageProducer impressionsStorageProducer,
ProcessImpressionNone processImpressionNone,
ProcessImpressionStrategy processImpressionStrategy,
ImpressionCounter impressionCounter,
ImpressionListener impressionListener) {
Expand All @@ -81,6 +91,7 @@ private ImpressionsManagerImpl(SplitClientConfig config,
_impressionsStorageConsumer = checkNotNull(impressionsStorageConsumer);
_impressionsStorageProducer = checkNotNull(impressionsStorageProducer);
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
_processImpressionNone = checkNotNull(processImpressionNone);
_processImpressionStrategy = checkNotNull(processImpressionStrategy);
_impressionsSender = impressionsSender;
_counter = impressionCounter;
Expand All @@ -101,6 +112,8 @@ public void start(){
break;
case DEBUG:
_scheduler.scheduleAtFixedRate(this::sendImpressions, BULK_INITIAL_DELAY_SECONDS, _impressionsRefreshRate, TimeUnit.SECONDS);
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS, COUNT_REFRESH_RATE_SECONDS,
TimeUnit.SECONDS);
break;
case NONE:
_scheduler.scheduleAtFixedRate(this::sendImpressionCounters, COUNT_INITIAL_DELAY_SECONDS, COUNT_REFRESH_RATE_SECONDS,
Expand All @@ -110,15 +123,28 @@ public void start(){
}

@Override
public void track(List<Impression> impressions) {
if (null == impressions) {
public void track(List<DecoratedImpression> decoratedImpressions) {
if (null == decoratedImpressions) {
return;
}

ImpressionsResult impressionsResult = _processImpressionStrategy.process(impressions);
List<Impression> impressionsForLogs = impressionsResult.getImpressionsToQueue();
List<Impression> impressionsToListener = impressionsResult.getImpressionsToListener();

List<Impression> impressionsForLogs = new ArrayList<>();
List<Impression> impressionsToListener = new ArrayList<>();

for (int i = 0; i < decoratedImpressions.size(); i++) {
ImpressionsResult impressionsResult;
if (decoratedImpressions.get(i).track()) {
impressionsResult = _processImpressionStrategy.process(Stream.of(
decoratedImpressions.get(i).impression()).collect(Collectors.toList()));
} else {
impressionsResult = _processImpressionNone.process(Stream.of(
decoratedImpressions.get(i).impression()).collect(Collectors.toList()));
}
if (!Objects.isNull(impressionsResult.getImpressionsToQueue())) {
impressionsForLogs.addAll(impressionsResult.getImpressionsToQueue());
}
if (!Objects.isNull(impressionsResult.getImpressionsToListener()))
impressionsToListener.addAll(impressionsResult.getImpressionsToListener());
}
int totalImpressions = impressionsForLogs.size();
long queued = _impressionsStorageProducer.put(impressionsForLogs.stream().map(KeyImpression::fromImpression).collect(Collectors.toList()));
if (queued < totalImpressions) {
Expand Down
Loading
Loading