Skip to content

Commit

Permalink
Merge branch 'development' into include-default-treatment-in-split-view
Browse files Browse the repository at this point in the history
  • Loading branch information
nthorn552 authored Jul 26, 2023
2 parents 0bd7a5a + 62fe4c9 commit 98654fe
Show file tree
Hide file tree
Showing 20 changed files with 236 additions and 49 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
1.9.0 (July 18, 2023)
- Updated streaming architecture implementation to apply feature flag updates from the notification received which is now enhanced, improving efficiency and reliability of the whole update system.

1.8.3 (June 29, 2023)
- Updated some transitive dependencies for vulnerability fixes.
- Updated SDK_READY_TIMED_OUT event to be emitted immediately when a connection error occurs using pluggable storage (i.e., when the wrapper `connect` promise is rejected) in consumer and partial consumer modes.
Expand Down
16 changes: 8 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@splitsoftware/splitio-commons",
"version": "1.8.3",
"version": "1.9.0",
"description": "Split Javascript SDK common components",
"main": "cjs/index.js",
"module": "esm/index.js",
Expand Down
1 change: 1 addition & 0 deletions src/logger/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export const WARN_SPLITS_FILTER_INVALID = 220;
export const WARN_SPLITS_FILTER_EMPTY = 221;
export const WARN_SDK_KEY = 222;
export const STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2 = 223;
export const STREAMING_PARSING_SPLIT_UPDATE = 224;

export const ERROR_ENGINE_COMBINER_IFELSEIF = 300;
export const ERROR_LOGLEVEL_INVALID = 301;
Expand Down
1 change: 1 addition & 0 deletions src/logger/messages/warn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ export const codesWarn: [number, string][] = codesError.concat([
[c.WARN_SDK_KEY, c.LOG_PREFIX_SETTINGS+': You already have %s. We recommend keeping only one instance of the factory at all times (Singleton pattern) and reusing it throughout your application'],

[c.STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching MySegments due to an error processing %s notification: %s'],
[c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing SPLIT_UPDATE notification: %s'],
]);
22 changes: 21 additions & 1 deletion src/storages/inMemory/TelemetryCacheInMemory.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ImpressionDataType, EventDataType, LastSync, HttpErrors, HttpLatencies, StreamingEvent, Method, OperationType, MethodExceptions, MethodLatencies, TelemetryUsageStatsPayload } from '../../sync/submitters/types';
import { ImpressionDataType, EventDataType, LastSync, HttpErrors, HttpLatencies, StreamingEvent, Method, OperationType, MethodExceptions, MethodLatencies, TelemetryUsageStatsPayload, UpdatesFromSSEEnum } from '../../sync/submitters/types';
import { DEDUPED, DROPPED, LOCALHOST_MODE, QUEUED } from '../../utils/constants';
import { findLatencyIndex } from '../findLatencyIndex';
import { ISegmentsCacheSync, ISplitsCacheSync, IStorageFactoryParams, ITelemetryCacheSync } from '../types';
Expand Down Expand Up @@ -56,6 +56,7 @@ export class TelemetryCacheInMemory implements ITelemetryCacheSync {
eD: this.getEventStats(DROPPED),
sE: this.popStreamingEvents(),
t: this.popTags(),
ufs: this.popUpdatesFromSSE(),
};
}

Expand Down Expand Up @@ -244,4 +245,23 @@ export class TelemetryCacheInMemory implements ITelemetryCacheSync {
this.e = false;
}

private updatesFromSSE = {
sp: 0,
ms: 0
};

popUpdatesFromSSE() {
const result = this.updatesFromSSE;
this.updatesFromSSE = {
sp: 0,
ms: 0,
};
return result;
}

