Skip to content

Commit

Permalink
Merge pull request #274 from splitio/SDKS-7794-flagsets_in_redis_storage
Browse files Browse the repository at this point in the history
[SDKS-7794] Add flag sets support in Redis storage
  • Loading branch information
EmilianoSanchez authored Nov 29, 2023
2 parents 4c5658d + eed5459 commit 2476efc
Show file tree
Hide file tree
Showing 22 changed files with 202 additions and 94 deletions.
4 changes: 2 additions & 2 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
1.12.0 (December XX, 2023)
- Added support for Flag Sets in "consumer" and "partial consumer" modes for pluggable storage.
- Added support for Flag Sets in "consumer" and "partial consumer" modes for Pluggable and Redis storages.
- Updated evaluation flow to log a warning when using flag sets that don't contain cached feature flags.

1.11.0 (November 3, 2023)
Expand Down Expand Up @@ -57,7 +57,7 @@
- Added a new impressions mode for the SDK called NONE, to be used in factory when there is no desire to capture impressions on an SDK factory to feed Split's analytics engine. Running NONE mode, the SDK will only capture unique keys evaluated for a particular feature flag instead of full blown impressions.
- Updated SDK telemetry to support pluggable storage, partial consumer mode, and synchronizer.
- Updated storage implementations to improve the performance of feature flag evaluations (i.e., `getTreatment(s)` method calls) when using the default storage in memory.
- Updated evaluation flow to avoid unnecessarily storage calls when the SDK is not ready.
- Updated evaluation flow (i.e., `getTreatment(s)` method calls) to avoid calling the storage for cached feature flags when the SDK is not ready or ready from cache. It applies to all SDK modes.

1.6.1 (July 22, 2022)
- Updated GoogleAnalyticsToSplit integration to validate `autoRequire` config parameter and avoid some wrong warning logs when mapping GA hit fields to Split event properties.
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@splitsoftware/splitio-commons",
"version": "1.12.1-rc.0",
"version": "1.12.1-rc.1",
"description": "Split Javascript SDK common components",
"main": "cjs/index.js",
"module": "esm/index.js",
Expand Down
2 changes: 1 addition & 1 deletion src/readiness/__tests__/sdkReadinessManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ describe('SDK Readiness Manager - Event emitter', () => {
expect(sdkStatus[propName]).toBeTruthy(); // The sdkStatus exposes all minimal EventEmitter functionality.
});

expect(typeof sdkStatus['ready']).toBe('function'); // The sdkStatus exposes a .ready() function.
expect(typeof sdkStatus.ready).toBe('function'); // The sdkStatus exposes a .ready() function.

expect(typeof sdkStatus.Event).toBe('object'); // It also exposes the Event map,
expect(sdkStatus.Event.SDK_READY).toBe(SDK_READY); // which contains the constants for the events, for backwards compatibility.
Expand Down
8 changes: 4 additions & 4 deletions src/sdkClient/client.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { evaluateFeature, evaluateFeatures, evaluateFeaturesByFlagSets } from '../evaluator';
import { thenable } from '../utils/promise/thenable';
import { getMatching, getBucketing } from '../utils/key';
import { validateSplitExistance } from '../utils/inputValidation/splitExistance';
import { validateTrafficTypeExistance } from '../utils/inputValidation/trafficTypeExistance';
import { validateSplitExistence } from '../utils/inputValidation/splitExistence';
import { validateTrafficTypeExistence } from '../utils/inputValidation/trafficTypeExistence';
import { SDK_NOT_READY } from '../utils/labels';
import { CONTROL, TREATMENT, TREATMENTS, TREATMENT_WITH_CONFIG, TREATMENTS_WITH_CONFIG, TRACK, TREATMENTS_WITH_CONFIG_BY_FLAGSETS, TREATMENTS_BY_FLAGSETS, TREATMENTS_BY_FLAGSET, TREATMENTS_WITH_CONFIG_BY_FLAGSET } from '../utils/constants';
import { IEvaluationResult } from '../evaluator/types';
Expand Down Expand Up @@ -133,7 +133,7 @@ export function clientFactory(params: ISdkFactoryContext): SplitIO.IClient | Spl
const { treatment, label, changeNumber, config = null } = evaluation;
log.info(IMPRESSION, [featureFlagName, matchingKey, treatment, label]);

