Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: add init method and onReadyFromCacheCb param to storage factory [WIP] #352

Draft
wants to merge 6 commits into
base: development
Choose a base branch
from
3 changes: 2 additions & 1 deletion CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
2.0.0 (October XX, 2024)
- Added support for targeting rules based on large segments.
- Added `factory.destroy()` method, which invokes the `destroy` method on all SDK clients created by the factory.
- Updated the handling of timers and async operations inside an `init` factory method to enable lazy initialization of the SDK in standalone mode. This update is intended for the React SDK.
- Updated internal storage factory to emit the SDK_READY_FROM_CACHE event when it corresponds, to clean up the initialization flow.
- Updated the handling of timers and async operations inside an `init` factory method to enable lazy initialization of the SDK. This update is intended for the React SDK.
- Bugfixing - Fixed an issue with the server-side polling manager that caused dangling timers when the SDK was destroyed before it was ready.
- BREAKING CHANGES:
- Updated default flag spec version to 1.2.
Expand Down
6 changes: 5 additions & 1 deletion src/sdkFactory/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { IBasicClient, SplitIO } from '../types';
import { validateAndTrackApiKey } from '../utils/inputValidation/apiKey';
import { createLoggerAPI } from '../logger/sdkLogger';
import { NEW_FACTORY, RETRIEVE_MANAGER } from '../logger/constants';
import { SDK_SPLITS_ARRIVED, SDK_SEGMENTS_ARRIVED } from '../readiness/constants';
import { SDK_SPLITS_ARRIVED, SDK_SEGMENTS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../readiness/constants';
import { objectAssign } from '../utils/lang/objectAssign';
import { strategyDebugFactory } from '../trackers/strategy/strategyDebug';
import { strategyOptimizedFactory } from '../trackers/strategy/strategyOptimized';
Expand Down Expand Up @@ -52,6 +52,9 @@ export function sdkFactory(params: ISdkFactoryParams): SplitIO.ICsSDK | SplitIO.
readiness.splits.emit(SDK_SPLITS_ARRIVED);
readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
},
onReadyFromCacheCb: () => {
readiness.splits.emit(SDK_SPLITS_CACHE_LOADED);
}
});
// @TODO add support for dataloader: `if (params.dataLoader) params.dataLoader(storage);`
const clients: Record<string, IBasicClient> = {};
Expand Down Expand Up @@ -99,6 +102,7 @@ export function sdkFactory(params: ISdkFactoryParams): SplitIO.ICsSDK | SplitIO.
// We will just log and allow for the SDK to end up throwing an SDK_TIMEOUT event for devs to handle.
validateAndTrackApiKey(log, settings.core.authorizationKey);
readiness.init();
storage.init && storage.init();
uniqueKeysTracker && uniqueKeysTracker.start();
syncManager && syncManager.start();
signalListener && signalListener.start();
Expand Down
8 changes: 0 additions & 8 deletions src/storages/AbstractSplitsCacheAsync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@ export abstract class AbstractSplitsCacheAsync implements ISplitsCacheAsync {
return Promise.resolve(true);
}

/**
* Check if the splits information is already stored in cache.
* Noop, just keeping the interface. This is used by client-side implementations only.
*/
checkCache(): Promise<boolean> {
return Promise.resolve(false);
}

/**
* Kill `name` split and set `defaultTreatment` and `changeNumber`.
* Used for SPLIT_KILL push notifications.
Expand Down
8 changes: 0 additions & 8 deletions src/storages/AbstractSplitsCacheSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ export abstract class AbstractSplitsCacheSync implements ISplitsCacheSync {

abstract clear(): void

/**
* Check if the splits information is already stored in cache. This data can be preloaded.
* It is used as condition to emit SDK_SPLITS_CACHE_LOADED, and then SDK_READY_FROM_CACHE.
*/
checkCache(): boolean {
return false;
}

/**
* Kill `name` split and set `defaultTreatment` and `changeNumber`.
* Used for SPLIT_KILL push notifications.
Expand Down
11 changes: 1 addition & 10 deletions src/storages/inLocalStorage/SplitsCacheInLocal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,15 +217,6 @@ export class SplitsCacheInLocal extends AbstractSplitsCacheSync {
}
}

/**
* Check if the splits information is already stored in browser LocalStorage.
* In this function we could add more code to check if the data is valid.
* @override
*/
checkCache(): boolean {
return this.getChangeNumber() > -1;
}

