diff --git a/CHANGES.txt b/CHANGES.txt index 11beb0a4..887f9da6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,6 @@ +1.9.0 (July 18, 2023) + - Updated streaming architecture implementation to apply feature flag updates from the notification received which is now enhanced, improving efficiency and reliability of the whole update system. + 1.8.3 (June 29, 2023) - Updated some transitive dependencies for vulnerability fixes. - Updated SDK_READY_TIMED_OUT event to be emitted immediately when a connection error occurs using pluggable storage (i.e., when the wrapper `connect` promise is rejected) in consumer and partial consumer modes. diff --git a/package-lock.json b/package-lock.json index 6a7cf3bf..8b4a82b5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@splitsoftware/splitio-commons", - "version": "1.8.3", + "version": "1.9.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@splitsoftware/splitio-commons", - "version": "1.8.3", + "version": "1.9.0", "license": "Apache-2.0", "dependencies": { "tslib": "^2.3.1" @@ -7085,9 +7085,9 @@ } }, "node_modules/semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true, "bin": { "semver": "bin/semver.js" @@ -13405,9 +13405,9 @@ } }, "semver": { - "version": "6.3.0", - "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.0.tgz", - "integrity": "sha512-b39TBaTSfV6yBrapU89p5fKekE2m/NwnDocOVruQFS1/veMgdzuPcnOM34M6CwxW8jH/lxEa5rBoDeUwu5HHTw==", + "version": "6.3.1", + "resolved": "https://registry.npmjs.org/semver/-/semver-6.3.1.tgz", + "integrity": "sha512-BR7VvDCVHO+q2xBEWskxS6DJE1qRnb7DxzUrogb71CWoSficBxYsiAGd+Kl0mmq/MprG9yArRkyrQxTO6XjMzA==", "dev": true }, "shebang-command": { diff --git a/package.json b/package.json index 49189f58..1e0e5d5a 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@splitsoftware/splitio-commons", - "version": "1.8.3", + "version": "1.9.0", "description": "Split Javascript SDK common components", "main": "cjs/index.js", "module": "esm/index.js", diff --git a/src/logger/constants.ts b/src/logger/constants.ts index 85c449a7..75f5fe1c 100644 --- a/src/logger/constants.ts +++ b/src/logger/constants.ts @@ -96,6 +96,7 @@ export const WARN_SPLITS_FILTER_INVALID = 220; export const WARN_SPLITS_FILTER_EMPTY = 221; export const WARN_SDK_KEY = 222; export const STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2 = 223; +export const STREAMING_PARSING_SPLIT_UPDATE = 224; export const ERROR_ENGINE_COMBINER_IFELSEIF = 300; export const ERROR_LOGLEVEL_INVALID = 301; diff --git a/src/logger/messages/warn.ts b/src/logger/messages/warn.ts index 8965ae4b..9917f862 100644 --- a/src/logger/messages/warn.ts +++ b/src/logger/messages/warn.ts @@ -32,4 +32,5 @@ export const codesWarn: [number, string][] = codesError.concat([ [c.WARN_SDK_KEY, c.LOG_PREFIX_SETTINGS+': You already have %s. 
We recommend keeping only one instance of the factory at all times (Singleton pattern) and reusing it throughout your application'], [c.STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching MySegments due to an error processing %s notification: %s'], + [c.STREAMING_PARSING_SPLIT_UPDATE, c.LOG_PREFIX_SYNC_STREAMING + 'Fetching SplitChanges due to an error processing SPLIT_UPDATE notification: %s'], ]); diff --git a/src/storages/inMemory/TelemetryCacheInMemory.ts b/src/storages/inMemory/TelemetryCacheInMemory.ts index e62660e0..c4ae5131 100644 --- a/src/storages/inMemory/TelemetryCacheInMemory.ts +++ b/src/storages/inMemory/TelemetryCacheInMemory.ts @@ -1,4 +1,4 @@ -import { ImpressionDataType, EventDataType, LastSync, HttpErrors, HttpLatencies, StreamingEvent, Method, OperationType, MethodExceptions, MethodLatencies, TelemetryUsageStatsPayload } from '../../sync/submitters/types'; +import { ImpressionDataType, EventDataType, LastSync, HttpErrors, HttpLatencies, StreamingEvent, Method, OperationType, MethodExceptions, MethodLatencies, TelemetryUsageStatsPayload, UpdatesFromSSEEnum } from '../../sync/submitters/types'; import { DEDUPED, DROPPED, LOCALHOST_MODE, QUEUED } from '../../utils/constants'; import { findLatencyIndex } from '../findLatencyIndex'; import { ISegmentsCacheSync, ISplitsCacheSync, IStorageFactoryParams, ITelemetryCacheSync } from '../types'; @@ -56,6 +56,7 @@ export class TelemetryCacheInMemory implements ITelemetryCacheSync { eD: this.getEventStats(DROPPED), sE: this.popStreamingEvents(), t: this.popTags(), + ufs: this.popUpdatesFromSSE(), }; } @@ -244,4 +245,23 @@ export class TelemetryCacheInMemory implements ITelemetryCacheSync { this.e = false; } + private updatesFromSSE = { + sp: 0, + ms: 0 + }; + + popUpdatesFromSSE() { + const result = this.updatesFromSSE; + this.updatesFromSSE = { + sp: 0, + ms: 0, + }; + return result; + } + + recordUpdatesFromSSE(type: UpdatesFromSSEEnum) { + this.updatesFromSSE[type]++; + this.e = false; + } + } diff --git a/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts b/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts index 85cc43fe..8bed17b7 100644 --- a/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts +++ b/src/storages/inMemory/__tests__/TelemetryCacheInMemory.spec.ts @@ -211,7 +211,7 @@ describe('TELEMETRY CACHE', () => { test('"isEmpty" and "pop" methods', () => { const cache = new TelemetryCacheInMemory(); const expectedEmptyPayload = { - lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: undefined, seC: undefined, skC: undefined, eQ: 0, eD: 0, sE: [], t: [] + lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: undefined, seC: undefined, skC: undefined, eQ: 0, eD: 0, sE: [], t: [], ufs:{ sp: 0, ms: 0 } }; // Initially, the cache is empty @@ -227,4 +227,21 @@ describe('TELEMETRY CACHE', () => { expect(cache.isEmpty()).toBe(true); }); + test('updates from SSE', () => { + expect(cache.popUpdatesFromSSE()).toEqual({sp: 0, ms: 0}); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(MY_SEGMENT); + cache.recordUpdatesFromSSE(MY_SEGMENT); + expect(cache.popUpdatesFromSSE()).toEqual({sp: 3, ms: 2}); + expect(cache.popUpdatesFromSSE()).toEqual({sp: 0, ms: 0}); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(MY_SEGMENT); + cache.recordUpdatesFromSSE(SPLITS); + cache.recordUpdatesFromSSE(MY_SEGMENT); + 
expect(cache.popUpdatesFromSSE()).toEqual({sp: 2, ms: 2}); + expect(cache.popUpdatesFromSSE()).toEqual({sp: 0, ms: 0}); + }); + }); diff --git a/src/storages/types.ts b/src/storages/types.ts index 85995da2..dc2dfbc2 100644 --- a/src/storages/types.ts +++ b/src/storages/types.ts @@ -1,5 +1,5 @@ import { MaybeThenable, ISplit } from '../dtos/types'; -import { EventDataType, HttpErrors, HttpLatencies, ImpressionDataType, LastSync, Method, MethodExceptions, MethodLatencies, MultiMethodExceptions, MultiMethodLatencies, MultiConfigs, OperationType, StoredEventWithMetadata, StoredImpressionWithMetadata, StreamingEvent, UniqueKeysPayloadCs, UniqueKeysPayloadSs, TelemetryUsageStatsPayload } from '../sync/submitters/types'; +import { EventDataType, HttpErrors, HttpLatencies, ImpressionDataType, LastSync, Method, MethodExceptions, MethodLatencies, MultiMethodExceptions, MultiMethodLatencies, MultiConfigs, OperationType, StoredEventWithMetadata, StoredImpressionWithMetadata, StreamingEvent, UniqueKeysPayloadCs, UniqueKeysPayloadSs, TelemetryUsageStatsPayload, UpdatesFromSSEEnum } from '../sync/submitters/types'; import { SplitIO, ImpressionDTO, ISettings } from '../types'; /** @@ -409,6 +409,7 @@ export interface ITelemetryRuntimeProducerSync { recordTokenRefreshes(): void; recordStreamingEvents(streamingEvent: StreamingEvent): void; recordSessionLength(ms: number): void; + recordUpdatesFromSSE(type: UpdatesFromSSEEnum): void } export interface ITelemetryEvaluationProducerSync { diff --git a/src/sync/polling/types.ts b/src/sync/polling/types.ts index 29453097..4653b568 100644 --- a/src/sync/polling/types.ts +++ b/src/sync/polling/types.ts @@ -1,8 +1,9 @@ +import { ISplit } from '../../dtos/types'; import { IReadinessManager } from '../../readiness/types'; import { IStorageSync } from '../../storages/types'; import { ITask, ISyncTask } from '../types'; -export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number], boolean> { } +export interface ISplitsSyncTask extends ISyncTask<[noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }], boolean> { } export interface ISegmentsSyncTask extends ISyncTask<[fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number], boolean> { } diff --git a/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts b/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts index aee82501..c38e569d 100644 --- a/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts +++ b/src/sync/polling/updaters/__tests__/splitChangesUpdater.spec.ts @@ -11,6 +11,7 @@ import { settingsSplitApi } from '../../../../utils/settingsValidation/__tests__ import { EventEmitter } from '../../../../utils/MinEvents'; import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock'; import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker'; +import { splitNotifications } from '../../../streaming/__tests__/dataMocks'; const activeSplitWithSegments = { name: 'Split1', @@ -55,10 +56,11 @@ test('splitChangesUpdater / compute splits mutation', () => { expect(splitsMutation.segments).toEqual(['A', 'B']); }); -test('splitChangesUpdater / factory', (done) => { +describe('splitChangesUpdater', () => { fetchMock.once('*', { status: 200, body: splitChangesMock1 }); // @ts-ignore const splitApi = splitApiFactory(settingsSplitApi, { getFetch: () => fetchMock, EventEmitter }, telemetryTrackerFactory()); + const fetchSplitChanges = jest.spyOn(splitApi, 'fetchSplitChanges'); 
const splitChangesFetcher = splitChangesFetcherFactory(splitApi.fetchSplitChanges); const splitsCache = new SplitsCacheInMemory(); @@ -73,7 +75,12 @@ test('splitChangesUpdater / factory', (done) => { const splitChangesUpdater = splitChangesUpdaterFactory(loggerMock, splitChangesFetcher, splitsCache, segmentsCache, readinessManager.splits, 1000, 1); - splitChangesUpdater().then((result) => { + afterEach(() => { + jest.clearAllMocks(); + }); + + test('test without payload', async () => { + const result = await splitChangesUpdater(); expect(setChangeNumber).toBeCalledTimes(1); expect(setChangeNumber).lastCalledWith(splitChangesMock1.till); expect(addSplits).toBeCalledTimes(1); @@ -83,7 +90,31 @@ test('splitChangesUpdater / factory', (done) => { expect(registerSegments).toBeCalledTimes(1); expect(splitsEmitSpy).toBeCalledWith('state::splits-arrived'); expect(result).toBe(true); - done(); }); + test('test with payload', async () => { + const ARCHIVED_FF = 'ARCHIVED'; + let index = 0; + for (const notification of splitNotifications) { + const payload = notification.decoded as ISplit; + const changeNumber = payload.changeNumber; + + await expect(splitChangesUpdater(undefined, undefined, { payload, changeNumber: changeNumber })).resolves.toBe(true); + // fetch not being called + expect(fetchSplitChanges).toBeCalledTimes(0); + // Change number being updated + expect(setChangeNumber).toBeCalledTimes(index + 1); + expect(setChangeNumber.mock.calls[index][0]).toEqual(changeNumber); + // Add feature flag in notification + expect(addSplits).toBeCalledTimes(index + 1); + expect(addSplits.mock.calls[index][0].length).toBe(payload.status === ARCHIVED_FF ? 0 : 1); + // Remove feature flag if status is ARCHIVED + expect(removeSplits).toBeCalledTimes(index + 1); + expect(removeSplits.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [payload.name] : []); + // fetch segments after feature flag update + expect(registerSegments).toBeCalledTimes(index + 1); + expect(registerSegments.mock.calls[index][0]).toEqual(payload.status === ARCHIVED_FF ? [] : ['maur-2']); + index++; + } + }); }); diff --git a/src/sync/polling/updaters/splitChangesUpdater.ts b/src/sync/polling/updaters/splitChangesUpdater.ts index a2e63d0b..63ff08f0 100644 --- a/src/sync/polling/updaters/splitChangesUpdater.ts +++ b/src/sync/polling/updaters/splitChangesUpdater.ts @@ -8,7 +8,7 @@ import { SDK_SPLITS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../../../readiness/ import { ILogger } from '../../../logger/types'; import { SYNC_SPLITS_FETCH, SYNC_SPLITS_NEW, SYNC_SPLITS_REMOVED, SYNC_SPLITS_SEGMENTS, SYNC_SPLITS_FETCH_FAILS, SYNC_SPLITS_FETCH_RETRY } from '../../../logger/constants'; -type ISplitChangesUpdater = (noCache?: boolean, till?: number) => Promise +type ISplitChangesUpdater = (noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) => Promise // Checks that all registered segments have been fetched (changeNumber !== -1 for every segment). // Returns a promise that could be rejected. 
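// ---------------------------------------------------------------------------
// Illustrative sketch (editor's note, not part of the patch): a minimal,
// self-contained model of the behavior introduced by the splitChangesUpdater.ts
// hunks below. When a SPLIT_UPDATE notification already carries the feature
// flag definition (an "instant feature flag update"), the updater resolves the
// split changes from that payload instead of calling the /splitChanges
// endpoint. The types and the fetcher here are simplified stand-ins, not the
// SDK's real interfaces.

type MiniSplit = { name: string, status: 'ACTIVE' | 'ARCHIVED', changeNumber: number };
type MiniSplitChanges = { splits: MiniSplit[], till: number };
type MiniSplitUpdateNotification = { payload: MiniSplit, changeNumber: number };

function resolveSplitChanges(
  fetcher: (since: number) => Promise<MiniSplitChanges>,
  since: number,
  notification?: MiniSplitUpdateNotification
): Promise<MiniSplitChanges> {
  // Instant update path: build the response from the notification payload and
  // skip the HTTP fetch entirely; otherwise fall back to the regular fetcher.
  return Promise.resolve(
    notification ?
      { splits: [notification.payload], till: notification.changeNumber } :
      fetcher(since)
  );
}

// Usage: when a notification is provided, the fetcher is never invoked.
resolveSplitChanges(
  (since) => Promise.reject(new Error('unexpected fetch with since=' + since)),
  100,
  { payload: { name: 'my_flag', status: 'ACTIVE', changeNumber: 101 }, changeNumber: 101 }
).then((changes) => console.log(changes.till)); // 101
// ---------------------------------------------------------------------------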
@@ -111,7 +111,7 @@ export function splitChangesUpdaterFactory( * @param {boolean | undefined} noCache true to revalidate data to fetch * @param {boolean | undefined} till query param to bypass CDN requests */ - return function splitChangesUpdater(noCache?: boolean, till?: number) { + return function splitChangesUpdater(noCache?: boolean, till?: number, splitUpdateNotification?: { payload: ISplit, changeNumber: number }) { /** * @param {number} since current changeNumber at splitsCache @@ -119,8 +119,10 @@ export function splitChangesUpdaterFactory( */ function _splitChangesUpdater(since: number, retry = 0): Promise { log.debug(SYNC_SPLITS_FETCH, [since]); - - const fetcherPromise = splitChangesFetcher(since, noCache, till, _promiseDecorator) + const fetcherPromise = Promise.resolve(splitUpdateNotification ? + { splits: [splitUpdateNotification.payload], till: splitUpdateNotification.changeNumber } : + splitChangesFetcher(since, noCache, till, _promiseDecorator) + ) .then((splitChanges: ISplitChangesResponse) => { startingUp = false; diff --git a/src/sync/streaming/UpdateWorkers/MySegmentsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/MySegmentsUpdateWorker.ts index 290cfa0c..eb1a25b1 100644 --- a/src/sync/streaming/UpdateWorkers/MySegmentsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/MySegmentsUpdateWorker.ts @@ -1,11 +1,13 @@ import { IMySegmentsSyncTask, MySegmentsData } from '../../polling/types'; import { Backoff } from '../../../utils/Backoff'; import { IUpdateWorker } from './types'; +import { MY_SEGMENT } from '../../../utils/constants'; +import { ITelemetryTracker } from '../../../trackers/types'; /** * MySegmentsUpdateWorker factory */ -export function MySegmentsUpdateWorker(mySegmentsSyncTask: IMySegmentsSyncTask): IUpdateWorker { +export function MySegmentsUpdateWorker(mySegmentsSyncTask: IMySegmentsSyncTask, telemetryTracker: ITelemetryTracker): IUpdateWorker { let maxChangeNumber = 0; // keeps the maximum changeNumber among queued events let currentChangeNumber = -1; @@ -23,8 +25,10 @@ export function MySegmentsUpdateWorker(mySegmentsSyncTask: IMySegmentsSyncTask): // fetch mySegments revalidating data if cached mySegmentsSyncTask.execute(_segmentsData, true).then((result) => { if (!isHandlingEvent) return; // halt if `stop` has been called - if (result !== false) // Unlike `Splits|SegmentsUpdateWorker`, we cannot use `mySegmentsCache.getChangeNumber` since `/mySegments` endpoint doesn't provide this value. + if (result !== false) {// Unlike `Splits|SegmentsUpdateWorker`, we cannot use `mySegmentsCache.getChangeNumber` since `/mySegments` endpoint doesn't provide this value. + if (_segmentsData) telemetryTracker.trackUpdatesFromSSE(MY_SEGMENT); currentChangeNumber = Math.max(currentChangeNumber, currentMaxChangeNumber); // use `currentMaxChangeNumber`, in case that `maxChangeNumber` was updated during fetch. 
+ } if (handleNewEvent) { __handleMySegmentsUpdateCall(); } else { diff --git a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts index 1a1264df..e9336aba 100644 --- a/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts +++ b/src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts @@ -1,8 +1,11 @@ +import { ISplit } from '../../../dtos/types'; import { ILogger } from '../../../logger/types'; import { SDK_SPLITS_ARRIVED } from '../../../readiness/constants'; import { ISplitsEventEmitter } from '../../../readiness/types'; import { ISplitsCacheSync } from '../../../storages/types'; +import { ITelemetryTracker } from '../../../trackers/types'; import { Backoff } from '../../../utils/Backoff'; +import { SPLITS } from '../../../utils/constants'; import { ISegmentsSyncTask, ISplitsSyncTask } from '../../polling/types'; import { ISplitKillData, ISplitUpdateData } from '../SSEHandler/types'; import { FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT, FETCH_BACKOFF_MAX_RETRIES } from './constants'; @@ -11,25 +14,27 @@ import { IUpdateWorker } from './types'; /** * SplitsUpdateWorker factory */ -export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void } { +export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker & { killSplit(event: ISplitKillData): void } { let maxChangeNumber = 0; let handleNewEvent = false; let isHandlingEvent: boolean; let cdnBypass: boolean; + let payload: ISplit | undefined; const backoff = new Backoff(__handleSplitUpdateCall, FETCH_BACKOFF_BASE, FETCH_BACKOFF_MAX_WAIT); function __handleSplitUpdateCall() { isHandlingEvent = true; if (maxChangeNumber > splitsCache.getChangeNumber()) { handleNewEvent = false; - + const splitUpdateNotification = payload ? { payload, changeNumber: maxChangeNumber } : undefined; // fetch splits revalidating data if cached - splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined).then(() => { + splitsSyncTask.execute(true, cdnBypass ? maxChangeNumber : undefined, splitUpdateNotification).then(() => { if (!isHandlingEvent) return; // halt if `stop` has been called if (handleNewEvent) { __handleSplitUpdateCall(); } else { + if (splitUpdateNotification) telemetryTracker.trackUpdatesFromSSE(SPLITS); // fetch new registered segments for server-side API. 
Not retrying on error if (segmentsSyncTask) segmentsSyncTask.execute(true); @@ -66,7 +71,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, * * @param {number} changeNumber change number of the SPLIT_UPDATE notification */ - function put({ changeNumber }: Pick) { + function put({ changeNumber, pcn }: ISplitUpdateData, _payload?: ISplit) { const currentChangeNumber = splitsCache.getChangeNumber(); if (changeNumber <= currentChangeNumber || changeNumber <= maxChangeNumber) return; @@ -74,6 +79,11 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, maxChangeNumber = changeNumber; handleNewEvent = true; cdnBypass = false; + payload = undefined; + + if (_payload && currentChangeNumber === pcn) { + payload = _payload; + } if (backoff.timeoutID || !isHandlingEvent) __handleSplitUpdateCall(); backoff.reset(); @@ -81,7 +91,6 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, return { put, - /** * Invoked by NotificationProcessor on SPLIT_KILL event * @@ -95,7 +104,7 @@ export function SplitsUpdateWorker(log: ILogger, splitsCache: ISplitsCacheSync, splitsEventEmitter.emit(SDK_SPLITS_ARRIVED, true); } // queues the SplitChanges fetch (only if changeNumber is newer) - put({ changeNumber }); + put({ changeNumber } as ISplitUpdateData); }, stop() { diff --git a/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts index 94ec9b78..6726ed86 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/MySegmentsUpdateWorker.spec.ts @@ -2,6 +2,7 @@ import { MySegmentsUpdateWorker } from '../MySegmentsUpdateWorker'; import { syncTaskFactory } from '../../../syncTask'; import { Backoff } from '../../../../utils/Backoff'; +import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker'; function mySegmentsSyncTaskMock(values = []) { @@ -30,6 +31,9 @@ function mySegmentsSyncTaskMock(values = []) { }; } +const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + + describe('MySegmentsUpdateWorker', () => { afterEach(() => { @@ -41,7 +45,7 @@ describe('MySegmentsUpdateWorker', () => { // setup const mySegmentsSyncTask = mySegmentsSyncTaskMock(); Backoff.__TEST__BASE_MILLIS = 1; // retry immediately - const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask); + const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker); // assert calling `mySegmentsSyncTask.execute` if `isExecuting` is false expect(mySegmentsSyncTask.isExecuting()).toBe(false); @@ -109,7 +113,7 @@ describe('MySegmentsUpdateWorker', () => { // setup Backoff.__TEST__BASE_MILLIS = 50; const mySegmentsSyncTask = mySegmentsSyncTaskMock([false, false, false]); // fetch fail - const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask); + const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker); // while fetch fails, should retry with backoff mySegmentUpdateWorker.put(100); @@ -125,7 +129,7 @@ describe('MySegmentsUpdateWorker', () => { // setup const mySegmentsSyncTask = mySegmentsSyncTaskMock([false]); Backoff.__TEST__BASE_MILLIS = 1; - const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask); + const mySegmentUpdateWorker = MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker); mySegmentUpdateWorker.put(100); diff --git 
a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts index 666fc25b..af5c6336 100644 --- a/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts +++ b/src/sync/streaming/UpdateWorkers/__tests__/SplitsUpdateWorker.spec.ts @@ -6,6 +6,8 @@ import { FETCH_BACKOFF_MAX_RETRIES } from '../constants'; import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock'; import { syncTaskFactory } from '../../../syncTask'; import { Backoff } from '../../../../utils/Backoff'; +import { splitNotifications } from '../../../streaming/__tests__/dataMocks'; +import { telemetryTrackerFactory } from '../../../../trackers/telemetryTracker'; function splitsSyncTaskMock(splitStorage: SplitsCacheInMemory, changeNumbers = []) { @@ -46,6 +48,9 @@ function assertKilledSplit(cache, changeNumber, splitName, defaultTreatment) { expect(split.changeNumber).toBe(changeNumber); // split must have the given change number } +const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker + + describe('SplitsUpdateWorker', () => { afterEach(() => { // restore @@ -60,7 +65,7 @@ describe('SplitsUpdateWorker', () => { const splitsSyncTask = splitsSyncTaskMock(cache); Backoff.__TEST__BASE_MILLIS = 1; // retry immediately - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); // assert calling `splitsSyncTask.execute` if `isExecuting` is false expect(splitsSyncTask.isExecuting()).toBe(false); @@ -99,7 +104,7 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__BASE_MILLIS = 50; const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [90, 90, 90]); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); // while fetch fails, should retry with backoff splitUpdateWorker.put({ changeNumber: 100 }); @@ -118,7 +123,7 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [...Array(FETCH_BACKOFF_MAX_RETRIES).fill(90), 90, 100]); // 12 executions. Last one is valid - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -126,8 +131,8 @@ describe('SplitsUpdateWorker', () => { expect(loggerMock.debug).lastCalledWith('Refresh completed bypassing the CDN in 2 attempts.'); expect(splitsSyncTask.execute.mock.calls).toEqual([ - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined]), - [true, 100], [true, 100], + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined]), + [true, 100, undefined], [true, 100, undefined], ]); // `execute` was called 12 times. Last 2 with CDN bypass // Handle new event after previous is completed @@ -143,7 +148,7 @@ describe('SplitsUpdateWorker', () => { Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, Array(FETCH_BACKOFF_MAX_RETRIES * 2).fill(90)); // 20 executions. 
No one is valid - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); // queued @@ -151,8 +156,8 @@ describe('SplitsUpdateWorker', () => { expect(loggerMock.debug).lastCalledWith('No changes fetched after 10 attempts with CDN bypassed.'); expect(splitsSyncTask.execute.mock.calls).toEqual([ - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined]), - ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, 100]), + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, undefined, undefined]), + ...Array(FETCH_BACKOFF_MAX_RETRIES).fill([true, 100, undefined]), ]); // `execute` was called 20 times. Last 10 with CDN bypass // Handle new event after previous ends (not completed) @@ -168,7 +173,7 @@ describe('SplitsUpdateWorker', () => { cache.addSplit('lol2', '{ "name": "something else"}'); const splitsSyncTask = splitsSyncTaskMock(cache); - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, splitsEventEmitterMock); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, splitsEventEmitterMock, telemetryTracker); // assert killing split locally, emitting SDK_SPLITS_ARRIVED event, and synchronizing splits if changeNumber is new splitUpdateWorker.killSplit({ changeNumber: 100, splitName: 'lol1', defaultTreatment: 'off' }); // splitsCache.killLocally is synchronous @@ -195,7 +200,7 @@ describe('SplitsUpdateWorker', () => { const cache = new SplitsCacheInMemory(); const splitsSyncTask = splitsSyncTaskMock(cache, [95]); Backoff.__TEST__BASE_MILLIS = 1; - const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); splitUpdateWorker.put({ changeNumber: 100 }); @@ -205,4 +210,62 @@ describe('SplitsUpdateWorker', () => { expect(splitsSyncTask.execute).toBeCalledTimes(1); }); + test('put, avoid fetching if payload sent', async () => { + + const cache = new SplitsCacheInMemory(); + splitNotifications.forEach(notification => { + const pcn = cache.getChangeNumber(); + const splitsSyncTask = splitsSyncTaskMock(cache); + const splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + const payload = notification.decoded; + const changeNumber = payload.changeNumber; + splitUpdateWorker.put( { changeNumber, pcn }, payload); // queued + expect(splitsSyncTask.execute).toBeCalledTimes(1); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {changeNumber, payload}]); + }); + }); + + test('put, ccn and pcn validation for IFF', () => { + const cache = new SplitsCacheInMemory(); + + // ccn = 103 & pcn = 104: Something was missed -> fetch split changes + let ccn = 103; + let pcn = 104; + let changeNumber = 105; + cache.setChangeNumber(ccn); + const notification = splitNotifications[0]; + + let splitsSyncTask = splitsSyncTaskMock(cache); + let splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); + expect(splitsSyncTask.execute).toBeCalledTimes(1); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); + splitsSyncTask.execute.mockClear(); + + // ccn = 110 & pcn = 0: Something was missed -> something wrong in `pushNotificationManager` -> fetch split changes + ccn = 110; + pcn = 0; + 
changeNumber = 111; + cache.setChangeNumber(ccn); + + splitsSyncTask = splitsSyncTaskMock(cache); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); + expect(splitsSyncTask.execute).toBeCalledTimes(1); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]); + splitsSyncTask.execute.mockClear(); + + // ccn = 120 & pcn = 120: In-order consecutive notifications arrived, apply updates normally + ccn = 120; + pcn = 120; + changeNumber = 121; + cache.setChangeNumber(ccn); + + splitsSyncTask = splitsSyncTaskMock(cache); + splitUpdateWorker = SplitsUpdateWorker(loggerMock, cache, splitsSyncTask, telemetryTracker); + splitUpdateWorker.put({ changeNumber, pcn }, notification.decoded); + expect(splitsSyncTask.execute).toBeCalledTimes(1); + expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, {payload: notification.decoded, changeNumber }]); + + }); }); diff --git a/src/sync/streaming/pushManager.ts b/src/sync/streaming/pushManager.ts index f21b67a1..d6e6c7c8 100644 --- a/src/sync/streaming/pushManager.ts +++ b/src/sync/streaming/pushManager.ts @@ -12,9 +12,9 @@ import { forOwn } from '../../utils/lang'; import { SSEClient } from './SSEClient'; import { getMatching } from '../../utils/key'; import { MY_SEGMENTS_UPDATE, MY_SEGMENTS_UPDATE_V2, PUSH_NONRETRYABLE_ERROR, PUSH_SUBSYSTEM_DOWN, SECONDS_BEFORE_EXPIRATION, SEGMENT_UPDATE, SPLIT_KILL, SPLIT_UPDATE, PUSH_RETRYABLE_ERROR, PUSH_SUBSYSTEM_UP, ControlType } from './constants'; -import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2 } from '../../logger/constants'; +import { STREAMING_FALLBACK, STREAMING_REFRESH_TOKEN, STREAMING_CONNECTING, STREAMING_DISABLED, ERROR_STREAMING_AUTH, STREAMING_DISCONNECTING, STREAMING_RECONNECT, STREAMING_PARSING_MY_SEGMENTS_UPDATE_V2, STREAMING_PARSING_SPLIT_UPDATE } from '../../logger/constants'; import { KeyList, UpdateStrategy } from './SSEHandler/types'; -import { isInBitmap, parseBitmap, parseKeyList } from './parseUtils'; +import { isInBitmap, parseBitmap, parseFFUpdatePayload, parseKeyList } from './parseUtils'; import { ISet, _Set } from '../../utils/lang/sets'; import { Hash64, hash64 } from '../../utils/murmur3/murmur3_64'; import { IAuthTokenPushEnabled } from './AuthClient/types'; @@ -58,7 +58,7 @@ export function pushManagerFactory( // MySegmentsUpdateWorker (client-side) are initiated in `add` method const segmentsUpdateWorker = userKey ? undefined : SegmentsUpdateWorker(log, pollingManager.segmentsSyncTask as ISegmentsSyncTask, storage.segments); // For server-side we pass the segmentsSyncTask, used by SplitsUpdateWorker to fetch new segments - const splitsUpdateWorker = SplitsUpdateWorker(log, storage.splits, pollingManager.splitsSyncTask, readiness.splits, userKey ? undefined : pollingManager.segmentsSyncTask as ISegmentsSyncTask); + const splitsUpdateWorker = SplitsUpdateWorker(log, storage.splits, pollingManager.splitsSyncTask, readiness.splits, telemetryTracker, userKey ?
undefined : pollingManager.segmentsSyncTask as ISegmentsSyncTask); // [Only for client-side] map of hashes to user keys, to dispatch MY_SEGMENTS_UPDATE events to the corresponding MySegmentsUpdateWorker const userKeyHashes: Record = {}; @@ -221,7 +221,20 @@ export function pushManagerFactory( /** Functions related to synchronization (Queues and Workers in the spec) */ pushEmitter.on(SPLIT_KILL, splitsUpdateWorker.killSplit); - pushEmitter.on(SPLIT_UPDATE, splitsUpdateWorker.put); + pushEmitter.on(SPLIT_UPDATE, (parsedData) => { + if (parsedData.d && parsedData.c !== undefined) { + try { + const payload = parseFFUpdatePayload(parsedData.c, parsedData.d); + if (payload) { + splitsUpdateWorker.put(parsedData, payload); + return; + } + } catch (e) { + log.warn(STREAMING_PARSING_SPLIT_UPDATE, [e]); + } + } + splitsUpdateWorker.put(parsedData); + }); if (userKey) { pushEmitter.on(MY_SEGMENTS_UPDATE, function handleMySegmentsUpdate(parsedData, channel) { @@ -330,7 +343,7 @@ export function pushManagerFactory( if (!userKeyHashes[hash]) { userKeyHashes[hash] = userKey; - clients[userKey] = { hash64: hash64(userKey), worker: MySegmentsUpdateWorker(mySegmentsSyncTask) }; + clients[userKey] = { hash64: hash64(userKey), worker: MySegmentsUpdateWorker(mySegmentsSyncTask, telemetryTracker) }; connectForNewClient = true; // we must reconnect on start, to listen the channel for the new user key // Reconnects in case of a new client. diff --git a/src/sync/submitters/__tests__/telemetrySubmitter.spec.ts b/src/sync/submitters/__tests__/telemetrySubmitter.spec.ts index 25ff16be..be7b6542 100644 --- a/src/sync/submitters/__tests__/telemetrySubmitter.spec.ts +++ b/src/sync/submitters/__tests__/telemetrySubmitter.spec.ts @@ -48,7 +48,7 @@ describe('Telemetry submitter', () => { expect(isEmptySpy).toBeCalledTimes(1); expect(popSpy).toBeCalledTimes(1); expect(postMetricsUsage).toBeCalledWith(JSON.stringify({ - lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: 0, seC: 0, skC: 0, eQ: 0, eD: 0, sE: [], t: ['tag1'] + lS: {}, mL: {}, mE: {}, hE: {}, hL: {}, tR: 0, aR: 0, iQ: 0, iDe: 0, iDr: 0, spC: 0, seC: 0, skC: 0, eQ: 0, eD: 0, sE: [], t: ['tag1'], ufs:{ sp: 0, ms: 0 } })); // Await second periodic execution diff --git a/src/sync/submitters/types.ts b/src/sync/submitters/types.ts index 6eff14a0..95653fec 100644 --- a/src/sync/submitters/types.ts +++ b/src/sync/submitters/types.ts @@ -103,6 +103,7 @@ export type DROPPED = 1; export type DEDUPED = 2; export type ImpressionDataType = QUEUED | DROPPED | DEDUPED export type EventDataType = QUEUED | DROPPED; +export type UpdatesFromSSEEnum = SPLITS | MY_SEGMENT; export type SPLITS = 'sp'; export type IMPRESSIONS = 'im'; @@ -151,6 +152,12 @@ export type TelemetryUsageStats = { mE?: MethodExceptions, // methodExceptions } +// amount of instant updates that we are doing by avoiding fetching to Split servers +export type UpdatesFromSSE = { + sp: number, // splits + ms?: number, // my segments +} + // 'metrics/usage' JSON request body export type TelemetryUsageStatsPayload = TelemetryUsageStats & { lS: LastSync, // lastSynchronization @@ -169,6 +176,7 @@ export type TelemetryUsageStatsPayload = TelemetryUsageStats & { eD: number, // eventsDropped sE: Array, // streamingEvents t?: Array, // tags + ufs?: UpdatesFromSSE, //UpdatesFromSSE } /** diff --git a/src/trackers/telemetryTracker.ts b/src/trackers/telemetryTracker.ts index 531381a3..0312cc94 100644 --- a/src/trackers/telemetryTracker.ts +++ b/src/trackers/telemetryTracker.ts @@ -3,6 +3,7 @@ 
import { EXCEPTION, SDK_NOT_READY } from '../utils/labels'; import { ITelemetryTracker } from './types'; import { timer } from '../utils/timeTracker/timer'; import { TOKEN_REFRESH, AUTH_REJECTION } from '../utils/constants'; +import { UpdatesFromSSEEnum } from '../sync/submitters/types'; export function telemetryTrackerFactory( telemetryCache?: ITelemetryCacheSync | ITelemetryCacheAsync, @@ -52,6 +53,9 @@ export function telemetryTrackerFactory( addTag(tag: string) { // @ts-ignore if (telemetryCache.addTag) telemetryCache.addTag(tag); + }, + trackUpdatesFromSSE(type: UpdatesFromSSEEnum) { + (telemetryCache as ITelemetryCacheSync).recordUpdatesFromSSE(type); } }; @@ -62,7 +66,8 @@ export function telemetryTrackerFactory( trackHttp: noopTrack, sessionLength() { }, streamingEvent() { }, - addTag() { } + addTag() { }, + trackUpdatesFromSSE() { }, }; } } diff --git a/src/trackers/types.ts b/src/trackers/types.ts index 26648934..041c9b97 100644 --- a/src/trackers/types.ts +++ b/src/trackers/types.ts @@ -1,5 +1,5 @@ import { SplitIO, ImpressionDTO } from '../types'; -import { StreamingEventType, Method, OperationType } from '../sync/submitters/types'; +import { StreamingEventType, Method, OperationType, UpdatesFromSSEEnum } from '../sync/submitters/types'; import { IEventsCacheBase } from '../storages/types'; import { NetworkError } from '../services/types'; @@ -45,6 +45,10 @@ export interface ITelemetryTracker { * Records tag */ addTag(tag: string): void + /** + * Records updates from sse + */ + trackUpdatesFromSSE(type: UpdatesFromSSEEnum): void; } export interface IFilterAdapter {
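// ---------------------------------------------------------------------------
// Illustrative sketch (editor's note, not part of the patch): a minimal,
// self-contained model of the new `ufs` ("updates from SSE") telemetry counters
// added above in TelemetryCacheInMemory and ITelemetryTracker. The keys 'sp'
// (splits) and 'ms' (mySegments) mirror the SPLITS and MY_SEGMENT constants;
// everything else is a simplified stand-in.

type MiniUpdatesFromSSEKey = 'sp' | 'ms';
type MiniUpdatesFromSSE = Record<MiniUpdatesFromSSEKey, number>;

class MiniTelemetryCache {
  private updatesFromSSE: MiniUpdatesFromSSE = { sp: 0, ms: 0 };

  // Called when an update is applied straight from a streaming notification,
  // i.e., without fetching from Split servers.
  recordUpdatesFromSSE(type: MiniUpdatesFromSSEKey) {
    this.updatesFromSSE[type]++;
  }

  // Returns the accumulated counters and resets them, as done when the
  // telemetry submitter builds the next `metrics/usage` payload (`ufs` field).
  popUpdatesFromSSE(): MiniUpdatesFromSSE {
    const result = this.updatesFromSSE;
    this.updatesFromSSE = { sp: 0, ms: 0 };
    return result;
  }
}

// Usage: two instant feature flag updates and one mySegments update recorded
// since the last telemetry post.
const miniCache = new MiniTelemetryCache();
miniCache.recordUpdatesFromSSE('sp');
miniCache.recordUpdatesFromSSE('sp');
miniCache.recordUpdatesFromSSE('ms');
console.log({ ufs: miniCache.popUpdatesFromSSE() }); // { ufs: { sp: 2, ms: 1 } }
console.log({ ufs: miniCache.popUpdatesFromSSE() }); // { ufs: { sp: 0, ms: 0 } }
// ---------------------------------------------------------------------------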