if (validateSplitExistance(log, readinessManager, featureFlagName, label, invokingMethodName)) {
if (validateSplitExistence(log, readinessManager, featureFlagName, label, invokingMethodName)) {
log.info(IMPRESSION_QUEUEING);
queue.push({
feature: featureFlagName,
Expand Down Expand Up @@ -171,7 +171,7 @@ export function clientFactory(params: ISdkFactoryContext): SplitIO.IClient | Spl
};

// This may be async but we only warn, we don't actually care if it is valid or not in terms of queueing the event.
validateTrafficTypeExistance(log, readinessManager, storage.splits, mode, trafficTypeName, 'track');
validateTrafficTypeExistence(log, readinessManager, storage.splits, mode, trafficTypeName, 'track');

const result = eventTracker.track(eventData, size);

Expand Down
2 changes: 1 addition & 1 deletion src/sdkManager/__tests__/index.asyncCache.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ describe('MANAGER API', () => {

/** Teardown */
await cache.removeSplit(splitObject.name);
await connection.quit();
await connection.disconnect();
});

test('Async cache with error', async () => {
Expand Down
6 changes: 3 additions & 3 deletions src/sdkManager/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { objectAssign } from '../utils/lang/objectAssign';
import { thenable } from '../utils/promise/thenable';
import { find } from '../utils/lang';
import { validateSplit, validateSplitExistance, validateIfNotDestroyed, validateIfOperational } from '../utils/inputValidation';
import { validateSplit, validateSplitExistence, validateIfNotDestroyed, validateIfOperational } from '../utils/inputValidation';
import { ISplitsCacheAsync, ISplitsCacheSync } from '../storages/types';
import { ISdkReadinessManager } from '../readiness/types';
import { ISplit } from '../dtos/types';
Expand Down Expand Up @@ -71,12 +71,12 @@ export function sdkManagerFactory<TSplitCache extends ISplitsCacheSync | ISplits

if (thenable(split)) {
return split.catch(() => null).then(result => { // handle possible rejections when using pluggable storage
validateSplitExistance(log, readinessManager, splitName, result, SPLIT_FN_LABEL);
validateSplitExistence(log, readinessManager, splitName, result, SPLIT_FN_LABEL);
return objectToView(result);
});
}

validateSplitExistance(log, readinessManager, splitName, split, SPLIT_FN_LABEL);
validateSplitExistence(log, readinessManager, splitName, split, SPLIT_FN_LABEL);

return objectToView(split);
},
Expand Down
65 changes: 45 additions & 20 deletions src/storages/inRedis/SplitsCacheInRedis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { KeyBuilderSS } from '../KeyBuilderSS';
import { Redis } from 'ioredis';
import { ILogger } from '../../logger/types';
import { LOG_PREFIX } from './constants';
import { ISplit } from '../../dtos/types';
import { ISplit, ISplitFiltersValidation } from '../../dtos/types';
import { AbstractSplitsCacheAsync } from '../AbstractSplitsCacheAsync';
import { ISet } from '../../utils/lang/sets';
import { ISet, _Set, returnListDifference } from '../../utils/lang/sets';

/**
* Discard errors for an answer of multiple operations.
Expand All @@ -27,12 +27,14 @@ export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {
private readonly redis: Redis;
private readonly keys: KeyBuilderSS;
private redisError?: string;
private readonly flagSetsFilter: string[];

constructor(log: ILogger, keys: KeyBuilderSS, redis: Redis) {
constructor(log: ILogger, keys: KeyBuilderSS, redis: Redis, splitFiltersValidation?: ISplitFiltersValidation) {
super();
this.log = log;
this.redis = redis;
this.keys = keys;
this.flagSetsFilter = splitFiltersValidation ? splitFiltersValidation.groupedFilters.bySet : [];

// There is no need to listen for redis 'error' event, because in that case ioredis calls will be rejected and handled by redis storage adapters.
// But it is done just to avoid getting the ioredis message `Unhandled error event`.
Expand All @@ -57,6 +59,24 @@ export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {
return this.redis.incr(ttKey);
}

private _updateFlagSets(featureFlagName: string, flagSetsOfRemovedFlag?: string[], flagSetsOfAddedFlag?: string[]) {
const removeFromFlagSets = returnListDifference(flagSetsOfRemovedFlag, flagSetsOfAddedFlag);

let addToFlagSets = returnListDifference(flagSetsOfAddedFlag, flagSetsOfRemovedFlag);
if (this.flagSetsFilter.length > 0) {
addToFlagSets = addToFlagSets.filter(flagSet => {
return this.flagSetsFilter.some(filterFlagSet => filterFlagSet === flagSet);
});
}

const items = [featureFlagName];

return Promise.all([
...removeFromFlagSets.map(flagSetName => this.redis.srem(this.keys.buildFlagSetKey(flagSetName), items)),
...addToFlagSets.map(flagSetName => this.redis.sadd(this.keys.buildFlagSetKey(flagSetName), items))
]);
}

/**
* Add a given split.
* The returned promise is resolved when the operation success
Expand All @@ -66,24 +86,24 @@ export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {
const splitKey = this.keys.buildSplitKey(name);
return this.redis.get(splitKey).then(splitFromStorage => {

// handling parsing errors
let parsedPreviousSplit: ISplit, newStringifiedSplit;
// handling parsing error
let parsedPreviousSplit: ISplit, stringifiedNewSplit;
try {
parsedPreviousSplit = splitFromStorage ? JSON.parse(splitFromStorage) : undefined;
newStringifiedSplit = JSON.stringify(split);
stringifiedNewSplit = JSON.stringify(split);
} catch (e) {
throw new Error('Error parsing feature flag definition: ' + e);
}

return this.redis.set(splitKey, newStringifiedSplit).then(() => {
return this.redis.set(splitKey, stringifiedNewSplit).then(() => {
// avoid unnecessary increment/decrement operations
if (parsedPreviousSplit && parsedPreviousSplit.trafficTypeName === split.trafficTypeName) return;

// update traffic type counts
return this._incrementCounts(split).then(() => {
if (parsedPreviousSplit) return this._decrementCounts(parsedPreviousSplit);
});
});
}).then(() => this._updateFlagSets(name, parsedPreviousSplit && parsedPreviousSplit.sets, split.sets));
}).then(() => true);
}

Expand All @@ -101,11 +121,12 @@ export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {
* The returned promise is resolved when the operation success, with 1 or 0 indicating if the split existed or not.
* or rejected if it fails (e.g., redis operation fails).
*/
removeSplit(name: string): Promise<number> {
removeSplit(name: string) {
return this.getSplit(name).then((split) => {
if (split) {
this._decrementCounts(split);
return this._decrementCounts(split).then(() => this._updateFlagSets(name, split.sets));
}
}).then(() => {
return this.redis.del(this.keys.buildSplitKey(name));
});
}
Expand Down Expand Up @@ -174,7 +195,7 @@ export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {
.then((listOfKeys) => this.redis.pipeline(listOfKeys.map(k => ['get', k])).exec())
.then(processPipelineAnswer)
.then((splitDefinitions) => splitDefinitions.map((splitDefinition) => {
return JSON.parse(splitDefinition as string);
return JSON.parse(splitDefinition);
}));
}

Expand All @@ -190,14 +211,18 @@ export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {
}

/**
* Get list of split names related to a given flag set names list.
* The returned promise is resolved with the list of split names,
* or rejected if wrapper operation fails.
* @todo this is a no-op method to be implemented
* Get list of feature flag names related to a given list of flag set names.
* The returned promise is resolved with the list of feature flag names per flag set,
* or rejected if the pipelined redis operation fails.
*/
getNamesByFlagSets(): Promise<ISet<string>[]> {
this.log.error(LOG_PREFIX + 'ByFlagSet/s evaluations are not supported with Redis storage yet.');
return Promise.reject();
getNamesByFlagSets(flagSets: string[]): Promise<ISet<string>[]> {
return this.redis.pipeline(flagSets.map(flagSet => ['smembers', this.keys.buildFlagSetKey(flagSet)])).exec()
.then((results) => results.map(([e, value], index) => {
if (e === null) return value;

this.log.error(LOG_PREFIX + `Could not read result from get members of flag set ${flagSets[index]} due to an error: ${e}`);
}))
.then(namesByFlagSets => namesByFlagSets.map(namesByFlagSet => new _Set(namesByFlagSet)));
}

/**
Expand All @@ -214,14 +239,14 @@ export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {

ttCount = parseInt(ttCount as string, 10);
if (!isFiniteNumber(ttCount) || ttCount < 0) {
this.log.info(LOG_PREFIX + `Could not validate traffic type existance of ${trafficType} due to data corruption of some sorts.`);
this.log.info(LOG_PREFIX + `Could not validate traffic type existence of ${trafficType} due to data corruption of some sorts.`);
return false;
}

return ttCount > 0;
})
.catch(e => {
this.log.error(LOG_PREFIX + `Could not validate traffic type existance of ${trafficType} due to an error: ${e}.`);
this.log.error(LOG_PREFIX + `Could not validate traffic type existence of ${trafficType} due to an error: ${e}.`);
// If there is an error, bypass the validation so the event can get tracked.
return true;
});
Expand Down
2 changes: 1 addition & 1 deletion src/storages/inRedis/__tests__/EventsCacheInRedis.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ test('EVENTS CACHE IN REDIS / `track`, `count`, `popNWithMetadata` and `drop` me

// Clean up then end.
await connection.del(eventsKey, nonListKey);
await connection.quit();
await connection.disconnect();
});
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('IMPRESSION COUNTS CACHE IN REDIS', () => {
expect(counter._makeKey(null, new Date(2020, 9, 2, 10, 53, 12).getTime())).toBe(`null::${timestamp1}`);
expect(counter._makeKey(null, 0)).toBe('null::0');

await connection.quit();
await connection.disconnect();
});

test('Impression Counter Test BasicUsage', async () => {
Expand Down Expand Up @@ -76,10 +76,10 @@ describe('IMPRESSION COUNTS CACHE IN REDIS', () => {
expect(Object.keys(counter.pop()).length).toBe(0);

await connection.del(key);
await connection.quit();
await connection.disconnect();
});

test('POST IMPRESSION COUNTS IN REDIS FUNCTION', (done) => {
test('POST IMPRESSION COUNTS IN REDIS FUNCTION', async () => {
const connection = new Redis();
const counter = new ImpressionCountsCacheInRedis(loggerMock, key, connection);
// Clean up in case there are still keys there.
Expand All @@ -95,15 +95,12 @@ describe('IMPRESSION COUNTS CACHE IN REDIS', () => {
counter.track('feature2', nextHourTimestamp + 3, 2);
counter.track('feature2', nextHourTimestamp + 4, 2);

counter.postImpressionCountsInRedis().then(() => {
await counter.postImpressionCountsInRedis();

connection.hgetall(key).then(async data => {
expect(data).toStrictEqual(expected);
await connection.del(key);
await connection.quit();
done();
});
});
const data = await connection.hgetall(key);
expect(data).toStrictEqual(expected);
await connection.del(key);
await connection.disconnect();
});

test('start and stop task', (done) => {
Expand Down Expand Up @@ -183,6 +180,6 @@ describe('IMPRESSION COUNTS CACHE IN REDIS', () => {

expect(await connection.hgetall(key)).toStrictEqual({});
await connection.del(key);
await connection.quit();
await connection.disconnect();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ describe('IMPRESSIONS CACHE IN REDIS', () => {
expect(await c.count()).toBe(0); // storage should be empty after dropping it

await connection.del(impressionsKey);
await connection.quit();
await connection.disconnect();
});

test('`track` should not resolve before calling expire', async () => {
Expand Down Expand Up @@ -76,7 +76,7 @@ describe('IMPRESSIONS CACHE IN REDIS', () => {
// @ts-expect-error
await c.track([i1, i2]).then(() => {
connection.del(impressionsKey);
connection.quit(); // Try to disconnect right away.
connection.disconnect(); // Try to disconnect right away.
expect(spy1).toBeCalled(); // Redis rpush was called once before executing external callback.
// Following assertion fails if the expire takes place after disconnected and throws unhandledPromiseRejection
expect(spy2).toBeCalled(); // Redis expire was called once before executing external callback.
Expand Down
4 changes: 2 additions & 2 deletions src/storages/inRedis/__tests__/SegmentsCacheInRedis.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ describe('SEGMENTS CACHE IN REDIS', () => {
expect(await cache.isInSegment('mocked-segment', 'e')).toBe(true);

await cache.clear();
await connection.quit();
await connection.disconnect();
});

test('registerSegment / getRegisteredSegments', async () => {
Expand All @@ -55,7 +55,7 @@ describe('SEGMENTS CACHE IN REDIS', () => {
['s1', 's2', 's3', 's4'].forEach(s => expect(segments.indexOf(s) !== -1).toBe(true));

await cache.clear();
await connection.quit();
await connection.disconnect();
});

});
Loading

0 comments on commit 2476efc

Please sign in to comment.