/**
* Clean Splits cache if its `lastUpdated` timestamp is older than the given `expirationTimestamp`,
*
Expand All @@ -250,7 +241,7 @@ export class SplitsCacheInLocal extends AbstractSplitsCacheSync {
this.updateNewFilter = true;

// if there is cache, clear it
if (this.checkCache()) this.clear();
if (this.getChangeNumber() > -1) this.clear();

} catch (e) {
this.log.error(LOG_PREFIX + e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,10 @@ test('SPLIT CACHE / LocalStorage', () => {
expect(cache.getSplit('lol1')).toEqual(null);
expect(cache.getSplit('lol2')).toEqual(somethingElse);

expect(cache.checkCache()).toBe(false); // checkCache should return false until localstorage has data.

expect(cache.getChangeNumber() === -1).toBe(true);

expect(cache.checkCache()).toBe(false); // checkCache should return false until localstorage has data.

cache.setChangeNumber(123);

expect(cache.checkCache()).toBe(true); // checkCache should return true once localstorage has data.

expect(cache.getChangeNumber() === 123).toBe(true);

});
Expand Down
32 changes: 25 additions & 7 deletions src/storages/inLocalStorage/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { ImpressionsCacheInMemory } from '../inMemory/ImpressionsCacheInMemory';
import { ImpressionCountsCacheInMemory } from '../inMemory/ImpressionCountsCacheInMemory';
import { EventsCacheInMemory } from '../inMemory/EventsCacheInMemory';
import { IStorageFactoryParams, IStorageSync, IStorageSyncFactory } from '../types';
import { ISegmentsCacheSync, ISplitsCacheSync, IStorageFactoryParams, IStorageSync, IStorageSyncFactory } from '../types';
import { validatePrefix } from '../KeyBuilder';
import { KeyBuilderCS, myLargeSegmentsKeyBuilder } from '../KeyBuilderCS';
import { isLocalStorageAvailable } from '../../utils/env/isLocalStorageAvailable';
Expand All @@ -12,7 +12,7 @@
import { DEFAULT_CACHE_EXPIRATION_IN_MILLIS } from '../../utils/constants/browser';
import { InMemoryStorageCSFactory } from '../inMemory/InMemoryStorageCS';
import { LOG_PREFIX } from './constants';
import { DEBUG, NONE, STORAGE_LOCALSTORAGE } from '../../utils/constants';
import { DEBUG, LOCALHOST_MODE, NONE, STORAGE_LOCALSTORAGE } from '../../utils/constants';
import { shouldRecordTelemetry, TelemetryCacheInMemory } from '../inMemory/TelemetryCacheInMemory';
import { UniqueKeysCacheInMemoryCS } from '../inMemory/UniqueKeysCacheInMemoryCS';
import { getMatching } from '../../utils/key';
Expand All @@ -36,16 +36,16 @@
return InMemoryStorageCSFactory(params);
}

const { settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode, __splitFiltersValidation } } } = params;
const { onReadyFromCacheCb, settings, settings: { log, scheduler: { impressionsQueueSize, eventsQueueSize, }, sync: { impressionsMode, __splitFiltersValidation } } } = params;
const matchingKey = getMatching(settings.core.key);
const keys = new KeyBuilderCS(prefix, matchingKey);
const expirationTimestamp = Date.now() - DEFAULT_CACHE_EXPIRATION_IN_MILLIS;

const splits = new SplitsCacheInLocal(settings, keys, expirationTimestamp);
const segments = new MySegmentsCacheInLocal(log, keys);
const largeSegments = new MySegmentsCacheInLocal(log, myLargeSegmentsKeyBuilder(prefix, matchingKey));
const splits: ISplitsCacheSync = new SplitsCacheInLocal(settings, keys, expirationTimestamp);
const segments: ISegmentsCacheSync = new MySegmentsCacheInLocal(log, keys);
const largeSegments: ISegmentsCacheSync = new MySegmentsCacheInLocal(log, myLargeSegmentsKeyBuilder(prefix, matchingKey));

return {
const storage = {
splits,
segments,
largeSegments,
Expand All @@ -55,6 +55,12 @@
telemetry: shouldRecordTelemetry(params) ? new TelemetryCacheInMemory(splits, segments) : undefined,
uniqueKeys: impressionsMode === NONE ? new UniqueKeysCacheInMemoryCS() : undefined,

init() {
if (settings.mode === LOCALHOST_MODE || splits.getChangeNumber() > -1) {
Promise.resolve().then(onReadyFromCacheCb);
}
},

destroy() {
this.splits = new SplitsCacheInMemory(__splitFiltersValidation);
this.segments = new MySegmentsCacheInMemory();
Expand All @@ -79,12 +85,24 @@

destroy() {
this.splits = new SplitsCacheInMemory(__splitFiltersValidation);
this.segments = new MySegmentsCacheInMemory();

Check failure on line 88 in src/storages/inLocalStorage/index.ts

View workflow job for this annotation

GitHub Actions / CI

Type 'MySegmentsCacheInMemory' is missing the following properties from type 'MySegmentsCacheInLocal': keys, log
this.largeSegments = new MySegmentsCacheInMemory();

Check failure on line 89 in src/storages/inLocalStorage/index.ts

View workflow job for this annotation

GitHub Actions / CI

Type 'MySegmentsCacheInMemory' is not assignable to type 'MySegmentsCacheInLocal'.
}
};
},
};

// @TODO revisit storage logic in localhost mode
// No tracking data in localhost mode to avoid memory leaks
if (params.settings.mode === LOCALHOST_MODE) {
const noopTrack = () => true;
storage.impressions.track = noopTrack;
storage.events.track = noopTrack;
if (storage.impressionCounts) storage.impressionCounts.track = noopTrack;
if (storage.uniqueKeys) storage.uniqueKeys.track = noopTrack;
}

return storage;
}

InLocalStorageCSFactory.type = STORAGE_LOCALSTORAGE;
Expand Down
2 changes: 1 addition & 1 deletion src/storages/inRedis/RedisAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const DEFAULT_OPTIONS = {
const DEFAULT_LIBRARY_OPTIONS = {
enableOfflineQueue: false,
connectTimeout: DEFAULT_OPTIONS.connectionTimeout,
lazyConnect: false
lazyConnect: false // @TODO true to avoid side-effects on instantiation
};

interface IRedisCommand {
Expand Down
4 changes: 4 additions & 0 deletions src/storages/pluggable/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ describe('PLUGGABLE STORAGE', () => {
test('creates a storage instance', async () => {
const storageFactory = PluggableStorage({ prefix, wrapper: wrapperMock });
const storage = storageFactory(internalSdkParams);
storage.init();

assertStorageInterface(storage); // the instance must implement the storage interface
expect(wrapperMock.connect).toBeCalledTimes(1); // wrapper connect method should be called once when storage is created
Expand Down Expand Up @@ -74,6 +75,7 @@ describe('PLUGGABLE STORAGE', () => {
test('creates a storage instance for partial consumer mode (events and impressions cache in memory)', async () => {
const storageFactory = PluggableStorage({ prefix, wrapper: wrapperMock });
const storage = storageFactory({ ...internalSdkParams, settings: { ...internalSdkParams.settings, mode: CONSUMER_PARTIAL_MODE } });
storage.init();

assertStorageInterface(storage);
expect(wrapperMock.connect).toBeCalledTimes(1);
Expand Down Expand Up @@ -102,6 +104,7 @@ describe('PLUGGABLE STORAGE', () => {
// Create storage instance. Wrapper is pollute but doesn't have filter query key, so it should clear the cache
await new Promise(resolve => {
storage = storageFactory({ onReadyCb: resolve, settings: { ...fullSettings, mode: undefined } });
storage.init();
});

// Assert that expected caches are present
Expand All @@ -121,6 +124,7 @@ describe('PLUGGABLE STORAGE', () => {
// Create storage instance. This time the wrapper has the current filter query key, so it should not clear the cache
await new Promise(resolve => {
storage = storageFactory({ onReadyCb: resolve, settings: { ...fullSettings, mode: undefined } });
storage.init();
});

// Assert that cache was not cleared
Expand Down
71 changes: 38 additions & 33 deletions src/storages/pluggable/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IPluggableStorageWrapper, IStorageAsync, IStorageAsyncFactory, IStorageFactoryParams, ITelemetryCacheAsync } from '../types';
import { IPluggableStorageWrapper, IStorageAsyncFactory, IStorageFactoryParams, ITelemetryCacheAsync } from '../types';

import { KeyBuilderSS } from '../KeyBuilderSS';
import { SplitsCachePluggable } from './SplitsCachePluggable';
Expand Down Expand Up @@ -62,11 +62,12 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn

const prefix = validatePrefix(options.prefix);

function PluggableStorageFactory(params: IStorageFactoryParams): IStorageAsync {
function PluggableStorageFactory(params: IStorageFactoryParams) {
const { onReadyCb, settings, settings: { log, mode, sync: { impressionsMode }, scheduler: { impressionsQueueSize, eventsQueueSize } } } = params;
const metadata = metadataBuilder(settings);
const keys = new KeyBuilderSS(prefix, metadata);
const wrapper = wrapperAdapter(log, options.wrapper);
let connectPromise: Promise<void>;

const isSyncronizer = mode === undefined; // If mode is not defined, the synchronizer is running
const isPartialConsumer = mode === CONSUMER_PARTIAL_MODE;
Expand All @@ -89,35 +90,6 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
new UniqueKeysCachePluggable(log, keys.buildUniqueKeysKey(), wrapper) :
undefined;

// Connects to wrapper and emits SDK_READY event on main client
const connectPromise = wrapper.connect().then(() => {
if (isSyncronizer) {
// In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
return wrapper.get(keys.buildHashKey()).then((hash) => {
const currentHash = getStorageHash(settings);
if (hash !== currentHash) {
log.info(LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache');
return wrapper.getKeysByPrefix(`${keys.prefix}.`).then(storageKeys => {
return Promise.all(storageKeys.map(storageKey => wrapper.del(storageKey)));
}).then(() => wrapper.set(keys.buildHashKey(), currentHash));
}
}).then(() => {
onReadyCb();
});
} else {
// Start periodic flush of async storages if not running synchronizer (producer mode)
if (impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if (uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if (telemetry && (telemetry as ITelemetryCacheAsync).recordConfig) (telemetry as ITelemetryCacheAsync).recordConfig();

onReadyCb();
}
}).catch((e) => {
e = e || new Error('Error connecting wrapper');
onReadyCb(e);
return e; // Propagate error for shared clients
});

return {
splits: new SplitsCachePluggable(log, keys, wrapper, settings.sync.__splitFiltersValidation),
segments: new SegmentsCachePluggable(log, keys, wrapper),
Expand All @@ -127,6 +99,39 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
telemetry,
uniqueKeys: uniqueKeysCache,

init() {
if (connectPromise) return connectPromise;

// Connects to wrapper and emits SDK_READY event on main client
return connectPromise = wrapper.connect().then(() => {
if (isSyncronizer) {
// In standalone or producer mode, clear storage if SDK key or feature flag filter has changed
return wrapper.get(keys.buildHashKey()).then((hash) => {
const currentHash = getStorageHash(settings);
if (hash !== currentHash) {
log.info(LOG_PREFIX + 'Storage HASH has changed (SDK key, flags filter criteria or flags spec version was modified). Clearing cache');
return wrapper.getKeysByPrefix(`${keys.prefix}.`).then(storageKeys => {
return Promise.all(storageKeys.map(storageKey => wrapper.del(storageKey)));
}).then(() => wrapper.set(keys.buildHashKey(), currentHash));
}
}).then(() => {
onReadyCb();
});
} else {
// Start periodic flush of async storages if not running synchronizer (producer mode)
if (impressionCountsCache && (impressionCountsCache as ImpressionCountsCachePluggable).start) (impressionCountsCache as ImpressionCountsCachePluggable).start();
if (uniqueKeysCache && (uniqueKeysCache as UniqueKeysCachePluggable).start) (uniqueKeysCache as UniqueKeysCachePluggable).start();
if (telemetry && (telemetry as ITelemetryCacheAsync).recordConfig) (telemetry as ITelemetryCacheAsync).recordConfig();

onReadyCb();
}
}).catch((e) => {
e = e || new Error('Error connecting wrapper');
onReadyCb(e);
return e; // Propagate error for shared clients
});
},

// Stop periodic flush and disconnect the underlying storage
destroy() {
return Promise.all(isSyncronizer ? [] : [
Expand All @@ -136,8 +141,8 @@ export function PluggableStorage(options: PluggableStorageOptions): IStorageAsyn
},

// emits SDK_READY event on shared clients and returns a reference to the storage
shared(_, onReadyCb) {
connectPromise.then(onReadyCb);
shared(_: string, onReadyCb: (error?: any) => void) {
this.init().then(onReadyCb);

return {
...this,
Expand Down
6 changes: 2 additions & 4 deletions src/storages/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,6 @@ export interface ISplitsCacheBase {
// only for Client-Side. Returns true if the storage is not synchronized yet (getChangeNumber() === -1) or contains a FF using segments or large segments
usesSegments(): MaybeThenable<boolean>,
clear(): MaybeThenable<boolean | void>,
// should never reject or throw an exception. Instead return false by default, to avoid emitting SDK_READY_FROM_CACHE.
checkCache(): MaybeThenable<boolean>,
killLocally(name: string, defaultTreatment: string, changeNumber: number): MaybeThenable<boolean>,
getNamesByFlagSets(flagSets: string[]): MaybeThenable<Set<string>[]>
}
Expand All @@ -225,7 +223,6 @@ export interface ISplitsCacheSync extends ISplitsCacheBase {
trafficTypeExists(trafficType: string): boolean,
usesSegments(): boolean,
clear(): void,
checkCache(): boolean,
killLocally(name: string, defaultTreatment: string, changeNumber: number): boolean,
getNamesByFlagSets(flagSets: string[]): Set<string>[]
}
Expand All @@ -242,7 +239,6 @@ export interface ISplitsCacheAsync extends ISplitsCacheBase {
trafficTypeExists(trafficType: string): Promise<boolean>,
usesSegments(): Promise<boolean>,
clear(): Promise<boolean | void>,
checkCache(): Promise<boolean>,
killLocally(name: string, defaultTreatment: string, changeNumber: number): Promise<boolean>,
getNamesByFlagSets(flagSets: string[]): Promise<Set<string>[]>
}
Expand Down Expand Up @@ -459,6 +455,7 @@ export interface IStorageBase<
events: TEventsCache,
telemetry?: TTelemetryCache,
uniqueKeys?: TUniqueKeysCache,
init?: () => void | Promise<void>,
destroy(): void | Promise<void>,
shared?: (matchingKey: string, onReadyCb: (error?: any) => void) => this
}
Expand Down Expand Up @@ -497,6 +494,7 @@ export interface IStorageFactoryParams {
* It is meant for emitting SDK_READY event in consumer mode, and waiting before using the storage in the synchronizer.
*/
onReadyCb: (error?: any) => void,
onReadyFromCacheCb: (error?: any) => void,
}

