From f3ec717bea00025b7901e18c5da0a253e89dbb9e Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Thu, 22 Jun 2023 10:55:52 -0300 Subject: [PATCH 01/15] [SDKS-7056] Implement notification processor --- src/sync/polling/types.ts | 2 +- .../polling/updaters/splitChangesUpdater.ts | 7 ++++-- .../UpdateWorkers/SplitsUpdateWorker.ts | 23 +++++++++++++++---- .../__tests__/SplitsUpdateWorker.spec.ts | 8 +++---- src/sync/streaming/pushManager.ts | 5 ++-- 5 files changed, 31 insertions(+), 14 deletions(-) diff --git a/src/sync/polling/types.ts b/src/sync/polling/types.ts index 29453097..12f482a0 100644 --- a/src/sync/polling/types.ts +++ b/src/sync/polling/types.ts @@ -2,7 +2,7 @@ import { IReadinessManager } from '../../readiness/types'; import { IStorageSync } from '../../storages/types'; import { ITask, ISyncTask } from '../types'; -export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number], boolean> { } +export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, payload?: any], boolean> { } export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { } diff --git a/src/sync/polling/updaters/splitChangesUpdater.ts b/src/sync/polling/updaters/splitChangesUpdater.ts index a2e63d0b..5e80c72a 100644 --- a/src/sync/polling/updaters/splitChangesUpdater.ts +++ b/src/sync/polling/updaters/splitChangesUpdater.ts @@ -111,7 +111,7 @@ export function splitChangesUpdaterFactory( * @param {boolean | undefined} noCache true to revalidate data to fetch * @param {boolean | undefined} till query param to bypass CDN requests */ - return function splitChangesUpdater(noCache?: boolean, till?: number) { + return function splitChangesUpdater(noCache?: boolean, till?: number, payload?: any) { /** * @param {number} since current changeNumber at splitsCache @@ -120,7 +120,10 @@ export function splitChangesUpdaterFactory( function _splitChangesUpdater(since: number, retry = 0): Promise { log.debug(SYNC_SPLITS_FETCH, [since]); - const fetcherPromise = splitChangesFetcher(since, noCache, till, _promiseDecorator) + const fetcherPromise = Promise.resolve(payload ? + { splits: [payload], till: payload.changeNumber } : + splitChangesFetcher(since, noCache, till, _promiseDecorator) + ) .then((splitChanges: ISplitChangesResponse) => { startingUp = false; diff --git a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts index 1a1264df..91aa9072 100644 --- a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts @@ -11,7 +11,7 @@ import { IUpdateWorker } from './types'; /** * SplitsUpdateWorker factory */ -export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void } { +export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void, putWithPayload(payload: any): void } { let maxChangeNumber = 0; let handleNewEvent = false; @@ -19,13 +19,13 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, let cdnBypass: boolean; const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT); - function __handleSplitUpdateCall() { + function __handleSplitUpdateCall(payload?: any) { isHandlingEvent = true; if (maxChangeNumber > splitsCache.getChangeNumber()) { handleNewEvent = false; // fetch splits revalidating data if cached - splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined).then(() => { + splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, payload).then(() => { if (!isHandlingEvent) return; // halt if `stop` has been called if (handleNewEvent) { __handleSplitUpdateCall(); @@ -79,9 +79,24 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, backoff.reset(); } + function putWithPayload(payload: any) { + const currentChangeNumber = splitsCache.getChangeNumber(); + const { changeNumber } = payload; + + if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return; + + // @TODO check conditions regarding maxChangeNumber + if (payload && currentChangeNumber === payload.PreviousChangeNumber) { + __handleSplitUpdateCall(payload); + return; + } + + put({changeNumber}); + } + return { put, - + putWithPayload, /** * Invoked by NotificationProcessor on SPLIT_KILL event * diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index 666fc25b..8d639a19 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -126,8 +126,8 @@ describe('SplitsUpdateWorker', () => { expect(loggerMock.debug).lastCalledWith('Refresh completed bypassing the CDN in 2 attempts.'); expect(splitsSyncTask.execute.mock.calls).toEqual([ - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined]), - [true, 100], [true, 100], + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined]), + [true, 100, undefined], [true, 100, undefined], ]); // `execute` was called 12 times. Last 2 with CDN bypass // Handle new event after previous is completed @@ -151,8 +151,8 @@ describe('SplitsUpdateWorker', () => { expect(loggerMock.debug).lastCalledWith('No changes fetched after 10 attempts with CDN bypassed.'); expect(splitsSyncTask.execute.mock.calls).toEqual([ - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined]), - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, 100]), + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined]), + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, 100, undefined]), ]); // `execute` was called 20 times. Last 10 with CDN bypass // Handle new event after previous ends (not completed) diff --git a/src/sync/streaming/pushManager.ts b/src/sync/streaming/pushManager.ts index d3e10b39..b0437b21 100644 --- a/src/sync/streaming/pushManager.ts +++ b/src/sync/streaming/pushManager.ts @@ -226,9 +226,8 @@ export function pushManagerFactory( try { const payload = parseFFUpdatePayload(parsedData.c, parsedData.d); if (payload) { - // @TODO replace splitsUpdateWorker.put method with instant ff processor and updater - // splitsUpdateWorker.putWithPayload(payload); - // return; + splitsUpdateWorker.putWithPayload(payload); + return; } } catch (e) { // @TODO define a error code for feature flags parsing From e294998d2911510eeafb011d00b10b6ffd1a7d74 Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Fri, 23 Jun 2023 16:16:06 -0300 Subject: [PATCH 02/15] Remove putWithPayloadMethod, reuse put --- .../polling/updaters/splitChangesUpdater.ts | 9 +++-- .../UpdateWorkers/SplitsUpdateWorker.ts | 36 ++++++++----------- src/sync/streaming/pushManager.ts | 2 +- 3 files changed, 19 insertions(+), 28 deletions(-) diff --git a/src/sync/polling/updaters/splitChangesUpdater.ts b/src/sync/polling/updaters/splitChangesUpdater.ts index 5e80c72a..9ec76a7e 100644 --- a/src/sync/polling/updaters/splitChangesUpdater.ts +++ b/src/sync/polling/updaters/splitChangesUpdater.ts @@ -8,7 +8,7 @@ import { SDK_SPLITS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../../../readiness/ import { ILogger } from '../../../logger/types'; import { SYNC_SPLITS_FETCH, SYNC_SPLITS_NEW, SYNC_SPLITS_REMOVED, SYNC_SPLITS_SEGMENTS, SYNC_SPLITS_FETCH_FAILS, SYNC_SPLITS_FETCH_RETRY } from '../../../logger/constants'; -type ISplitChangesUpdater = (noCache?: boolean, till?: number) => Promise +type ISplitChangesUpdater = (noCache?: boolean, till?: number, payload?: any) => Promise // Checks that all registered segments have been fetched (changeNumber !== -1 for every segment). // Returns a promise that could be rejected. @@ -111,7 +111,7 @@ export function splitChangesUpdaterFactory( * @param {boolean | undefined} noCache true to revalidate data to fetch * @param {boolean | undefined} till query param to bypass CDN requests */ - return function splitChangesUpdater(noCache?: boolean, till?: number, payload?: any) { + return function splitChangesUpdater(noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) { /** * @param {number} since current changeNumber at splitsCache @@ -119,9 +119,8 @@ export function splitChangesUpdaterFactory( */ function _splitChangesUpdater(since: number, retry = 0): Promise { log.debug(SYNC_SPLITS_FETCH, [since]); - - const fetcherPromise = Promise.resolve(payload ? - { splits: [payload], till: payload.changeNumber } : + const fetcherPromise = Promise.resolve(splitUpdateNotification ? + { splits: [splitUpdateNotification.payload], till: splitUpdateNotification.changeNumber } : splitChangesFetcher(since, noCache, till, _promiseDecorator) ) .then((splitChanges: ISplitChangesResponse) => { diff --git a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts index 91aa9072..45ac1eb3 100644 --- a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts @@ -1,3 +1,4 @@ +import { ISplit } from '../../../dtos/types'; import { ILogger } from '../../../logger/types'; import { SDK_SPLITS_ARRIVED } from '../../../readiness/constants'; import { ISplitsEventEmitter } from '../../../readiness/types'; @@ -11,21 +12,22 @@ import { IUpdateWorker } from './types'; /** * SplitsUpdateWorker factory */ -export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void, putWithPayload(payload: any): void } { +export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void } { let maxChangeNumber = 0; let handleNewEvent = false; let isHandlingEvent: boolean; let cdnBypass: boolean; + let payload: ISplit | undefined; const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT); - function __handleSplitUpdateCall(payload?: any) { + function __handleSplitUpdateCall() { isHandlingEvent = true; if (maxChangeNumber > splitsCache.getChangeNumber()) { handleNewEvent = false; - + const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined; // fetch splits revalidating data if cached - splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, payload).then(() => { + splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification ).then(() => { if (!isHandlingEvent) return; // halt if `stop` has been called if (handleNewEvent) { __handleSplitUpdateCall(); @@ -66,7 +68,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, * * @param {number} changeNumber change number of the SPLIT_UPDATE notification */ - function put({ changeNumber }: Pick) { + function put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit) { const currentChangeNumber = splitsCache.getChangeNumber(); if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return; @@ -74,29 +76,18 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, maxChangeNumber = changeNumber; handleNewEvent = true; cdnBypass = false; + payload = undefined; - if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall(); - backoff.reset(); - } - - function putWithPayload(payload: any) { - const currentChangeNumber = splitsCache.getChangeNumber(); - const { changeNumber } = payload; - - if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return; - - // @TODO check conditions regarding maxChangeNumber - if (payload && currentChangeNumber === payload.PreviousChangeNumber) { - __handleSplitUpdateCall(payload); - return; + if (_payload && currentChangeNumber === pcn) { + payload = _payload; } - put({changeNumber}); + if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall(); + backoff.reset(); } return { put, - putWithPayload, /** * Invoked by NotificationProcessor on SPLIT_KILL event * @@ -105,12 +96,13 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, * @param {string} defaultTreatment default treatment value */ killSplit({ changeNumber, splitName, defaultTreatment }: ISplitKillData) { + payload = undefined; if (splitsCache.killLocally(splitName, defaultTreatment, changeNumber)) { // trigger an SDK_UPDATE if Split was killed locally splitsEventEmitter.emit(SDK_SPLITS_ARRIVED, true); } // queues the SplitChanges fetch (only if changeNumber is newer) - put({ changeNumber }); + put({ changeNumber } as ISplitUpdateData); }, stop() { diff --git a/src/sync/streaming/pushManager.ts b/src/sync/streaming/pushManager.ts index b0437b21..451f9369 100644 --- a/src/sync/streaming/pushManager.ts +++ b/src/sync/streaming/pushManager.ts @@ -226,7 +226,7 @@ export function pushManagerFactory( try { const payload = parseFFUpdatePayload(parsedData.c, parsedData.d); if (payload) { - splitsUpdateWorker.putWithPayload(payload); + splitsUpdateWorker.put(parsedData, payload); return; } } catch (e) { From 2e587b79f1cf6e2774235794fac6e9a535f6796d Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Fri, 23 Jun 2023 17:01:07 -0300 Subject: [PATCH 03/15] Update types --- src/sync/polling/types.ts | 3 ++- src/sync/polling/updaters/splitChangesUpdater.ts | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/sync/polling/types.ts b/src/sync/polling/types.ts index 12f482a0..4653b568 100644 --- a/src/sync/polling/types.ts +++ b/src/sync/polling/types.ts @@ -1,8 +1,9 @@ +import { ISplit } from '../../dtos/types'; import { IReadinessManager } from '../../readiness/types'; import { IStorageSync } from '../../storages/types'; import { ITask, ISyncTask } from '../types'; -export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, payload?: any], boolean> { } +export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }], boolean> { } export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { } diff --git a/src/sync/polling/updaters/splitChangesUpdater.ts b/src/sync/polling/updaters/splitChangesUpdater.ts index 9ec76a7e..63ff08f0 100644 --- a/src/sync/polling/updaters/splitChangesUpdater.ts +++ b/src/sync/polling/updaters/splitChangesUpdater.ts @@ -8,7 +8,7 @@ import { SDK_SPLITS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../../../readiness/ import { ILogger } from '../../../logger/types'; import { SYNC_SPLITS_FETCH, SYNC_SPLITS_NEW, SYNC_SPLITS_REMOVED, SYNC_SPLITS_SEGMENTS, SYNC_SPLITS_FETCH_FAILS, SYNC_SPLITS_FETCH_RETRY } from '../../../logger/constants'; -type ISplitChangesUpdater = (noCache?: boolean, till?: number, payload?: any) => Promise +type ISplitChangesUpdater = (noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) => Promise // Checks that all registered segments have been fetched (changeNumber !== -1 for every segment). // Returns a promise that could be rejected. From 1ee240bef5060cd91f794fd45c808f271c0de82f Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Fri, 23 Jun 2023 17:05:27 -0300 Subject: [PATCH 04/15] Remove unnecesary payload from kill --- src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts index 45ac1eb3..90384415 100644 --- a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts @@ -96,7 +96,6 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, * @param {string} defaultTreatment default treatment value */ killSplit({ changeNumber, splitName, defaultTreatment }: ISplitKillData) { - payload = undefined; if (splitsCache.killLocally(splitName, defaultTreatment, changeNumber)) { // trigger an SDK_UPDATE if Split was killed locally splitsEventEmitter.emit(SDK_SPLITS_ARRIVED, true); From 072abb878bd10e8125e73bf32e198486a14f5dda Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Mon, 26 Jun 2023 11:50:14 -0300 Subject: [PATCH 05/15] [SDKS-7058] IFF processor tests --- .../__tests__/splitChangesUpdater.spec.ts | 62 +++++++++++++++++++ .../__tests__/SplitsUpdateWorker.spec.ts | 59 ++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts b/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts index aee82501..6a1a3227 100644 --- a/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts +++ b/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts @@ -11,6 +11,7 @@ import { settingsSplitApi } from '../../../../utils/settingsValidation/__tests__ import { EventEmitter } from '../../../../utils/MinEvents'; import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock'; import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker'; +import { splitNotifications } from '../../../streaming/__tests__/dataMocks'; const activeSplitWithSegments = { name: 'Split1', @@ -87,3 +88,64 @@ test('splitChangesUpdater / factory', (done) => { }); }); + +test('splitChangesUpdater / IFF factory', async () => { + + const ARCHIVED_FF = 'ARCHIVED'; + + fetchMock.once('*', { status: 200, body: splitChangesMock1 }); // @ts-ignore + const splitApi = splitApiFactory(settingsSplitApi, { getFetch: () => fetchMock, EventEmitter }, telemetryTrackerFactory()); + const fetchSplitChanges = jest.spyOn(splitApi, 'fetchSplitChanges'); + const splitChangesFetcher = splitChangesFetcherFactory(splitApi.fetchSplitChanges); + + const splitsCache = new SplitsCacheInMemory(); + const setChangeNumber = jest.spyOn(splitsCache, 'setChangeNumber'); + const addSplits = jest.spyOn(splitsCache, 'addSplits'); + const removeSplits = jest.spyOn(splitsCache, 'removeSplits'); + + const segmentsCache = new SegmentsCacheInMemory(); + const registerSegments = jest.spyOn(segmentsCache, 'registerSegments'); + const readinessManager = readinessManagerFactory(EventEmitter); + const splitsEmitSpy = jest.spyOn(readinessManager.splits, 'emit'); + + const splitChangesUpdater = splitChangesUpdaterFactory(loggerMock, splitChangesFetcher, splitsCache, segmentsCache, readinessManager.splits, 1000, 1); + + let index = 0; + for (const notification of splitNotifications ) { + const payload = notification.decoded as ISplit; + const changeNumber = payload.changeNumber; + + await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber })).resolves.toBe(true); + // fetch not being called + expect(fetchSplitChanges).toBeCalledTimes(0); + // Change number being updated + expect(setChangeNumber).toBeCalledTimes(index+1); + expect(setChangeNumber.mock.calls[index][0]).toEqual(changeNumber); + // Add feature flag in notification + expect(addSplits).toBeCalledTimes(index+1); + expect(addSplits.mock.calls[index][0].length).toBe(payload.status === ARCHIVED_FF ? 0 : 1); + // Remove feature flag if status is ARCHIVED + expect(removeSplits).toBeCalledTimes(index+1); + expect(removeSplits.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [payload.name] : []); + // fetch segments after feature flag update + expect(registerSegments).toBeCalledTimes(index+1); + expect(registerSegments.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [] : ['maur-2']); + // Emit event + expect(splitsEmitSpy).toBeCalledWith('state::splits-arrived'); + index++; + } + // test behaviour without payload + splitChangesUpdater().then((result) => { + expect(fetchSplitChanges).toBeCalledTimes(1); + expect(setChangeNumber).toBeCalledTimes(index+1); + expect(setChangeNumber.mock.calls[index][0]).toEqual(splitChangesMock1.till); + expect(addSplits).toBeCalledTimes(index+1); + expect(addSplits.mock.calls[index][0].length).toBe(splitChangesMock1.splits.length); + expect(removeSplits).toBeCalledTimes(index+1); + expect(removeSplits).lastCalledWith([]); + expect(registerSegments).toBeCalledTimes(index+1); + expect(splitsEmitSpy).toBeCalledWith('state::splits-arrived'); + expect(result).toBe(true); + }); + +}); diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index 8d639a19..868d9420 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -6,6 +6,7 @@ import { FETCH_BACKOFF_MAX_RETRIES } from '../constants'; import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock'; import { syncTaskFactory } from '../../../syncTask'; import { Backoff } from '../../../../utils/Backoff'; +import { splitNotifications } from '../../../streaming/__tests__/dataMocks'; function splitsSyncTaskMock(splitStorage: SplitsCacheInMemory, changeNumbers = []) { @@ -205,4 +206,62 @@ describe('SplitsUpdateWorker', () => { expect(splitsSyncTask.execute).toBeCalledTimes(1); }); + test('put, avoid fetching if payload sent', async () => { + + const cache = new SplitsCacheInMemory(); + splitNotifications.forEach(notification => { + const pcn = cache.getChangeNumber(); + const splitsSyncTask = splitsSyncTaskMock(cache); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const payload = notification.decoded; + const changeNumber = payload.changeNumber; + splitUpdateWorker.put( { changeNumber, pcn }, payload); // queued + expect(splitsSyncTask.execute).toBeCalledTimes(1); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {changeNumber, payload}]); + }); + }); + + test('put, ccn and pcn validation for IFF', () => { + const cache = new SplitsCacheInMemory(); + + // ccn = 103 & pcn = 104: Something was missed -> fetch sdk ap + let ccn = 103; + let pcn = 104; + let changeNumber = 105; + cache.setChangeNumber(ccn); + const notification = splitNotifications[0]; + + let splitsSyncTask = splitsSyncTaskMock(cache); + let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); + expect(splitsSyncTask.execute).toBeCalledTimes(1); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); + splitsSyncTask.execute.mockClear(); + + // ccn = 110 & pcn = 0: Something was missed -> something wrong in `pushNotificationManager` -> fetch sdk api + ccn = 110; + pcn = 0; + changeNumber = 111; + cache.setChangeNumber(ccn); + + splitsSyncTask = splitsSyncTaskMock(cache); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); + expect(splitsSyncTask.execute).toBeCalledTimes(1); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); + splitsSyncTask.execute.mockClear(); + + // ccn = 120 & pcn = 120: In order consecutive notifications apply normaly + ccn = 120; + pcn = 120; + changeNumber = 121; + cache.setChangeNumber(ccn); + + splitsSyncTask = splitsSyncTaskMock(cache); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); + expect(splitsSyncTask.execute).toBeCalledTimes(1); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {payload: notification.decoded, changeNumber }]); + + }); }); From d9f35f98504edadb5b8a099547afdde776d1355d Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Tue, 27 Jun 2023 13:02:43 -0300 Subject: [PATCH 06/15] Improvements on tests --- .../__tests__/splitChangesUpdater.spec.ts | 95 +++++++------------ .../__tests__/SplitsUpdateWorker.spec.ts | 6 +- 2 files changed, 35 insertions(+), 66 deletions(-) diff --git a/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts b/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts index 6a1a3227..c38e569d 100644 --- a/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts +++ b/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts @@ -56,10 +56,11 @@ test('splitChangesUpdater / compute splits mutation', () => { expect(splitsMutation.segments).toEqual(['A', 'B']); }); -test('splitChangesUpdater / factory', (done) => { +describe('splitChangesUpdater', () => { fetchMock.once('*', { status: 200, body: splitChangesMock1 }); // @ts-ignore const splitApi = splitApiFactory(settingsSplitApi, { getFetch: () => fetchMock, EventEmitter }, telemetryTrackerFactory()); + const fetchSplitChanges = jest.spyOn(splitApi, 'fetchSplitChanges'); const splitChangesFetcher = splitChangesFetcherFactory(splitApi.fetchSplitChanges); const splitsCache = new SplitsCacheInMemory(); @@ -74,7 +75,12 @@ test('splitChangesUpdater / factory', (done) => { const splitChangesUpdater = splitChangesUpdaterFactory(loggerMock, splitChangesFetcher, splitsCache, segmentsCache, readinessManager.splits, 1000, 1); - splitChangesUpdater().then((result) => { + afterEach(() => { + jest.clearAllMocks(); + }); + + test('test without payload', async () => { + const result = await splitChangesUpdater(); expect(setChangeNumber).toBeCalledTimes(1); expect(setChangeNumber).lastCalledWith(splitChangesMock1.till); expect(addSplits).toBeCalledTimes(1); @@ -84,68 +90,31 @@ test('splitChangesUpdater / factory', (done) => { expect(registerSegments).toBeCalledTimes(1); expect(splitsEmitSpy).toBeCalledWith('state::splits-arrived'); expect(result).toBe(true); - done(); }); -}); - -test('splitChangesUpdater / IFF factory', async () => { - - const ARCHIVED_FF = 'ARCHIVED'; - - fetchMock.once('*', { status: 200, body: splitChangesMock1 }); // @ts-ignore - const splitApi = splitApiFactory(settingsSplitApi, { getFetch: () => fetchMock, EventEmitter }, telemetryTrackerFactory()); - const fetchSplitChanges = jest.spyOn(splitApi, 'fetchSplitChanges'); - const splitChangesFetcher = splitChangesFetcherFactory(splitApi.fetchSplitChanges); - - const splitsCache = new SplitsCacheInMemory(); - const setChangeNumber = jest.spyOn(splitsCache, 'setChangeNumber'); - const addSplits = jest.spyOn(splitsCache, 'addSplits'); - const removeSplits = jest.spyOn(splitsCache, 'removeSplits'); - - const segmentsCache = new SegmentsCacheInMemory(); - const registerSegments = jest.spyOn(segmentsCache, 'registerSegments'); - const readinessManager = readinessManagerFactory(EventEmitter); - const splitsEmitSpy = jest.spyOn(readinessManager.splits, 'emit'); - - const splitChangesUpdater = splitChangesUpdaterFactory(loggerMock, splitChangesFetcher, splitsCache, segmentsCache, readinessManager.splits, 1000, 1); - - let index = 0; - for (const notification of splitNotifications ) { - const payload = notification.decoded as ISplit; - const changeNumber = payload.changeNumber; - - await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber })).resolves.toBe(true); - // fetch not being called - expect(fetchSplitChanges).toBeCalledTimes(0); - // Change number being updated - expect(setChangeNumber).toBeCalledTimes(index+1); - expect(setChangeNumber.mock.calls[index][0]).toEqual(changeNumber); - // Add feature flag in notification - expect(addSplits).toBeCalledTimes(index+1); - expect(addSplits.mock.calls[index][0].length).toBe(payload.status === ARCHIVED_FF ? 0 : 1); - // Remove feature flag if status is ARCHIVED - expect(removeSplits).toBeCalledTimes(index+1); - expect(removeSplits.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [payload.name] : []); - // fetch segments after feature flag update - expect(registerSegments).toBeCalledTimes(index+1); - expect(registerSegments.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [] : ['maur-2']); - // Emit event - expect(splitsEmitSpy).toBeCalledWith('state::splits-arrived'); - index++; - } - // test behaviour without payload - splitChangesUpdater().then((result) => { - expect(fetchSplitChanges).toBeCalledTimes(1); - expect(setChangeNumber).toBeCalledTimes(index+1); - expect(setChangeNumber.mock.calls[index][0]).toEqual(splitChangesMock1.till); - expect(addSplits).toBeCalledTimes(index+1); - expect(addSplits.mock.calls[index][0].length).toBe(splitChangesMock1.splits.length); - expect(removeSplits).toBeCalledTimes(index+1); - expect(removeSplits).lastCalledWith([]); - expect(registerSegments).toBeCalledTimes(index+1); - expect(splitsEmitSpy).toBeCalledWith('state::splits-arrived'); - expect(result).toBe(true); + test('test with payload', async () => { + const ARCHIVED_FF = 'ARCHIVED'; + let index = 0; + for (const notification of splitNotifications) { + const payload = notification.decoded as ISplit; + const changeNumber = payload.changeNumber; + + await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber })).resolves.toBe(true); + // fetch not being called + expect(fetchSplitChanges).toBeCalledTimes(0); + // Change number being updated + expect(setChangeNumber).toBeCalledTimes(index + 1); + expect(setChangeNumber.mock.calls[index][0]).toEqual(changeNumber); + // Add feature flag in notification + expect(addSplits).toBeCalledTimes(index + 1); + expect(addSplits.mock.calls[index][0].length).toBe(payload.status === ARCHIVED_FF ? 0 : 1); + // Remove feature flag if status is ARCHIVED + expect(removeSplits).toBeCalledTimes(index + 1); + expect(removeSplits.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [payload.name] : []); + // fetch segments after feature flag update + expect(registerSegments).toBeCalledTimes(index + 1); + expect(registerSegments.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [] : ['maur-2']); + index++; + } }); - }); diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index 868d9420..262fd14d 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -224,7 +224,7 @@ describe('SplitsUpdateWorker', () => { test('put, ccn and pcn validation for IFF', () => { const cache = new SplitsCacheInMemory(); - // ccn = 103 & pcn = 104: Something was missed -> fetch sdk ap + // ccn = 103 & pcn = 104: Something was missed -> fetch split changes let ccn = 103; let pcn = 104; let changeNumber = 105; @@ -238,7 +238,7 @@ describe('SplitsUpdateWorker', () => { expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); splitsSyncTask.execute.mockClear(); - // ccn = 110 & pcn = 0: Something was missed -> something wrong in `pushNotificationManager` -> fetch sdk api + // ccn = 110 & pcn = 0: Something was missed -> something wrong in `pushNotificationManager` -> fetch split changes ccn = 110; pcn = 0; changeNumber = 111; @@ -251,7 +251,7 @@ describe('SplitsUpdateWorker', () => { expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); splitsSyncTask.execute.mockClear(); - // ccn = 120 & pcn = 120: In order consecutive notifications apply normaly + // ccn = 120 & pcn = 120: In order consecutive notifications arrived, apply updates normaly ccn = 120; pcn = 120; changeNumber = 121; From 676209b5ce870bee1e83bf5f6745c3d05e6f1e84 Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Thu, 29 Jun 2023 18:23:51 -0300 Subject: [PATCH 07/15] [SDKS-7060] Add Telemetry --- .../inMemory/TelemetryCacheInMemory.ts | 26 ++++++++++++++++++- .../__tests__/TelemetryCacheInMemory.spec.ts | 2 +- src/storages/types.ts | 3 ++- src/sync/polling/types.ts | 5 ++-- .../polling/updaters/splitChangesUpdater.ts | 11 +++++--- .../UpdateWorkers/SplitsUpdateWorker.ts | 5 ++-- .../__tests__/SplitsUpdateWorker.spec.ts | 16 ++++++------ src/sync/streaming/pushManager.ts | 2 +- .../__tests__/telemetrySubmitter.spec.ts | 2 +- src/sync/submitters/types.ts | 8 ++++++ src/trackers/telemetryTracker.ts | 7 ++++- src/trackers/types.ts | 6 ++++- 12 files changed, 71 insertions(+), 22 deletions(-) diff --git a/src/storages/inMemory/TelemetryCacheInMemory.ts b/src/storages/inMemory/TelemetryCacheInMemory.ts index e62660e0..390e80b4 100644 --- a/src/storages/inMemory/TelemetryCacheInMemory.ts +++ b/src/storages/inMemory/TelemetryCacheInMemory.ts @@ -1,4 +1,4 @@ -import { ImpressionDataType, EventDataType, LastSync, HttpErrors, HttpLatencies, StreamingEvent, Method, OperationType, MethodExceptions, MethodLatencies, TelemetryUsageStatsPayload } from '../../sync/submitters/types'; +import { ImpressionDataType, EventDataType, LastSync, HttpErrors, HttpLatencies, StreamingEvent, Method, OperationType, MethodExceptions, MethodLatencies, TelemetryUsageStatsPayload, UpdatesFromSSEEnum } from '../../sync/submitters/types'; import { DEDUPED, DROPPED, LOCALHOST_MODE, QUEUED } from '../../utils/constants'; import { findLatencyIndex } from '../findLatencyIndex'; import { ISegmentsCacheSync, ISplitsCacheSync, IStorageFactoryParams, ITelemetryCacheSync } from '../types'; @@ -56,6 +56,7 @@ export class TelemetryCacheInMemory implements ITelemetryCacheSync { eD: this.getEventStats(DROPPED), sE: this.popStreamingEvents(), t: this.popTags(), + ufs: this.popUpdatesFromSSE(), }; } @@ -244,4 +245,27 @@ export class TelemetryCacheInMemory implements ITelemetryCacheSync { this.e = false; } + private updatesFromSSE = { + Splits: 0, + MySegments: 0 + }; + + popUpdatesFromSSE() { + const { Splits, MySegments } = this.updatesFromSSE; + const result = { + sp: Splits, + ms: MySegments + }; + this.updatesFromSSE = { + Splits: 0, + MySegments: 0, + }; + return result; + } + + recordUpdatesFromSSE(type: UpdatesFromSSEEnum, amount: number) { + this.updatesFromSSE[type] += amount; + this.e = false; + } + } diff --git a/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts b/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts index 85cc43fe..3b7f944f 100644 --- a/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts +++ b/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts @@ -211,7 +211,7 @@ describe('TELEMETRY CACHE', () => { test('"isEmpty" and "pop" methods', () => { const cache = new TelemetryCacheInMemory(); const expectedEmptyPayload = { - lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: undefined, seC: undefined, skC: undefined, eQ: 0, eD: 0, sE: [], t: [] + lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: undefined, seC: undefined, skC: undefined, eQ: 0, eD: 0, sE: [], t: [], ufs:{ sp: 0, ms: 0 } }; // Initially, the cache is empty diff --git a/src/storages/types.ts b/src/storages/types.ts index 85995da2..abea59dd 100644 --- a/src/storages/types.ts +++ b/src/storages/types.ts @@ -1,5 +1,5 @@ import { MaybeThenable, ISplit } from '../dtos/types'; -import { EventDataType, HttpErrors, HttpLatencies, ImpressionDataType, LastSync, Method, MethodExceptions, MethodLatencies, MultiMethodExceptions, MultiMethodLatencies, MultiConfigs, OperationType, StoredEventWithMetadata, StoredImpressionWithMetadata, StreamingEvent, UniqueKeysPayloadCs, UniqueKeysPayloadSs, TelemetryUsageStatsPayload } from '../sync/submitters/types'; +import { EventDataType, HttpErrors, HttpLatencies, ImpressionDataType, LastSync, Method, MethodExceptions, MethodLatencies, MultiMethodExceptions, MultiMethodLatencies, MultiConfigs, OperationType, StoredEventWithMetadata, StoredImpressionWithMetadata, StreamingEvent, UniqueKeysPayloadCs, UniqueKeysPayloadSs, TelemetryUsageStatsPayload, UpdatesFromSSEEnum } from '../sync/submitters/types'; import { SplitIO, ImpressionDTO, ISettings } from '../types'; /** @@ -409,6 +409,7 @@ export interface ITelemetryRuntimeProducerSync { recordTokenRefreshes(): void; recordStreamingEvents(streamingEvent: StreamingEvent): void; recordSessionLength(ms: number): void; + recordUpdatesFromSSE(type: UpdatesFromSSEEnum, amount: number): void } export interface ITelemetryEvaluationProducerSync { diff --git a/src/sync/polling/types.ts b/src/sync/polling/types.ts index 4653b568..549cd02c 100644 --- a/src/sync/polling/types.ts +++ b/src/sync/polling/types.ts @@ -1,11 +1,12 @@ import { ISplit } from '../../dtos/types'; import { IReadinessManager } from '../../readiness/types'; import { IStorageSync } from '../../storages/types'; +import { ITelemetryTracker } from '../../trackers/types'; import { ITask, ISyncTask } from '../types'; -export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }], boolean> { } +export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }, telemetryTracker?: ITelemetryTracker], boolean> { } -export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { } +export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number, telemetryTracker?: ITelemetryTracker], boolean> { } export type MySegmentsData = string[] | { /* segment name */ diff --git a/src/sync/polling/updaters/splitChangesUpdater.ts b/src/sync/polling/updaters/splitChangesUpdater.ts index 63ff08f0..ca77b508 100644 --- a/src/sync/polling/updaters/splitChangesUpdater.ts +++ b/src/sync/polling/updaters/splitChangesUpdater.ts @@ -7,8 +7,9 @@ import { timeout } from '../../../utils/promise/timeout'; import { SDK_SPLITS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../../../readiness/constants'; import { ILogger } from '../../../logger/types'; import { SYNC_SPLITS_FETCH, SYNC_SPLITS_NEW, SYNC_SPLITS_REMOVED, SYNC_SPLITS_SEGMENTS, SYNC_SPLITS_FETCH_FAILS, SYNC_SPLITS_FETCH_RETRY } from '../../../logger/constants'; +import { ITelemetryTracker } from '../../../trackers/types'; -type ISplitChangesUpdater = (noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) => Promise +type ISplitChangesUpdater = (noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }, telemetryTracker?: ITelemetryTracker) => Promise // Checks that all registered segments have been fetched (changeNumber !== -1 for every segment). // Returns a promise that could be rejected. @@ -111,7 +112,7 @@ export function splitChangesUpdaterFactory( * @param {boolean | undefined} noCache true to revalidate data to fetch * @param {boolean | undefined} till query param to bypass CDN requests */ - return function splitChangesUpdater(noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) { + return function splitChangesUpdater(noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }, telemetryTracker?: ITelemetryTracker) { /** * @param {number} since current changeNumber at splitsCache @@ -139,8 +140,12 @@ export function splitChangesUpdaterFactory( splits.setChangeNumber(splitChanges.till), splits.addSplits(mutation.added), splits.removeSplits(mutation.removed), - segments.registerSegments(mutation.segments) + segments.registerSegments(mutation.segments), ]).then(() => { + if (telemetryTracker) { + telemetryTracker.trackUpdatesFromSSE('Splits', 1); + telemetryTracker.trackUpdatesFromSSE('MySegments', mutation.segments.length); + } if (splitsEventEmitter) { // To emit SDK_SPLITS_ARRIVED for server-side SDK, we must check that all registered segments have been fetched diff --git a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts index 90384415..d6a64fd9 100644 --- a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts @@ -3,6 +3,7 @@ import { ILogger } from '../../../logger/types'; import { SDK_SPLITS_ARRIVED } from '../../../readiness/constants'; import { ISplitsEventEmitter } from '../../../readiness/types'; import { ISplitsCacheSync } from '../../../storages/types'; +import { ITelemetryTracker } from '../../../trackers/types'; import { Backoff } from '../../../utils/Backoff'; import { ISegmentsSyncTask, ISplitsSyncTask } from '../../polling/types'; import { ISplitKillData, ISplitUpdateData } from '../SSEHandler/types'; @@ -12,7 +13,7 @@ import { IUpdateWorker } from './types'; /** * SplitsUpdateWorker factory */ -export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void } { +export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, segmentsSyncTask?: ISegmentsSyncTask, telemetryTracker?: ITelemetryTracker): IUpdateWorker & { killSplit(event: ISplitKillData): void } { let maxChangeNumber = 0; let handleNewEvent = false; @@ -27,7 +28,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, handleNewEvent = false; const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined; // fetch splits revalidating data if cached - splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification ).then(() => { + splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification, telemetryTracker).then(() => { if (!isHandlingEvent) return; // halt if `stop` has been called if (handleNewEvent) { __handleSplitUpdateCall(); diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index 262fd14d..aa236df6 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -127,8 +127,8 @@ describe('SplitsUpdateWorker', () => { expect(loggerMock.debug).lastCalledWith('Refresh completed bypassing the CDN in 2 attempts.'); expect(splitsSyncTask.execute.mock.calls).toEqual([ - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined]), - [true, 100, undefined], [true, 100, undefined], + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined, undefined]), + [true, 100, undefined, undefined], [true, 100, undefined, undefined], ]); // `execute` was called 12 times. Last 2 with CDN bypass // Handle new event after previous is completed @@ -152,8 +152,8 @@ describe('SplitsUpdateWorker', () => { expect(loggerMock.debug).lastCalledWith('No changes fetched after 10 attempts with CDN bypassed.'); expect(splitsSyncTask.execute.mock.calls).toEqual([ - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined]), - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, 100, undefined]), + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined, undefined]), + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, 100, undefined, undefined]), ]); // `execute` was called 20 times. Last 10 with CDN bypass // Handle new event after previous ends (not completed) @@ -217,7 +217,7 @@ describe('SplitsUpdateWorker', () => { const changeNumber = payload.changeNumber; splitUpdateWorker.put( { changeNumber, pcn }, payload); // queued expect(splitsSyncTask.execute).toBeCalledTimes(1); - expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {changeNumber, payload}]); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {changeNumber, payload}, undefined]); }); }); @@ -235,7 +235,7 @@ describe('SplitsUpdateWorker', () => { let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); - expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined, undefined]); splitsSyncTask.execute.mockClear(); // ccn = 110 & pcn = 0: Something was missed -> something wrong in `pushNotificationManager` -> fetch split changes @@ -248,7 +248,7 @@ describe('SplitsUpdateWorker', () => { splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); - expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined, undefined]); splitsSyncTask.execute.mockClear(); // ccn = 120 & pcn = 120: In order consecutive notifications arrived, apply updates normaly @@ -261,7 +261,7 @@ describe('SplitsUpdateWorker', () => { splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); - expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {payload: notification.decoded, changeNumber }]); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {payload: notification.decoded, changeNumber }, undefined]); }); }); diff --git a/src/sync/streaming/pushManager.ts b/src/sync/streaming/pushManager.ts index 451f9369..0ee93b6a 100644 --- a/src/sync/streaming/pushManager.ts +++ b/src/sync/streaming/pushManager.ts @@ -58,7 +58,7 @@ export function pushManagerFactory( // MySegmentsUpdateWorker (client-side) are initiated in `add` method const segmentsUpdateWorker = userKey ? undefined : SegmentsUpdateWorker(log, pollingManager.segmentsSyncTask as ISegmentsSyncTask, storage.segments); // For server-side we pass the segmentsSyncTask, used by SplitsUpdateWorker to fetch new segments - const splitsUpdateWorker = SplitsUpdateWorker(log, storage.splits, pollingManager.splitsSyncTask, readiness.splits, userKey ? undefined : pollingManager.segmentsSyncTask as ISegmentsSyncTask); + const splitsUpdateWorker = SplitsUpdateWorker(log, storage.splits, pollingManager.splitsSyncTask, readiness.splits, userKey ? undefined : pollingManager.segmentsSyncTask as ISegmentsSyncTask, telemetryTracker); // [Only for client-side] map of hashes to user keys, to dispatch MY_SEGMENTS_UPDATE events to the corresponding MySegmentsUpdateWorker const userKeyHashes: Record = {}; diff --git a/src/sync/submitters/__tests__/telemetrySubmitter.spec.ts b/src/sync/submitters/__tests__/telemetrySubmitter.spec.ts index 25ff16be..be7b6542 100644 --- a/src/sync/submitters/__tests__/telemetrySubmitter.spec.ts +++ b/src/sync/submitters/__tests__/telemetrySubmitter.spec.ts @@ -48,7 +48,7 @@ describe('Telemetry submitter', () => { expect(isEmptySpy).toBeCalledTimes(1); expect(popSpy).toBeCalledTimes(1); expect(postMetricsUsage).toBeCalledWith(JSON.stringify({ - lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: 0, seC: 0, skC: 0, eQ: 0, eD: 0, sE: [], t: ['tag1'] + lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: 0, seC: 0, skC: 0, eQ: 0, eD: 0, sE: [], t: ['tag1'], ufs:{ sp: 0, ms: 0 } })); // Await second periodic execution diff --git a/src/sync/submitters/types.ts b/src/sync/submitters/types.ts index 6eff14a0..9e7ebbd0 100644 --- a/src/sync/submitters/types.ts +++ b/src/sync/submitters/types.ts @@ -103,6 +103,7 @@ export type DROPPED = 1; export type DEDUPED = 2; export type ImpressionDataType = QUEUED | DROPPED | DEDUPED export type EventDataType = QUEUED | DROPPED; +export type UpdatesFromSSEEnum = 'Splits' | 'MySegments'; export type SPLITS = 'sp'; export type IMPRESSIONS = 'im'; @@ -151,6 +152,12 @@ export type TelemetryUsageStats = { mE?: MethodExceptions, // methodExceptions } +// amount of instant updates that we are doing by avoiding fetching to Split servers +export type UpdatesFromSSE = { + sp: number, // splits + ms?: number, // my segments +} + // 'metrics/usage' JSON request body export type TelemetryUsageStatsPayload = TelemetryUsageStats & { lS: LastSync, // lastSynchronization @@ -169,6 +176,7 @@ export type TelemetryUsageStatsPayload = TelemetryUsageStats & { eD: number, // eventsDropped sE: Array, // streamingEvents t?: Array, // tags + ufs?: UpdatesFromSSE, //UpdatesFromSSE } /** diff --git a/src/trackers/telemetryTracker.ts b/src/trackers/telemetryTracker.ts index 531381a3..79e6aadf 100644 --- a/src/trackers/telemetryTracker.ts +++ b/src/trackers/telemetryTracker.ts @@ -3,6 +3,7 @@ import { EXCEPTION, SDK_NOT_READY } from '../utils/labels'; import { ITelemetryTracker } from './types'; import { timer } from '../utils/timeTracker/timer'; import { TOKEN_REFRESH, AUTH_REJECTION } from '../utils/constants'; +import { UpdatesFromSSEEnum } from '../sync/submitters/types'; export function telemetryTrackerFactory( telemetryCache?: ITelemetryCacheSync | ITelemetryCacheAsync, @@ -52,6 +53,9 @@ export function telemetryTrackerFactory( addTag(tag: string) { // @ts-ignore if (telemetryCache.addTag) telemetryCache.addTag(tag); + }, + trackUpdatesFromSSE(type: UpdatesFromSSEEnum, amount: number) { + (telemetryCache as ITelemetryCacheSync).recordUpdatesFromSSE(type, amount); } }; @@ -62,7 +66,8 @@ export function telemetryTrackerFactory( trackHttp: noopTrack, sessionLength() { }, streamingEvent() { }, - addTag() { } + addTag() { }, + trackUpdatesFromSSE() { }, }; } } diff --git a/src/trackers/types.ts b/src/trackers/types.ts index 26648934..8931e151 100644 --- a/src/trackers/types.ts +++ b/src/trackers/types.ts @@ -1,5 +1,5 @@ import { SplitIO, ImpressionDTO } from '../types'; -import { StreamingEventType, Method, OperationType } from '../sync/submitters/types'; +import { StreamingEventType, Method, OperationType, UpdatesFromSSEEnum } from '../sync/submitters/types'; import { IEventsCacheBase } from '../storages/types'; import { NetworkError } from '../services/types'; @@ -45,6 +45,10 @@ export interface ITelemetryTracker { * Records tag */ addTag(tag: string): void + /** + * Records updates from sse + */ + trackUpdatesFromSSE(type: UpdatesFromSSEEnum, amount?: number): void; } export interface IFilterAdapter { From ccc1a09c4280658e638dcda31b1c7d99b4d0b1ec Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Fri, 30 Jun 2023 23:07:22 -0300 Subject: [PATCH 08/15] Do updates from sse telemetry in splits and mysegments update workers --- .../inMemory/TelemetryCacheInMemory.ts | 18 ++++----- .../__tests__/TelemetryCacheInMemory.spec.ts | 17 +++++++++ src/storages/types.ts | 2 +- src/sync/polling/types.ts | 5 +-- .../polling/updaters/splitChangesUpdater.ts | 11 ++---- .../UpdateWorkers/MySegmentsUpdateWorker.ts | 8 +++- .../UpdateWorkers/SplitsUpdateWorker.ts | 6 ++- .../__tests__/MySegmentsUpdateWorker.spec.ts | 7 ++-- .../__tests__/SplitsUpdateWorker.spec.ts | 37 ++++++++++--------- src/sync/streaming/pushManager.ts | 4 +- src/sync/submitters/types.ts | 2 +- src/trackers/telemetryTracker.ts | 4 +- src/trackers/types.ts | 2 +- 13 files changed, 69 insertions(+), 54 deletions(-) diff --git a/src/storages/inMemory/TelemetryCacheInMemory.ts b/src/storages/inMemory/TelemetryCacheInMemory.ts index 390e80b4..c4ae5131 100644 --- a/src/storages/inMemory/TelemetryCacheInMemory.ts +++ b/src/storages/inMemory/TelemetryCacheInMemory.ts @@ -246,25 +246,21 @@ export class TelemetryCacheInMemory implements ITelemetryCacheSync { } private updatesFromSSE = { - Splits: 0, - MySegments: 0 + sp: 0, + ms: 0 }; popUpdatesFromSSE() { - const { Splits, MySegments } = this.updatesFromSSE; - const result = { - sp: Splits, - ms: MySegments - }; + const result = this.updatesFromSSE; this.updatesFromSSE = { - Splits: 0, - MySegments: 0, + sp: 0, + ms: 0, }; return result; } - recordUpdatesFromSSE(type: UpdatesFromSSEEnum, amount: number) { - this.updatesFromSSE[type] += amount; + recordUpdatesFromSSE(type: UpdatesFromSSEEnum) { + this.updatesFromSSE[type]++; this.e = false; } diff --git a/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts b/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts index 3b7f944f..8bed17b7 100644 --- a/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts +++ b/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts @@ -227,4 +227,21 @@ describe('TELEMETRY CACHE', () => { expect(cache.isEmpty()).toBe(true); }); + test('updates from SSE', () => { + expect(cache.popUpdatesFromSSE()).toEqual({sp: 0, ms: 0}); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(MY_SEGMENT); + cache.recordUpdatesFromSSE(MY_SEGMENT); + expect(cache.popUpdatesFromSSE()).toEqual({sp: 3, ms: 2}); + expect(cache.popUpdatesFromSSE()).toEqual({sp: 0, ms: 0}); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(MY_SEGMENT); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(MY_SEGMENT); + expect(cache.popUpdatesFromSSE()).toEqual({sp: 2, ms: 2}); + expect(cache.popUpdatesFromSSE()).toEqual({sp: 0, ms: 0}); + }); + }); diff --git a/src/storages/types.ts b/src/storages/types.ts index abea59dd..dc2dfbc2 100644 --- a/src/storages/types.ts +++ b/src/storages/types.ts @@ -409,7 +409,7 @@ export interface ITelemetryRuntimeProducerSync { recordTokenRefreshes(): void; recordStreamingEvents(streamingEvent: StreamingEvent): void; recordSessionLength(ms: number): void; - recordUpdatesFromSSE(type: UpdatesFromSSEEnum, amount: number): void + recordUpdatesFromSSE(type: UpdatesFromSSEEnum): void } export interface ITelemetryEvaluationProducerSync { diff --git a/src/sync/polling/types.ts b/src/sync/polling/types.ts index 549cd02c..4653b568 100644 --- a/src/sync/polling/types.ts +++ b/src/sync/polling/types.ts @@ -1,12 +1,11 @@ import { ISplit } from '../../dtos/types'; import { IReadinessManager } from '../../readiness/types'; import { IStorageSync } from '../../storages/types'; -import { ITelemetryTracker } from '../../trackers/types'; import { ITask, ISyncTask } from '../types'; -export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }, telemetryTracker?: ITelemetryTracker], boolean> { } +export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }], boolean> { } -export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number, telemetryTracker?: ITelemetryTracker], boolean> { } +export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { } export type MySegmentsData = string[] | { /* segment name */ diff --git a/src/sync/polling/updaters/splitChangesUpdater.ts b/src/sync/polling/updaters/splitChangesUpdater.ts index ca77b508..63ff08f0 100644 --- a/src/sync/polling/updaters/splitChangesUpdater.ts +++ b/src/sync/polling/updaters/splitChangesUpdater.ts @@ -7,9 +7,8 @@ import { timeout } from '../../../utils/promise/timeout'; import { SDK_SPLITS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../../../readiness/constants'; import { ILogger } from '../../../logger/types'; import { SYNC_SPLITS_FETCH, SYNC_SPLITS_NEW, SYNC_SPLITS_REMOVED, SYNC_SPLITS_SEGMENTS, SYNC_SPLITS_FETCH_FAILS, SYNC_SPLITS_FETCH_RETRY } from '../../../logger/constants'; -import { ITelemetryTracker } from '../../../trackers/types'; -type ISplitChangesUpdater = (noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }, telemetryTracker?: ITelemetryTracker) => Promise +type ISplitChangesUpdater = (noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) => Promise // Checks that all registered segments have been fetched (changeNumber !== -1 for every segment). // Returns a promise that could be rejected. @@ -112,7 +111,7 @@ export function splitChangesUpdaterFactory( * @param {boolean | undefined} noCache true to revalidate data to fetch * @param {boolean | undefined} till query param to bypass CDN requests */ - return function splitChangesUpdater(noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }, telemetryTracker?: ITelemetryTracker) { + return function splitChangesUpdater(noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) { /** * @param {number} since current changeNumber at splitsCache @@ -140,12 +139,8 @@ export function splitChangesUpdaterFactory( splits.setChangeNumber(splitChanges.till), splits.addSplits(mutation.added), splits.removeSplits(mutation.removed), - segments.registerSegments(mutation.segments), + segments.registerSegments(mutation.segments) ]).then(() => { - if (telemetryTracker) { - telemetryTracker.trackUpdatesFromSSE('Splits', 1); - telemetryTracker.trackUpdatesFromSSE('MySegments', mutation.segments.length); - } if (splitsEventEmitter) { // To emit SDK_SPLITS_ARRIVED for server-side SDK, we must check that all registered segments have been fetched diff --git a/src/sync/streaming/UpdateWorkers/MySegmentsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/MySegmentsUpdateWorker.ts index 290cfa0c..eb1a25b1 100644 --- a/src/sync/streaming/UpdateWorkers/MySegmentsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/MySegmentsUpdateWorker.ts @@ -1,11 +1,13 @@ import { IMySegmentsSyncTask, MySegmentsData } from '../../polling/types'; import { Backoff } from '../../../utils/Backoff'; import { IUpdateWorker } from './types'; +import { MY_SEGMENT } from '../../../utils/constants'; +import { ITelemetryTracker } from '../../../trackers/types'; /** * MySegmentsUpdateWorker factory */ -export function MySegmentsUpdateWorker(mySegmentsSyncTask: IMySegmentsSyncTask): IUpdateWorker { +export function MySegmentsUpdateWorker(mySegmentsSyncTask: IMySegmentsSyncTask, telemetryTracker: ITelemetryTracker): IUpdateWorker { let maxChangeNumber = 0; // keeps the maximum changeNumber among queued events let currentChangeNumber = -1; @@ -23,8 +25,10 @@ export function MySegmentsUpdateWorker(mySegmentsSyncTask: IMySegmentsSyncTask): // fetch mySegments revalidating data if cached mySegmentsSyncTask.execute(_segmentsData, true).then((result) => { if (!isHandlingEvent) return; // halt if `stop` has been called - if (result !== false) // Unlike `Splits|SegmentsUpdateWorker`, we cannot use `mySegmentsCache.getChangeNumber` since `/mySegments` endpoint doesn't provide this value. + if (result !== false) {// Unlike `Splits|SegmentsUpdateWorker`, we cannot use `mySegmentsCache.getChangeNumber` since `/mySegments` endpoint doesn't provide this value. + if (_segmentsData) telemetryTracker.trackUpdatesFromSSE(MY_SEGMENT); currentChangeNumber = Math.max(currentChangeNumber, currentMaxChangeNumber); // use `currentMaxChangeNumber`, in case that `maxChangeNumber` was updated during fetch. + } if (handleNewEvent) { __handleMySegmentsUpdateCall(); } else { diff --git a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts index d6a64fd9..e9336aba 100644 --- a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts @@ -5,6 +5,7 @@ import { ISplitsEventEmitter } from '../../../readiness/types'; import { ISplitsCacheSync } from '../../../storages/types'; import { ITelemetryTracker } from '../../../trackers/types'; import { Backoff } from '../../../utils/Backoff'; +import { SPLITS } from '../../../utils/constants'; import { ISegmentsSyncTask, ISplitsSyncTask } from '../../polling/types'; import { ISplitKillData, ISplitUpdateData } from '../SSEHandler/types'; import { FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT, FETCH_BACKOFF_MAX_RETRIES } from './constants'; @@ -13,7 +14,7 @@ import { IUpdateWorker } from './types'; /** * SplitsUpdateWorker factory */ -export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, segmentsSyncTask?: ISegmentsSyncTask, telemetryTracker?: ITelemetryTracker): IUpdateWorker & { killSplit(event: ISplitKillData): void } { +export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void } { let maxChangeNumber = 0; let handleNewEvent = false; @@ -28,11 +29,12 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, handleNewEvent = false; const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined; // fetch splits revalidating data if cached - splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification, telemetryTracker).then(() => { + splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => { if (!isHandlingEvent) return; // halt if `stop` has been called if (handleNewEvent) { __handleSplitUpdateCall(); } else { + if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS); // fetch new registered segments for server-side API. Not retrying on error if (segmentsSyncTask) segmentsSyncTask.execute(true); diff --git a/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts index 94ec9b78..4babf4e2 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts @@ -2,6 +2,7 @@ import { MySegmentsUpdateWorker } from '../MySegmentsUpdateWorker'; import { syncTaskFactory } from '../../../syncTask'; import { Backoff } from '../../../../utils/Backoff'; +import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker'; function mySegmentsSyncTaskMock(values = []) { @@ -41,7 +42,7 @@ describe('MySegmentsUpdateWorker', () => { // setup const mySegmentsSyncTask = mySegmentsSyncTaskMock(); Backoff.__TEST__BASE_MILLIS = 1; // retry immediately - const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask); + const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTrackerFactory()); // assert calling `mySegmentsSyncTask.execute` if `isExecuting` is false expect(mySegmentsSyncTask.isExecuting()).toBe(false); @@ -109,7 +110,7 @@ describe('MySegmentsUpdateWorker', () => { // setup Backoff.__TEST__BASE_MILLIS = 50; const mySegmentsSyncTask = mySegmentsSyncTaskMock([false, false, false]); // fetch fail - const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask); + const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTrackerFactory()); // while fetch fails, should retry with backoff mySegmentUpdateWorker.put(100); @@ -125,7 +126,7 @@ describe('MySegmentsUpdateWorker', () => { // setup const mySegmentsSyncTask = mySegmentsSyncTaskMock([false]); Backoff.__TEST__BASE_MILLIS = 1; - const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask); + const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTrackerFactory()); mySegmentUpdateWorker.put(100); diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index aa236df6..0a0f054d 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -7,6 +7,7 @@ import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock'; import { syncTaskFactory } from '../../../syncTask'; import { Backoff } from '../../../../utils/Backoff'; import { splitNotifications } from '../../../streaming/__tests__/dataMocks'; +import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker'; function splitsSyncTaskMock(splitStorage: SplitsCacheInMemory, changeNumbers = []) { @@ -61,7 +62,7 @@ describe('SplitsUpdateWorker', () => { const splitsSyncTask = splitsSyncTaskMock(cache); Backoff.__TEST__BASE_MILLIS = 1; // retry immediately - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); // assert calling `splitsSyncTask.execute` if `isExecuting` is false expect(splitsSyncTask.isExecuting()).toBe(false); @@ -100,7 +101,7 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__BASE_MILLIS = 50; const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [90, 90, 90]); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); // while fetch fails, should retry with backoff splitUpdateWorker.put({ changeNumber: 100 }); @@ -119,7 +120,7 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [...Array(FETCH_BACKOFF_MAX_RETRIES).fill(90), 90, 100]); // 12 executions. Last one is valid - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -127,8 +128,8 @@ describe('SplitsUpdateWorker', () => { expect(loggerMock.debug).lastCalledWith('Refresh completed bypassing the CDN in 2 attempts.'); expect(splitsSyncTask.execute.mock.calls).toEqual([ - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined, undefined]), - [true, 100, undefined, undefined], [true, 100, undefined, undefined], + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined]), + [true, 100, undefined], [true, 100, undefined], ]); // `execute` was called 12 times. Last 2 with CDN bypass // Handle new event after previous is completed @@ -144,7 +145,7 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, Array(FETCH_BACKOFF_MAX_RETRIES * 2).fill(90)); // 20 executions. No one is valid - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -152,8 +153,8 @@ describe('SplitsUpdateWorker', () => { expect(loggerMock.debug).lastCalledWith('No changes fetched after 10 attempts with CDN bypassed.'); expect(splitsSyncTask.execute.mock.calls).toEqual([ - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined, undefined]), - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, 100, undefined, undefined]), + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined]), + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, 100, undefined]), ]); // `execute` was called 20 times. Last 10 with CDN bypass // Handle new event after previous ends (not completed) @@ -169,7 +170,7 @@ describe('SplitsUpdateWorker', () => { cache.addSplit('lol2', '{ "name": "something else"}'); const splitsSyncTask = splitsSyncTaskMock(cache); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, splitsEventEmitterMock); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, splitsEventEmitterMock, telemetryTrackerFactory()); // assert killing split locally, emitting SDK_SPLITS_ARRIVED event, and synchronizing splits if changeNumber is new splitUpdateWorker.killSplit({ changeNumber: 100, splitName: 'lol1', defaultTreatment: 'off' }); // splitsCache.killLocally is synchronous @@ -196,7 +197,7 @@ describe('SplitsUpdateWorker', () => { const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [95]); Backoff.__TEST__BASE_MILLIS = 1; - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); splitUpdateWorker.put({ changeNumber: 100 }); @@ -212,12 +213,12 @@ describe('SplitsUpdateWorker', () => { splitNotifications.forEach(notification => { const pcn = cache.getChangeNumber(); const splitsSyncTask = splitsSyncTaskMock(cache); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); const payload = notification.decoded; const changeNumber = payload.changeNumber; splitUpdateWorker.put( { changeNumber, pcn }, payload); // queued expect(splitsSyncTask.execute).toBeCalledTimes(1); - expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {changeNumber, payload}, undefined]); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {changeNumber, payload}]); }); }); @@ -232,10 +233,10 @@ describe('SplitsUpdateWorker', () => { const notification = splitNotifications[0]; let splitsSyncTask = splitsSyncTaskMock(cache); - let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); - expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined, undefined]); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); splitsSyncTask.execute.mockClear(); // ccn = 110 & pcn = 0: Something was missed -> something wrong in `pushNotificationManager` -> fetch split changes @@ -245,10 +246,10 @@ describe('SplitsUpdateWorker', () => { cache.setChangeNumber(ccn); splitsSyncTask = splitsSyncTaskMock(cache); - splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); - expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined, undefined]); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); splitsSyncTask.execute.mockClear(); // ccn = 120 & pcn = 120: In order consecutive notifications arrived, apply updates normaly @@ -258,10 +259,10 @@ describe('SplitsUpdateWorker', () => { cache.setChangeNumber(ccn); splitsSyncTask = splitsSyncTaskMock(cache); - splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); - expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {payload: notification.decoded, changeNumber }, undefined]); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {payload: notification.decoded, changeNumber }]); }); }); diff --git a/src/sync/streaming/pushManager.ts b/src/sync/streaming/pushManager.ts index 0ee93b6a..18b4eabd 100644 --- a/src/sync/streaming/pushManager.ts +++ b/src/sync/streaming/pushManager.ts @@ -58,7 +58,7 @@ export function pushManagerFactory( // MySegmentsUpdateWorker (client-side) are initiated in `add` method const segmentsUpdateWorker = userKey ? undefined : SegmentsUpdateWorker(log, pollingManager.segmentsSyncTask as ISegmentsSyncTask, storage.segments); // For server-side we pass the segmentsSyncTask, used by SplitsUpdateWorker to fetch new segments - const splitsUpdateWorker = SplitsUpdateWorker(log, storage.splits, pollingManager.splitsSyncTask, readiness.splits, userKey ? undefined : pollingManager.segmentsSyncTask as ISegmentsSyncTask, telemetryTracker); + const splitsUpdateWorker = SplitsUpdateWorker(log, storage.splits, pollingManager.splitsSyncTask, readiness.splits, telemetryTracker, userKey ? undefined : pollingManager.segmentsSyncTask as ISegmentsSyncTask); // [Only for client-side] map of hashes to user keys, to dispatch MY_SEGMENTS_UPDATE events to the corresponding MySegmentsUpdateWorker const userKeyHashes: Record = {}; @@ -344,7 +344,7 @@ export function pushManagerFactory( if (!userKeyHashes[hash]) { userKeyHashes[hash] = userKey; - clients[userKey] = { hash64: hash64(userKey), worker: MySegmentsUpdateWorker(mySegmentsSyncTask) }; + clients[userKey] = { hash64: hash64(userKey), worker: MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker) }; connectForNewClient = true; // we must reconnect on start, to listen the channel for the new user key // Reconnects in case of a new client. diff --git a/src/sync/submitters/types.ts b/src/sync/submitters/types.ts index 9e7ebbd0..95653fec 100644 --- a/src/sync/submitters/types.ts +++ b/src/sync/submitters/types.ts @@ -103,7 +103,7 @@ export type DROPPED = 1; export type DEDUPED = 2; export type ImpressionDataType = QUEUED | DROPPED | DEDUPED export type EventDataType = QUEUED | DROPPED; -export type UpdatesFromSSEEnum = 'Splits' | 'MySegments'; +export type UpdatesFromSSEEnum = SPLITS | MY_SEGMENT; export type SPLITS = 'sp'; export type IMPRESSIONS = 'im'; diff --git a/src/trackers/telemetryTracker.ts b/src/trackers/telemetryTracker.ts index 79e6aadf..0312cc94 100644 --- a/src/trackers/telemetryTracker.ts +++ b/src/trackers/telemetryTracker.ts @@ -54,8 +54,8 @@ export function telemetryTrackerFactory( // @ts-ignore if (telemetryCache.addTag) telemetryCache.addTag(tag); }, - trackUpdatesFromSSE(type: UpdatesFromSSEEnum, amount: number) { - (telemetryCache as ITelemetryCacheSync).recordUpdatesFromSSE(type, amount); + trackUpdatesFromSSE(type: UpdatesFromSSEEnum) { + (telemetryCache as ITelemetryCacheSync).recordUpdatesFromSSE(type); } }; diff --git a/src/trackers/types.ts b/src/trackers/types.ts index 8931e151..041c9b97 100644 --- a/src/trackers/types.ts +++ b/src/trackers/types.ts @@ -48,7 +48,7 @@ export interface ITelemetryTracker { /** * Records updates from sse */ - trackUpdatesFromSSE(type: UpdatesFromSSEEnum, amount?: number): void; + trackUpdatesFromSSE(type: UpdatesFromSSEEnum): void; } export interface IFilterAdapter { From c92b78b827179368a61a275cef3ba190b79a8a0e Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Tue, 4 Jul 2023 13:02:57 -0300 Subject: [PATCH 09/15] create a single instance of telemetryTracker to reuse in tests --- .../__tests__/MySegmentsUpdateWorker.spec.ts | 9 ++++-- .../__tests__/SplitsUpdateWorker.spec.ts | 28 ++++++++++++------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts index 4babf4e2..d8dd3074 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts @@ -41,8 +41,9 @@ describe('MySegmentsUpdateWorker', () => { // setup const mySegmentsSyncTask = mySegmentsSyncTaskMock(); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker Backoff.__TEST__BASE_MILLIS = 1; // retry immediately - const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTrackerFactory()); + const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker); // assert calling `mySegmentsSyncTask.execute` if `isExecuting` is false expect(mySegmentsSyncTask.isExecuting()).toBe(false); @@ -110,7 +111,8 @@ describe('MySegmentsUpdateWorker', () => { // setup Backoff.__TEST__BASE_MILLIS = 50; const mySegmentsSyncTask = mySegmentsSyncTaskMock([false, false, false]); // fetch fail - const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTrackerFactory()); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker); // while fetch fails, should retry with backoff mySegmentUpdateWorker.put(100); @@ -125,8 +127,9 @@ describe('MySegmentsUpdateWorker', () => { test('stop', async () => { // setup const mySegmentsSyncTask = mySegmentsSyncTaskMock([false]); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker Backoff.__TEST__BASE_MILLIS = 1; - const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTrackerFactory()); + const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker); mySegmentUpdateWorker.put(100); diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index 0a0f054d..e871f3ee 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -60,9 +60,10 @@ describe('SplitsUpdateWorker', () => { // setup const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker Backoff.__TEST__BASE_MILLIS = 1; // retry immediately - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); // assert calling `splitsSyncTask.execute` if `isExecuting` is false expect(splitsSyncTask.isExecuting()).toBe(false); @@ -101,7 +102,8 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__BASE_MILLIS = 50; const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [90, 90, 90]); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); // while fetch fails, should retry with backoff splitUpdateWorker.put({ changeNumber: 100 }); @@ -120,7 +122,8 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [...Array(FETCH_BACKOFF_MAX_RETRIES).fill(90), 90, 100]); // 12 executions. Last one is valid - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -145,7 +148,8 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, Array(FETCH_BACKOFF_MAX_RETRIES * 2).fill(90)); // 20 executions. No one is valid - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -170,7 +174,8 @@ describe('SplitsUpdateWorker', () => { cache.addSplit('lol2', '{ "name": "something else"}'); const splitsSyncTask = splitsSyncTaskMock(cache); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, splitsEventEmitterMock, telemetryTrackerFactory()); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, splitsEventEmitterMock, telemetryTracker); // assert killing split locally, emitting SDK_SPLITS_ARRIVED event, and synchronizing splits if changeNumber is new splitUpdateWorker.killSplit({ changeNumber: 100, splitName: 'lol1', defaultTreatment: 'off' }); // splitsCache.killLocally is synchronous @@ -196,8 +201,9 @@ describe('SplitsUpdateWorker', () => { // setup const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [95]); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker Backoff.__TEST__BASE_MILLIS = 1; - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); @@ -213,7 +219,8 @@ describe('SplitsUpdateWorker', () => { splitNotifications.forEach(notification => { const pcn = cache.getChangeNumber(); const splitsSyncTask = splitsSyncTaskMock(cache); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); const payload = notification.decoded; const changeNumber = payload.changeNumber; splitUpdateWorker.put( { changeNumber, pcn }, payload); // queued @@ -233,7 +240,8 @@ describe('SplitsUpdateWorker', () => { const notification = splitNotifications[0]; let splitsSyncTask = splitsSyncTaskMock(cache); - let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); + const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); @@ -246,7 +254,7 @@ describe('SplitsUpdateWorker', () => { cache.setChangeNumber(ccn); splitsSyncTask = splitsSyncTaskMock(cache); - splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); @@ -259,7 +267,7 @@ describe('SplitsUpdateWorker', () => { cache.setChangeNumber(ccn); splitsSyncTask = splitsSyncTaskMock(cache); - splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTrackerFactory()); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {payload: notification.decoded, changeNumber }]); From 2d00a6152e6db98a4eed8f80e1c5b9ea3fba4970 Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Tue, 4 Jul 2023 14:09:04 -0300 Subject: [PATCH 10/15] create an unique instance of telemetryTracker --- .../__tests__/MySegmentsUpdateWorker.spec.ts | 6 +++--- .../__tests__/SplitsUpdateWorker.spec.ts | 11 +++-------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts index d8dd3074..6726ed86 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts @@ -31,6 +31,9 @@ function mySegmentsSyncTaskMock(values = []) { }; } +const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + + describe('MySegmentsUpdateWorker', () => { afterEach(() => { @@ -41,7 +44,6 @@ describe('MySegmentsUpdateWorker', () => { // setup const mySegmentsSyncTask = mySegmentsSyncTaskMock(); - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker Backoff.__TEST__BASE_MILLIS = 1; // retry immediately const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker); @@ -111,7 +113,6 @@ describe('MySegmentsUpdateWorker', () => { // setup Backoff.__TEST__BASE_MILLIS = 50; const mySegmentsSyncTask = mySegmentsSyncTaskMock([false, false, false]); // fetch fail - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker); // while fetch fails, should retry with backoff @@ -127,7 +128,6 @@ describe('MySegmentsUpdateWorker', () => { test('stop', async () => { // setup const mySegmentsSyncTask = mySegmentsSyncTaskMock([false]); - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker Backoff.__TEST__BASE_MILLIS = 1; const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker); diff --git a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index e871f3ee..af5c6336 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -48,6 +48,9 @@ function assertKilledSplit(cache, changeNumber, splitName, defaultTreatment) { expect(split.changeNumber).toBe(changeNumber); // split must have the given change number } +const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + + describe('SplitsUpdateWorker', () => { afterEach(() => { // restore @@ -60,7 +63,6 @@ describe('SplitsUpdateWorker', () => { // setup const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache); - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker Backoff.__TEST__BASE_MILLIS = 1; // retry immediately const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); @@ -102,7 +104,6 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__BASE_MILLIS = 50; const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [90, 90, 90]); - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); // while fetch fails, should retry with backoff @@ -122,7 +123,6 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [...Array(FETCH_BACKOFF_MAX_RETRIES).fill(90), 90, 100]); // 12 executions. Last one is valid - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -148,7 +148,6 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, Array(FETCH_BACKOFF_MAX_RETRIES * 2).fill(90)); // 20 executions. No one is valid - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -174,7 +173,6 @@ describe('SplitsUpdateWorker', () => { cache.addSplit('lol2', '{ "name": "something else"}'); const splitsSyncTask = splitsSyncTaskMock(cache); - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, splitsEventEmitterMock, telemetryTracker); // assert killing split locally, emitting SDK_SPLITS_ARRIVED event, and synchronizing splits if changeNumber is new @@ -201,7 +199,6 @@ describe('SplitsUpdateWorker', () => { // setup const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [95]); - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker Backoff.__TEST__BASE_MILLIS = 1; const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); @@ -219,7 +216,6 @@ describe('SplitsUpdateWorker', () => { splitNotifications.forEach(notification => { const pcn = cache.getChangeNumber(); const splitsSyncTask = splitsSyncTaskMock(cache); - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); const payload = notification.decoded; const changeNumber = payload.changeNumber; @@ -240,7 +236,6 @@ describe('SplitsUpdateWorker', () => { const notification = splitNotifications[0]; let splitsSyncTask = splitsSyncTaskMock(cache); - const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); expect(splitsSyncTask.execute).toBeCalledTimes(1); From 73a174f1877caf6a95c11ce4cc06abe2503c36c9 Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Wed, 5 Jul 2023 18:53:08 -0300 Subject: [PATCH 11/15] Prepare rc --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 96e25423..aa3391a1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@splitsoftware/splitio-commons", - "version": "1.8.2", + "version": "1.8.3-rc.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@splitsoftware/splitio-commons", - "version": "1.8.2", + "version": "1.8.3-rc.0", "license": "Apache-2.0", "dependencies": { "tslib": "^2.3.1" diff --git a/package.json b/package.json index 6bc6e0e2..b354ea4c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@splitsoftware/splitio-commons", - "version": "1.8.2", + "version": "1.8.3-rc.0", "description": "Split Javascript SDK common components", "main": "cjs/index.js", "module": "esm/index.js", From 7946a199d41ff127e46e0ef232b61d0499f5d84f Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Wed, 5 Jul 2023 19:20:30 -0300 Subject: [PATCH 12/15] RC 1.8.4-rc.0 --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 6a7cf3bf..8c375901 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@splitsoftware/splitio-commons", - "version": "1.8.3", + "version": "1.8.4-rc.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@splitsoftware/splitio-commons", - "version": "1.8.3", + "version": "1.8.4-rc.0", "license": "Apache-2.0", "dependencies": { "tslib": "^2.3.1" diff --git a/package.json b/package.json index 49189f58..b5de58cc 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@splitsoftware/splitio-commons", - "version": "1.8.3", + "version": "1.8.4-rc.0", "description": "Split Javascript SDK common components", "main": "cjs/index.js", "module": "esm/index.js", From 164dcedd3676d7fb6a623bb4d6f98c445868ebbc Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Wed, 12 Jul 2023 11:32:35 -0300 Subject: [PATCH 13/15] using warning code for errors parsing SPLIT_UPDATE --- src/logger/constants.ts | 1 + src/logger/messages/warn.ts | 1 + src/sync/streaming/pushManager.ts | 5 ++--- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/logger/constants.ts b/src/logger/constants.ts index 85c449a7..75f5fe1c 100644 --- a/src/logger/constants.ts +++ b/src/logger/constants.ts @@ -96,6 +96,7 @@ export const WARN_SPLITS_FILTER_INVALID = 220; export const WARN_SPLITS_FILTER_EMPTY = 221; export const WARN_SDK_KEY = 222; export const STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2 = 223; +export const STREAMING_PARSING_SPLIT_UPDATE = 224; export const ERROR_ENGINE_COMBINER_IFELSEIF = 300; export const ERROR_LOGLEVEL_INVALID = 301; diff --git a/src/logger/messages/warn.ts b/src/logger/messages/warn.ts index 8965ae4b..9917f862 100644 --- a/src/logger/messages/warn.ts +++ b/src/logger/messages/warn.ts @@ -32,4 +32,5 @@ export const codesWarn: [number, string][] = codesError.concat([ [c.WARN_SDK_KEY, c.LOG_PREFIX_SETTINGS+': You already have %s. We recommend keeping only one instance of the factory at all times (Singleton pattern) and reusing it throughout your application'], [c.STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching MySegments due to an error processing %s notification: %s'], + [c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing SPLIT_UPDATE notification: %s'], ]); diff --git a/src/sync/streaming/pushManager.ts b/src/sync/streaming/pushManager.ts index 18b4eabd..d6e6c7c8 100644 --- a/src/sync/streaming/pushManager.ts +++ b/src/sync/streaming/pushManager.ts @@ -12,7 +12,7 @@ import { forOwn } from '../../utils/lang'; import { SSEClient } from './SSEClient'; import { getMatching } from '../../utils/key'; import { MY_SEGMENTS_UPDATE, MY_SEGMENTS_UPDATE_V2, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants'; -import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2 } from '../../logger/constants'; +import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2, STREAMING_PARSING_SPLIT_UPDATE } from '../../logger/constants'; import { KeyList, UpdateStrategy } from './SSEHandler/types'; import { isInBitmap, parseBitmap, parseFFUpdatePayload, parseKeyList } from './parseUtils'; import { ISet, _Set } from '../../utils/lang/sets'; @@ -230,8 +230,7 @@ export function pushManagerFactory( return; } } catch (e) { - // @TODO define a error code for feature flags parsing - log.debug(e); + log.warn(STREAMING_PARSING_SPLIT_UPDATE, [e]); } } splitsUpdateWorker.put(parsedData); From 9a961c4d23cea03af5c6d2dc1dd8ae08a8c7ec87 Mon Sep 17 00:00:00 2001 From: Emmanuel Zamora Date: Tue, 18 Jul 2023 13:01:07 -0300 Subject: [PATCH 14/15] Prepare release 1.8.4 --- CHANGES.txt | 3 +++ package-lock.json | 4 ++-- package.json | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 11beb0a4..be25104b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,6 @@ +1.8.4 (July 18, 2023) + - Improved streaming architecture implementation to apply feature flag updates from the notification received which is now enhanced, improving efficiency and reliability of the whole update system. + 1.8.3 (June 29, 2023) - Updated some transitive dependencies for vulnerability fixes. - Updated SDK_READY_TIMED_OUT event to be emitted immediately when a connection error occurs using pluggable storage (i.e., when the wrapper `connect` promise is rejected) in consumer and partial consumer modes. diff --git a/package-lock.json b/package-lock.json index 8c375901..674ab0af 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@splitsoftware/splitio-commons", - "version": "1.8.4-rc.0", + "version": "1.8.4", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@splitsoftware/splitio-commons", - "version": "1.8.4-rc.0", + "version": "1.8.4", "license": "Apache-2.0", "dependencies": { "tslib": "^2.3.1" diff --git a/package.json b/package.json index b5de58cc..af856231 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@splitsoftware/splitio-commons", - "version": "1.8.4-rc.0", + "version": "1.8.4", "description": "Split Javascript SDK common components", "main": "cjs/index.js", "module": "esm/index.js", From 89901cfce3075de543eeccdae2ea3a99c2fd1707 Mon Sep 17 00:00:00 2001 From: Emiliano Sanchez Date: Tue, 18 Jul 2023 14:22:38 -0300 Subject: [PATCH 15/15] updated changelog entry --- CHANGES.txt | 2 +- package-lock.json | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index be25104b..e390eb1e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,5 @@ 1.8.4 (July 18, 2023) - - Improved streaming architecture implementation to apply feature flag updates from the notification received which is now enhanced, improving efficiency and reliability of the whole update system. + - Updated streaming architecture implementation to apply feature flag updates from the notification received which is now enhanced, improving efficiency and reliability of the whole update system. 1.8.3 (June 29, 2023) - Updated some transitive dependencies for vulnerability fixes. diff --git a/package-lock.json b/package-lock.json index 674ab0af..bb70c150 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7085,9 +7085,9 @@ } }, "node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true, "bin": { "semver": "bin/semver.js" @@ -13405,9 +13405,9 @@ } }, "semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true }, "shebang-command": {