recordUpdatesFromSSE(type: UpdatesFromSSEEnum) {
this.updatesFromSSE[type]++;
this.e = false;
}

}
19 changes: 18 additions & 1 deletion src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ describe('TELEMETRY CACHE', () => {
test('"isEmpty" and "pop" methods', () => {
const cache = new TelemetryCacheInMemory();
const expectedEmptyPayload = {
lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: undefined, seC: undefined, skC: undefined, eQ: 0, eD: 0, sE: [], t: []
lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: undefined, seC: undefined, skC: undefined, eQ: 0, eD: 0, sE: [], t: [], ufs:{ sp: 0, ms: 0 }
};

// Initially, the cache is empty
Expand All @@ -227,4 +227,21 @@ describe('TELEMETRY CACHE', () => {
expect(cache.isEmpty()).toBe(true);
});

test('updates from SSE', () => {
expect(cache.popUpdatesFromSSE()).toEqual({sp: 0, ms: 0});
cache.recordUpdatesFromSSE(SPLITS);
cache.recordUpdatesFromSSE(SPLITS);
cache.recordUpdatesFromSSE(SPLITS);
cache.recordUpdatesFromSSE(MY_SEGMENT);
cache.recordUpdatesFromSSE(MY_SEGMENT);
expect(cache.popUpdatesFromSSE()).toEqual({sp: 3, ms: 2});
expect(cache.popUpdatesFromSSE()).toEqual({sp: 0, ms: 0});
cache.recordUpdatesFromSSE(SPLITS);
cache.recordUpdatesFromSSE(MY_SEGMENT);
cache.recordUpdatesFromSSE(SPLITS);
cache.recordUpdatesFromSSE(MY_SEGMENT);
expect(cache.popUpdatesFromSSE()).toEqual({sp: 2, ms: 2});
expect(cache.popUpdatesFromSSE()).toEqual({sp: 0, ms: 0});
});

});
3 changes: 2 additions & 1 deletion src/storages/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { MaybeThenable, ISplit } from '../dtos/types';
import { EventDataType, HttpErrors, HttpLatencies, ImpressionDataType, LastSync, Method, MethodExceptions, MethodLatencies, MultiMethodExceptions, MultiMethodLatencies, MultiConfigs, OperationType, StoredEventWithMetadata, StoredImpressionWithMetadata, StreamingEvent, UniqueKeysPayloadCs, UniqueKeysPayloadSs, TelemetryUsageStatsPayload } from '../sync/submitters/types';
import { EventDataType, HttpErrors, HttpLatencies, ImpressionDataType, LastSync, Method, MethodExceptions, MethodLatencies, MultiMethodExceptions, MultiMethodLatencies, MultiConfigs, OperationType, StoredEventWithMetadata, StoredImpressionWithMetadata, StreamingEvent, UniqueKeysPayloadCs, UniqueKeysPayloadSs, TelemetryUsageStatsPayload, UpdatesFromSSEEnum } from '../sync/submitters/types';
import { SplitIO, ImpressionDTO, ISettings } from '../types';

/**
Expand Down Expand Up @@ -409,6 +409,7 @@ export interface ITelemetryRuntimeProducerSync {
recordTokenRefreshes(): void;
recordStreamingEvents(streamingEvent: StreamingEvent): void;
recordSessionLength(ms: number): void;
recordUpdatesFromSSE(type: UpdatesFromSSEEnum): void
}

export interface ITelemetryEvaluationProducerSync {
Expand Down
3 changes: 2 additions & 1 deletion src/sync/polling/types.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { ISplit } from '../../dtos/types';
import { IReadinessManager } from '../../readiness/types';
import { IStorageSync } from '../../storages/types';
import { ITask, ISyncTask } from '../types';

export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number], boolean> { }
export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }], boolean> { }

export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { }

Expand Down
37 changes: 34 additions & 3 deletions src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { settingsSplitApi } from '../../../../utils/settingsValidation/__tests__
import { EventEmitter } from '../../../../utils/MinEvents';
import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';
import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker';
import { splitNotifications } from '../../../streaming/__tests__/dataMocks';

const activeSplitWithSegments = {
name: 'Split1',
Expand Down Expand Up @@ -55,10 +56,11 @@ test('splitChangesUpdater / compute splits mutation', () => {
expect(splitsMutation.segments).toEqual(['A', 'B']);
});

test('splitChangesUpdater / factory', (done) => {
describe('splitChangesUpdater', () => {

fetchMock.once('*', { status: 200, body: splitChangesMock1 }); // @ts-ignore
const splitApi = splitApiFactory(settingsSplitApi, { getFetch: () => fetchMock, EventEmitter }, telemetryTrackerFactory());
const fetchSplitChanges = jest.spyOn(splitApi, 'fetchSplitChanges');
const splitChangesFetcher = splitChangesFetcherFactory(splitApi.fetchSplitChanges);

const splitsCache = new SplitsCacheInMemory();
Expand All @@ -73,7 +75,12 @@ test('splitChangesUpdater / factory', (done) => {

const splitChangesUpdater = splitChangesUpdaterFactory(loggerMock, splitChangesFetcher, splitsCache, segmentsCache, readinessManager.splits, 1000, 1);

splitChangesUpdater().then((result) => {
afterEach(() => {
jest.clearAllMocks();
});

test('test without payload', async () => {
const result = await splitChangesUpdater();
expect(setChangeNumber).toBeCalledTimes(1);
expect(setChangeNumber).lastCalledWith(splitChangesMock1.till);
expect(addSplits).toBeCalledTimes(1);
Expand All @@ -83,7 +90,31 @@ test('splitChangesUpdater / factory', (done) => {
expect(registerSegments).toBeCalledTimes(1);
expect(splitsEmitSpy).toBeCalledWith('state::splits-arrived');
expect(result).toBe(true);
done();
});

test('test with payload', async () => {
const ARCHIVED_FF = 'ARCHIVED';
let index = 0;
for (const notification of splitNotifications) {
const payload = notification.decoded as ISplit;
const changeNumber = payload.changeNumber;

await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber })).resolves.toBe(true);
// fetch not being called
expect(fetchSplitChanges).toBeCalledTimes(0);
// Change number being updated
expect(setChangeNumber).toBeCalledTimes(index + 1);
expect(setChangeNumber.mock.calls[index][0]).toEqual(changeNumber);
// Add feature flag in notification
expect(addSplits).toBeCalledTimes(index + 1);
expect(addSplits.mock.calls[index][0].length).toBe(payload.status === ARCHIVED_FF ? 0 : 1);
// Remove feature flag if status is ARCHIVED
expect(removeSplits).toBeCalledTimes(index + 1);
expect(removeSplits.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [payload.name] : []);
// fetch segments after feature flag update
expect(registerSegments).toBeCalledTimes(index + 1);
expect(registerSegments.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [] : ['maur-2']);
index++;
}
});
});
10 changes: 6 additions & 4 deletions src/sync/polling/updaters/splitChangesUpdater.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { SDK_SPLITS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../../../readiness/
import { ILogger } from '../../../logger/types';
import { SYNC_SPLITS_FETCH, SYNC_SPLITS_NEW, SYNC_SPLITS_REMOVED, SYNC_SPLITS_SEGMENTS, SYNC_SPLITS_FETCH_FAILS, SYNC_SPLITS_FETCH_RETRY } from '../../../logger/constants';

type ISplitChangesUpdater = (noCache?: boolean, till?: number) => Promise<boolean>
type ISplitChangesUpdater = (noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) => Promise<boolean>

// Checks that all registered segments have been fetched (changeNumber !== -1 for every segment).
// Returns a promise that could be rejected.
Expand Down Expand Up @@ -111,16 +111,18 @@ export function splitChangesUpdaterFactory(
* @param {boolean | undefined} noCache true to revalidate data to fetch
* @param {boolean | undefined} till query param to bypass CDN requests
*/
return function splitChangesUpdater(noCache?: boolean, till?: number) {
return function splitChangesUpdater(noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) {

/**
* @param {number} since current changeNumber at splitsCache
* @param {number} retry current number of retry attempts
*/
function _splitChangesUpdater(since: number, retry = 0): Promise<boolean> {
log.debug(SYNC_SPLITS_FETCH, [since]);

const fetcherPromise = splitChangesFetcher(since, noCache, till, _promiseDecorator)
const fetcherPromise = Promise.resolve(splitUpdateNotification ?
{ splits: [splitUpdateNotification.payload], till: splitUpdateNotification.changeNumber } :
splitChangesFetcher(since, noCache, till, _promiseDecorator)
)
.then((splitChanges: ISplitChangesResponse) => {
startingUp = false;

Expand Down
8 changes: 6 additions & 2 deletions src/sync/streaming/UpdateWorkers/MySegmentsUpdateWorker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { IMySegmentsSyncTask, MySegmentsData } from '../../polling/types';
import { Backoff } from '../../../utils/Backoff';
import { IUpdateWorker } from './types';
import { MY_SEGMENT } from '../../../utils/constants';
import { ITelemetryTracker } from '../../../trackers/types';

/**
* MySegmentsUpdateWorker factory
*/
export function MySegmentsUpdateWorker(mySegmentsSyncTask: IMySegmentsSyncTask): IUpdateWorker {
export function MySegmentsUpdateWorker(mySegmentsSyncTask: IMySegmentsSyncTask, telemetryTracker: ITelemetryTracker): IUpdateWorker {

let maxChangeNumber = 0; // keeps the maximum changeNumber among queued events
let currentChangeNumber = -1;
Expand All @@ -23,8 +25,10 @@ export function MySegmentsUpdateWorker(mySegmentsSyncTask: IMySegmentsSyncTask):
// fetch mySegments revalidating data if cached
mySegmentsSyncTask.execute(_segmentsData, true).then((result) => {
if (!isHandlingEvent) return; // halt if `stop` has been called
if (result !== false) // Unlike `Splits|SegmentsUpdateWorker`, we cannot use `mySegmentsCache.getChangeNumber` since `/mySegments` endpoint doesn't provide this value.
if (result !== false) {// Unlike `Splits|SegmentsUpdateWorker`, we cannot use `mySegmentsCache.getChangeNumber` since `/mySegments` endpoint doesn't provide this value.
if (_segmentsData) telemetryTracker.trackUpdatesFromSSE(MY_SEGMENT);
currentChangeNumber = Math.max(currentChangeNumber, currentMaxChangeNumber); // use `currentMaxChangeNumber`, in case that `maxChangeNumber` was updated during fetch.
}
if (handleNewEvent) {
__handleMySegmentsUpdateCall();
} else {
Expand Down
21 changes: 15 additions & 6 deletions src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { ISplit } from '../../../dtos/types';
import { ILogger } from '../../../logger/types';
import { SDK_SPLITS_ARRIVED } from '../../../readiness/constants';
import { ISplitsEventEmitter } from '../../../readiness/types';
import { ISplitsCacheSync } from '../../../storages/types';
import { ITelemetryTracker } from '../../../trackers/types';
import { Backoff } from '../../../utils/Backoff';
import { SPLITS } from '../../../utils/constants';
import { ISegmentsSyncTask, ISplitsSyncTask } from '../../polling/types';
import { ISplitKillData, ISplitUpdateData } from '../SSEHandler/types';
import { FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT, FETCH_BACKOFF_MAX_RETRIES } from './constants';
Expand All @@ -11,25 +14,27 @@ import { IUpdateWorker } from './types';
/**
* SplitsUpdateWorker factory
*/
export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void } {
export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void } {

let maxChangeNumber = 0;
let handleNewEvent = false;
let isHandlingEvent: boolean;
let cdnBypass: boolean;
let payload: ISplit | undefined;
const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT);

function __handleSplitUpdateCall() {
isHandlingEvent = true;
if (maxChangeNumber > splitsCache.getChangeNumber()) {
handleNewEvent = false;

const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined;
// fetch splits revalidating data if cached
splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined).then(() => {
splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => {
if (!isHandlingEvent) return; // halt if `stop` has been called
if (handleNewEvent) {
__handleSplitUpdateCall();
} else {
if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS);
// fetch new registered segments for server-side API. Not retrying on error
if (segmentsSyncTask) segmentsSyncTask.execute(true);

Expand Down Expand Up @@ -66,22 +71,26 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync,
*
* @param {number} changeNumber change number of the SPLIT_UPDATE notification
*/
function put({ changeNumber }: Pick<ISplitUpdateData, 'changeNumber'>) {
function put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit) {
const currentChangeNumber = splitsCache.getChangeNumber();

if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return;

maxChangeNumber = changeNumber;
handleNewEvent = true;
cdnBypass = false;
payload = undefined;

if (_payload && currentChangeNumber === pcn) {
payload = _payload;
}

if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall();
backoff.reset();
}

return {
put,

/**
* Invoked by NotificationProcessor on SPLIT_KILL event
*
Expand All @@ -95,7 +104,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync,
splitsEventEmitter.emit(SDK_SPLITS_ARRIVED, true);
}
// queues the SplitChanges fetch (only if changeNumber is newer)
put({ changeNumber });
put({ changeNumber } as ISplitUpdateData);
},

stop() {
Expand Down
Loading

0 comments on commit 98654fe

Please sign in to comment.