export type StorageType = 'MEMORY' | 'LOCALSTORAGE' | 'REDIS' | 'PLUGGABLE';
Expand Down
10 changes: 3 additions & 7 deletions src/sync/offline/syncTasks/fromObjectSyncTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { syncTaskFactory } from '../../syncTask';
import { ISyncTask } from '../../types';
import { ISettings } from '../../../types';
import { CONTROL } from '../../../utils/constants';
import { SDK_SPLITS_ARRIVED, SDK_SEGMENTS_ARRIVED, SDK_SPLITS_CACHE_LOADED } from '../../../readiness/constants';
import { SDK_SPLITS_ARRIVED, SDK_SEGMENTS_ARRIVED } from '../../../readiness/constants';
import { SYNC_OFFLINE_DATA, ERROR_SYNC_OFFLINE_LOADING } from '../../../logger/constants';

/**
Expand Down Expand Up @@ -60,12 +60,8 @@ export function fromObjectUpdaterFactory(

if (startingUp) {
startingUp = false;
Promise.resolve(splitsCache.checkCache()).then(cacheReady => {
// Emits SDK_READY_FROM_CACHE
if (cacheReady) readiness.splits.emit(SDK_SPLITS_CACHE_LOADED);
// Emits SDK_READY
readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
});
// Emits SDK_READY
readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
}
return true;
});
Expand Down
Loading
Loading