Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impression toggle #523

Merged
merged 10 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void updateCache(Map<SplitAndKey, LocalhostSplit> map) {
String treatment = conditions.size() > 0 ? Treatments.CONTROL : localhostSplit.treatment;
configurations.put(localhostSplit.treatment, localhostSplit.config);

split = new ParsedSplit(splitName, 0, false, treatment,conditions, LOCALHOST, 0, 100, 0, 0, configurations, new HashSet<>());
split = new ParsedSplit(splitName, 0, false, treatment,conditions, LOCALHOST, 0, 100, 0, 0, configurations, new HashSet<>(), true);
parsedSplits.removeIf(parsedSplit -> parsedSplit.feature().equals(splitName));
parsedSplits.add(split);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public SplitChange fetch(long since, FetchOptions options) {
String.format("Could not retrieve splitChanges since %s; http return code %s", since, response.statusCode())
);
}

return Json.fromJson(response.body(), SplitChange.class);
} catch (Exception e) {
throw new IllegalStateException(String.format("Problem fetching splitChanges since %s: %s", since, e), e);
Expand Down
26 changes: 17 additions & 9 deletions client/src/main/java/io/split/client/SplitClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.split.client.api.Key;
import io.split.client.api.SplitResult;
import io.split.client.dtos.DecoratedImpression;
import io.split.client.dtos.Event;
import io.split.client.events.EventsStorageProducer;
import io.split.client.impressions.Impression;
Expand Down Expand Up @@ -356,7 +357,8 @@ private SplitResult getTreatmentWithConfigInternal(String matchingKey, String bu
String.format("sdk.%s", methodEnum.getMethod()),
_config.labelsEnabled() ? result.label : null,
result.changeNumber,
attributes
attributes,
result.track
);
_telemetryEvaluationProducer.recordLatency(methodEnum, System.currentTimeMillis() - initTime);
return new SplitResult(result.treatment, result.configurations);
Expand Down Expand Up @@ -435,7 +437,7 @@ private Map<String, SplitResult> getTreatmentsBySetsWithConfigInternal(String ma
private Map<String, SplitResult> processEvaluatorResult(Map<String, EvaluatorImp.TreatmentLabelAndChangeNumber> evaluatorResult,
MethodEnum methodEnum, String matchingKey, String bucketingKey, Map<String,
Object> attributes, long initTime){
List<Impression> impressions = new ArrayList<>();
List<DecoratedImpression> decoratedImpressions = new ArrayList<>();
Map<String, SplitResult> result = new HashMap<>();
evaluatorResult.keySet().forEach(t -> {
if (evaluatorResult.get(t).treatment.equals(Treatments.CONTROL) && evaluatorResult.get(t).label.
Expand All @@ -445,13 +447,16 @@ private Map<String, SplitResult> processEvaluatorResult(Map<String, EvaluatorImp
result.put(t, SPLIT_RESULT_CONTROL);
} else {
result.put(t, new SplitResult(evaluatorResult.get(t).treatment, evaluatorResult.get(t).configurations));
impressions.add(new Impression(matchingKey, bucketingKey, t, evaluatorResult.get(t).treatment, System.currentTimeMillis(),
evaluatorResult.get(t).label, evaluatorResult.get(t).changeNumber, attributes));
decoratedImpressions.add(
new DecoratedImpression(
new Impression(matchingKey, bucketingKey, t, evaluatorResult.get(t).treatment, System.currentTimeMillis(),
evaluatorResult.get(t).label, evaluatorResult.get(t).changeNumber, attributes),
evaluatorResult.get(t).track));
}
});
_telemetryEvaluationProducer.recordLatency(methodEnum, System.currentTimeMillis() - initTime);
if (impressions.size() > 0) {
_impressionManager.track(impressions);
if (decoratedImpressions.size() > 0) {
_impressionManager.track(decoratedImpressions);
}
return result;
}
Expand Down Expand Up @@ -501,10 +506,13 @@ private Set<String> filterSetsAreInConfig(Set<String> sets, MethodEnum methodEnu
return setsToReturn;
}
private void recordStats(String matchingKey, String bucketingKey, String featureFlagName, long start, String result,
String operation, String label, Long changeNumber, Map<String, Object> attributes) {
String operation, String label, Long changeNumber, Map<String, Object> attributes, boolean track) {
try {
_impressionManager.track(Stream.of(new Impression(matchingKey, bucketingKey, featureFlagName, result, System.currentTimeMillis(),
label, changeNumber, attributes)).collect(Collectors.toList()));
_impressionManager.track(Stream.of(
new DecoratedImpression(
new Impression(matchingKey, bucketingKey, featureFlagName, result, System.currentTimeMillis(),
label, changeNumber, attributes),
track)).collect(Collectors.toList()));
} catch (Throwable t) {
_log.error("Exception", t);
}
Expand Down
24 changes: 11 additions & 13 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,11 @@ private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config,
ImpressionListener listener = !impressionListeners.isEmpty()
? new ImpressionListener.FederatedImpressionListener(impressionListeners)
: null;
counter = new ImpressionCounter();
nmayorsplit marked this conversation as resolved.
Show resolved Hide resolved
ProcessImpressionNone processImpressionNone = new ProcessImpressionNone(listener != null, _uniqueKeysTracker, counter);

switch (config.impressionsMode()) {
case OPTIMIZED:
counter = new ImpressionCounter();
ImpressionObserver impressionObserver = new ImpressionObserver(config.getLastSeenCacheSize());
processImpressionStrategy = new ProcessImpressionOptimized(listener != null, impressionObserver,
counter, _telemetryStorageProducer);
Expand All @@ -646,13 +648,12 @@ private ImpressionsManagerImpl buildImpressionsManager(SplitClientConfig config,
processImpressionStrategy = new ProcessImpressionDebug(listener != null, impressionObserver);
break;
case NONE:
counter = new ImpressionCounter();
processImpressionStrategy = new ProcessImpressionNone(listener != null, _uniqueKeysTracker, counter);
processImpressionStrategy = processImpressionNone;
break;
}
return ImpressionsManagerImpl.instance(config, _telemetryStorageProducer, impressionsStorageConsumer,
impressionsStorageProducer,
_impressionsSender, processImpressionStrategy, counter, listener);
_impressionsSender, processImpressionNone, processImpressionStrategy, counter, listener);
}

private SDKMetadata createSdkMetadata(boolean ipAddressEnabled, String splitSdkVersion) {
Expand Down Expand Up @@ -690,15 +691,12 @@ private void manageSdkReady(SplitClientConfig config) {
}

private UniqueKeysTracker createUniqueKeysTracker(SplitClientConfig config) {
if (config.impressionsMode().equals(ImpressionsManager.Mode.NONE)) {
int uniqueKeysRefreshRate = config.operationMode().equals(OperationMode.STANDALONE)
? config.uniqueKeysRefreshRateInMemory()
: config.uniqueKeysRefreshRateRedis();
return new UniqueKeysTrackerImp(_telemetrySynchronizer, uniqueKeysRefreshRate,
config.filterUniqueKeysRefreshRate(),
config.getThreadFactory());
}
return null;
int uniqueKeysRefreshRate = config.operationMode().equals(OperationMode.STANDALONE)
? config.uniqueKeysRefreshRateInMemory()
: config.uniqueKeysRefreshRateRedis();
return new UniqueKeysTrackerImp(_telemetrySynchronizer, uniqueKeysRefreshRate,
config.filterUniqueKeysRefreshRate(),
config.getThreadFactory());
}

private SplitChangeFetcher createSplitChangeFetcher(SplitClientConfig splitClientConfig) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.split.client.dtos;

import io.split.client.impressions.Impression;

public class DecoratedImpression {
public Impression impression;
public boolean track;

public DecoratedImpression(Impression impression, boolean track) {
this.impression = impression;
this.track = track;
}
}

1 change: 1 addition & 0 deletions client/src/main/java/io/split/client/dtos/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class Split {
public int algo;
public Map<String, String> configurations;
public HashSet<String> sets;
public Boolean trackImpression = null;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.split.client.impressions;

import io.split.client.dtos.DecoratedImpression;

import java.util.List;

public interface ImpressionsManager {
Expand All @@ -10,14 +12,14 @@ public enum Mode {
NONE
}

void track(List<Impression> impressions);
void track(List<DecoratedImpression> decoratedImpressions);
void start();
void close();

final class NoOpImpressionsManager implements ImpressionsManager {

@Override
public void track(List<Impression> impressions) { /* do nothing */ }
public void track(List<DecoratedImpression> decoratedImpressions) { /* do nothing */ }

@Override
public void start(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import com.google.common.annotations.VisibleForTesting;
import io.split.client.SplitClientConfig;
import io.split.client.dtos.DecoratedImpression;
import io.split.client.dtos.KeyImpression;
import io.split.client.dtos.TestImpressions;
import io.split.client.impressions.strategy.ProcessImpressionNone;
import io.split.client.impressions.strategy.ProcessImpressionStrategy;
import io.split.client.utils.SplitExecutorFactory;
import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
Expand All @@ -13,10 +15,13 @@

import java.io.Closeable;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -40,37 +45,42 @@ public class ImpressionsManagerImpl implements ImpressionsManager, Closeable {
private TelemetryRuntimeProducer _telemetryRuntimeProducer;
private ImpressionCounter _counter;
private ProcessImpressionStrategy _processImpressionStrategy;
private ProcessImpressionNone _processImpressionNone;

private final int _impressionsRefreshRate;

public static ImpressionsManagerImpl instance(SplitClientConfig config,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ImpressionsStorageConsumer impressionsStorageConsumer,
ImpressionsStorageProducer impressionsStorageProducer,
ImpressionsSender impressionsSender,
ProcessImpressionNone processImpressionNone,
ProcessImpressionStrategy processImpressionStrategy,
ImpressionCounter counter,
ImpressionListener listener) throws URISyntaxException {
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRuntimeProducer, impressionsStorageConsumer,
impressionsStorageProducer, processImpressionStrategy, counter, listener);
impressionsStorageProducer, processImpressionNone, processImpressionStrategy, counter, listener);
}

public static ImpressionsManagerImpl instanceForTest(SplitClientConfig config,
ImpressionsSender impressionsSender,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ImpressionsStorageConsumer impressionsStorageConsumer,
ImpressionsStorageProducer impressionsStorageProducer,
ProcessImpressionNone processImpressionNone,
ProcessImpressionStrategy processImpressionStrategy,
ImpressionCounter counter,
ImpressionListener listener) {
return new ImpressionsManagerImpl(config, impressionsSender, telemetryRuntimeProducer, impressionsStorageConsumer,
impressionsStorageProducer, processImpressionStrategy, counter, listener);
impressionsStorageProducer, processImpressionNone, processImpressionStrategy, counter, listener);
}

private ImpressionsManagerImpl(SplitClientConfig config,
ImpressionsSender impressionsSender,
TelemetryRuntimeProducer telemetryRuntimeProducer,
ImpressionsStorageConsumer impressionsStorageConsumer,
ImpressionsStorageProducer impressionsStorageProducer,
ProcessImpressionNone processImpressionNone,
ProcessImpressionStrategy processImpressionStrategy,
ImpressionCounter impressionCounter,
ImpressionListener impressionListener) {
Expand All @@ -81,6 +91,7 @@ private ImpressionsManagerImpl(SplitClientConfig config,
_impressionsStorageConsumer = checkNotNull(impressionsStorageConsumer);
_impressionsStorageProducer = checkNotNull(impressionsStorageProducer);
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
_processImpressionNone = checkNotNull(processImpressionNone);
_processImpressionStrategy = checkNotNull(processImpressionStrategy);
_impressionsSender = impressionsSender;
_counter = impressionCounter;
Expand Down Expand Up @@ -110,15 +121,28 @@ public void start(){
}

@Override
public void track(List<Impression> impressions) {
if (null == impressions) {
public void track(List<DecoratedImpression> decoratedImpressions) {
if (null == decoratedImpressions) {
return;
}

ImpressionsResult impressionsResult = _processImpressionStrategy.process(impressions);
List<Impression> impressionsForLogs = impressionsResult.getImpressionsToQueue();
List<Impression> impressionsToListener = impressionsResult.getImpressionsToListener();

List<Impression> impressionsForLogs = new ArrayList<>();
List<Impression> impressionsToListener = new ArrayList<>();

for (int i = 0; i < decoratedImpressions.size(); i++) {
ImpressionsResult impressionsResult;
if (decoratedImpressions.get(i).track) {
impressionsResult = _processImpressionStrategy.process(Stream.of(
decoratedImpressions.get(i).impression).collect(Collectors.toList()));
} else {
impressionsResult = _processImpressionNone.process(Stream.of(
decoratedImpressions.get(i).impression).collect(Collectors.toList()));
}
if (!Objects.isNull(impressionsResult.getImpressionsToQueue())) {
impressionsForLogs.addAll(impressionsResult.getImpressionsToQueue());
}
if (!Objects.isNull(impressionsResult.getImpressionsToListener()))
impressionsToListener.addAll(impressionsResult.getImpressionsToListener());
}
int totalImpressions = impressionsForLogs.size();
long queued = _impressionsStorageProducer.put(impressionsForLogs.stream().map(KeyImpression::fromImpression).collect(Collectors.toList()));
if (queued < totalImpressions) {
Expand Down
Loading
Loading