Skip to content

Commit

Permalink
Update Redis storages to use the wrapped pipeline method. Update test…
Browse files Browse the repository at this point in the history
…s to provide the RedisAdapter to Redis caches, in order to make the validation more robust
  • Loading branch information
EmilianoSanchez committed Nov 29, 2023
1 parent 8bf298b commit d5202b1
Show file tree
Hide file tree
Showing 19 changed files with 94 additions and 107 deletions.
2 changes: 1 addition & 1 deletion src/readiness/__tests__/sdkReadinessManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ describe('SDK Readiness Manager - Event emitter', () => {
expect(sdkStatus[propName]).toBeTruthy(); // The sdkStatus exposes all minimal EventEmitter functionality.
});

expect(typeof sdkStatus['ready']).toBe('function'); // The sdkStatus exposes a .ready() function.
expect(typeof sdkStatus.ready).toBe('function'); // The sdkStatus exposes a .ready() function.

expect(typeof sdkStatus.Event).toBe('object'); // It also exposes the Event map,
expect(sdkStatus.Event.SDK_READY).toBe(SDK_READY); // which contains the constants for the events, for backwards compatibility.
Expand Down
6 changes: 3 additions & 3 deletions src/sdkManager/__tests__/index.asyncCache.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import Redis from 'ioredis';
import splitObject from './mocks/input.json';
import splitView from './mocks/output.json';
import { sdkManagerFactory } from '../index';
Expand All @@ -9,6 +8,7 @@ import { KeyBuilderSS } from '../../storages/KeyBuilderSS';
import { ISdkReadinessManager } from '../../readiness/types';
import { loggerMock } from '../../logger/__tests__/sdkLogger.mock';
import { metadata } from '../../storages/__tests__/KeyBuilder.spec';
import { RedisAdapter } from '../../storages/inRedis/RedisAdapter';

// @ts-expect-error
const sdkReadinessManagerMock = {
Expand All @@ -28,7 +28,7 @@ describe('MANAGER API', () => {
test('Async cache (In Redis)', async () => {

/** Setup: create manager */
const connection = new Redis({});
const connection = new RedisAdapter(loggerMock);
const cache = new SplitsCacheInRedis(loggerMock, keys, connection);
const manager = sdkManagerFactory(loggerMock, cache, sdkReadinessManagerMock);
await cache.clear();
Expand Down Expand Up @@ -70,7 +70,7 @@ describe('MANAGER API', () => {

/** Teardown */
await cache.removeSplit(splitObject.name);
await connection.quit();
await connection.disconnect();
});

test('Async cache with error', async () => {
Expand Down
6 changes: 3 additions & 3 deletions src/storages/inRedis/EventsCacheInRedis.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { IEventsCacheAsync } from '../types';
import { IMetadata } from '../../dtos/types';
import { Redis } from 'ioredis';
import { SplitIO } from '../../types';
import { ILogger } from '../../logger/types';
import { LOG_PREFIX } from './constants';
import { StoredEventWithMetadata } from '../../sync/submitters/types';
import type { RedisAdapter } from './RedisAdapter';

export class EventsCacheInRedis implements IEventsCacheAsync {

private readonly log: ILogger;
private readonly key: string;
private readonly redis: Redis;
private readonly redis: RedisAdapter;
private readonly metadata: IMetadata;

constructor(log: ILogger, key: string, redis: Redis, metadata: IMetadata) {
constructor(log: ILogger, key: string, redis: RedisAdapter, metadata: IMetadata) {
this.log = log;
this.key = key;
this.redis = redis;
Expand Down
13 changes: 5 additions & 8 deletions src/storages/inRedis/ImpressionCountsCacheInRedis.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { Redis } from 'ioredis';
import { ILogger } from '../../logger/types';
import { ImpressionCountsPayload } from '../../sync/submitters/types';
import { forOwn } from '../../utils/lang';
import { ImpressionCountsCacheInMemory } from '../inMemory/ImpressionCountsCacheInMemory';
import { LOG_PREFIX, REFRESH_RATE, TTL_REFRESH } from './constants';
import type { RedisAdapter } from './RedisAdapter';

export class ImpressionCountsCacheInRedis extends ImpressionCountsCacheInMemory {

private readonly log: ILogger;
private readonly key: string;
private readonly redis: Redis;
private readonly redis: RedisAdapter;
private readonly refreshRate: number;
private intervalId: any;

constructor(log: ILogger, key: string, redis: Redis, impressionCountsCacheSize?: number, refreshRate = REFRESH_RATE) {
constructor(log: ILogger, key: string, redis: RedisAdapter, impressionCountsCacheSize?: number, refreshRate = REFRESH_RATE) {
super(impressionCountsCacheSize);
this.log = log;
this.key = key;
Expand All @@ -27,11 +27,8 @@ export class ImpressionCountsCacheInRedis extends ImpressionCountsCacheInMemory
const keys = Object.keys(counts);
if (!keys.length) return Promise.resolve(false);

const pipeline = this.redis.pipeline();
keys.forEach(key => {
pipeline.hincrby(this.key, key, counts[key]);
});
return pipeline.exec()
// @ts-ignore
return this.redis.pipelineExec(keys.map(key => ['hincrby', this.key, key, counts[key]]))
.then(data => {
// If this is the creation of the key on Redis, set the expiration for it in 3600 seconds.
if (data.length && data.length === keys.length) {
Expand Down
6 changes: 3 additions & 3 deletions src/storages/inRedis/ImpressionsCacheInRedis.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import { IImpressionsCacheAsync } from '../types';
import { IMetadata } from '../../dtos/types';
import { ImpressionDTO } from '../../types';
import { Redis } from 'ioredis';
import { StoredImpressionWithMetadata } from '../../sync/submitters/types';
import { ILogger } from '../../logger/types';
import { impressionsToJSON } from '../utils';
import type { RedisAdapter } from './RedisAdapter';

const IMPRESSIONS_TTL_REFRESH = 3600; // 1 hr

export class ImpressionsCacheInRedis implements IImpressionsCacheAsync {

private readonly log: ILogger;
private readonly key: string;
private readonly redis: Redis;
private readonly redis: RedisAdapter;
private readonly metadata: IMetadata;

constructor(log: ILogger, key: string, redis: Redis, metadata: IMetadata) {
constructor(log: ILogger, key: string, redis: RedisAdapter, metadata: IMetadata) {
this.log = log;
this.key = key;
this.redis = redis;
Expand Down
6 changes: 3 additions & 3 deletions src/storages/inRedis/RedisAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { timeout } from '../../utils/promise/timeout';
const LOG_PREFIX = 'storage:redis-adapter: ';

// If we ever decide to fully wrap every method, there's a Commander.getBuiltinCommands from ioredis.
const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'expire', 'mget', 'lrange', 'ltrim', 'hset', 'pipelineExec'];
const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'expire', 'mget', 'lrange', 'ltrim', 'hset', 'hincrby', 'popNRaw', 'flushdb', 'pipelineExec'];

// Not part of the settings since it'll vary on each storage. We should be removing storage specific logic from elsewhere.
const DEFAULT_OPTIONS = {
Expand Down Expand Up @@ -38,7 +38,7 @@ export class RedisAdapter extends ioredis {
private _notReadyCommandsQueue?: IRedisCommand[];
private _runningCommands: ISet<Promise<any>>;

constructor(log: ILogger, storageSettings: Record<string, any>) {
constructor(log: ILogger, storageSettings?: Record<string, any>) {
const options = RedisAdapter._defineOptions(storageSettings);
// Call the ioredis constructor
super(...RedisAdapter._defineLibrarySettings(options));
Expand Down Expand Up @@ -182,7 +182,7 @@ export class RedisAdapter extends ioredis {
/**
* Parses the options into what we care about.
*/
static _defineOptions({ connectionTimeout, operationTimeout, url, host, port, db, pass, tls }: Record<string, any>) {
static _defineOptions({ connectionTimeout, operationTimeout, url, host, port, db, pass, tls }: Record<string, any> = {}) {
const parsedOptions = {
connectionTimeout, operationTimeout, url, host, port, db, pass, tls
};
Expand Down
6 changes: 3 additions & 3 deletions src/storages/inRedis/SegmentsCacheInRedis.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { Redis } from 'ioredis';
import { ILogger } from '../../logger/types';
import { isNaNNumber } from '../../utils/lang';
import { LOG_PREFIX } from '../inLocalStorage/constants';
import { KeyBuilderSS } from '../KeyBuilderSS';
import { ISegmentsCacheAsync } from '../types';
import type { RedisAdapter } from './RedisAdapter';

export class SegmentsCacheInRedis implements ISegmentsCacheAsync {

private readonly log: ILogger;
private readonly redis: Redis;
private readonly redis: RedisAdapter;
private readonly keys: KeyBuilderSS;

constructor(log: ILogger, keys: KeyBuilderSS, redis: Redis) {
constructor(log: ILogger, keys: KeyBuilderSS, redis: RedisAdapter) {
this.log = log;
this.redis = redis;
this.keys = keys;
Expand Down
10 changes: 5 additions & 5 deletions src/storages/inRedis/SplitsCacheInRedis.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { isFiniteNumber, isNaNNumber } from '../../utils/lang';
import { KeyBuilderSS } from '../KeyBuilderSS';
import { Redis } from 'ioredis';
import { ILogger } from '../../logger/types';
import { LOG_PREFIX } from './constants';
import { ISplit, ISplitFiltersValidation } from '../../dtos/types';
import { AbstractSplitsCacheAsync } from '../AbstractSplitsCacheAsync';
import { ISet, _Set, returnListDifference } from '../../utils/lang/sets';
import type { RedisAdapter } from './RedisAdapter';

/**
* Discard errors for an answer of multiple operations.
Expand All @@ -24,12 +24,12 @@ function processPipelineAnswer(results: Array<[Error | null, string]>): string[]
export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {

private readonly log: ILogger;
private readonly redis: Redis;
private readonly redis: RedisAdapter;
private readonly keys: KeyBuilderSS;
private redisError?: string;
private readonly flagSetsFilter: string[];

constructor(log: ILogger, keys: KeyBuilderSS, redis: Redis, splitFiltersValidation?: ISplitFiltersValidation) {
constructor(log: ILogger, keys: KeyBuilderSS, redis: RedisAdapter, splitFiltersValidation?: ISplitFiltersValidation) {
super();
this.log = log;
this.redis = redis;
Expand Down Expand Up @@ -192,7 +192,7 @@ export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {
*/
getAll(): Promise<ISplit[]> {
return this.redis.keys(this.keys.searchPatternForSplitKeys())
.then((listOfKeys) => this.redis.pipeline(listOfKeys.map(k => ['get', k])).exec())
.then((listOfKeys) => this.redis.pipelineExec(listOfKeys.map(k => ['get', k])))
.then(processPipelineAnswer)
.then((splitDefinitions) => splitDefinitions.map((splitDefinition) => {
return JSON.parse(splitDefinition);
Expand All @@ -216,7 +216,7 @@ export class SplitsCacheInRedis extends AbstractSplitsCacheAsync {
* or rejected if the pipelined redis operation fails.
*/
getNamesByFlagSets(flagSets: string[]): Promise<ISet<string>[]> {
return this.redis.pipeline(flagSets.map(flagSet => ['smembers', this.keys.buildFlagSetKey(flagSet)])).exec()
return this.redis.pipelineExec(flagSets.map(flagSet => ['smembers', this.keys.buildFlagSetKey(flagSet)]))
.then((results) => results.map(([e, value], index) => {
if (e === null) return value;

Expand Down
4 changes: 2 additions & 2 deletions src/storages/inRedis/TelemetryCacheInRedis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { Method, MultiConfigs, MultiMethodExceptions, MultiMethodLatencies } fro
import { KeyBuilderSS } from '../KeyBuilderSS';
import { ITelemetryCacheAsync } from '../types';
import { findLatencyIndex } from '../findLatencyIndex';
import { Redis } from 'ioredis';
import { getTelemetryConfigStats } from '../../sync/submitters/telemetrySubmitter';
import { CONSUMER_MODE, STORAGE_REDIS } from '../../utils/constants';
import { isNaNNumber, isString } from '../../utils/lang';
import { _Map } from '../../utils/lang/maps';
import { MAX_LATENCY_BUCKET_COUNT, newBuckets } from '../inMemory/TelemetryCacheInMemory';
import { parseLatencyField, parseExceptionField, parseMetadata } from '../utils';
import type { RedisAdapter } from './RedisAdapter';

export class TelemetryCacheInRedis implements ITelemetryCacheAsync {

Expand All @@ -19,7 +19,7 @@ export class TelemetryCacheInRedis implements ITelemetryCacheAsync {
* @param keys Key builder.
* @param redis Redis client.
*/
constructor(private readonly log: ILogger, private readonly keys: KeyBuilderSS, private readonly redis: Redis) { }
constructor(private readonly log: ILogger, private readonly keys: KeyBuilderSS, private readonly redis: RedisAdapter) { }

recordLatency(method: Method, latencyMs: number) {
const [key, field] = this.keys.buildLatencyKey(method, findLatencyIndex(latencyMs)).split('::');
Expand Down
6 changes: 3 additions & 3 deletions src/storages/inRedis/UniqueKeysCacheInRedis.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import { IUniqueKeysCacheBase } from '../types';
import { Redis } from 'ioredis';
import { UniqueKeysCacheInMemory } from '../inMemory/UniqueKeysCacheInMemory';
import { setToArray } from '../../utils/lang/sets';
import { DEFAULT_CACHE_SIZE, REFRESH_RATE, TTL_REFRESH } from './constants';
import { LOG_PREFIX } from './constants';
import { ILogger } from '../../logger/types';
import { UniqueKeysItemSs } from '../../sync/submitters/types';
import type { RedisAdapter } from './RedisAdapter';

export class UniqueKeysCacheInRedis extends UniqueKeysCacheInMemory implements IUniqueKeysCacheBase {

private readonly log: ILogger;
private readonly key: string;
private readonly redis: Redis;
private readonly redis: RedisAdapter;
private readonly refreshRate: number;
private intervalId: any;

constructor(log: ILogger, key: string, redis: Redis, uniqueKeysQueueSize = DEFAULT_CACHE_SIZE, refreshRate = REFRESH_RATE) {
constructor(log: ILogger, key: string, redis: RedisAdapter, uniqueKeysQueueSize = DEFAULT_CACHE_SIZE, refreshRate = REFRESH_RATE) {
super(uniqueKeysQueueSize);
this.log = log;
this.key = key;
Expand Down
6 changes: 3 additions & 3 deletions src/storages/inRedis/__tests__/EventsCacheInRedis.spec.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import Redis from 'ioredis';
import { loggerMock } from '../../../logger/__tests__/sdkLogger.mock';
import { EventsCacheInRedis } from '../EventsCacheInRedis';
import { metadata, fakeEvent1, fakeEvent1stored, fakeEvent2, fakeEvent2stored, fakeEvent3, fakeEvent3stored } from '../../pluggable/__tests__/EventsCachePluggable.spec';
import { RedisAdapter } from '../RedisAdapter';

const prefix = 'events_cache_ut';
const eventsKey = `${prefix}.events`;
const nonListKey = 'non-list-key';

test('EVENTS CACHE IN REDIS / `track`, `count`, `popNWithMetadata` and `drop` methods', async () => {
const connection = new Redis();
const connection = new RedisAdapter(loggerMock);

// Clean up in case there are still keys there.
await connection.del(eventsKey);
Expand Down Expand Up @@ -54,5 +54,5 @@ test('EVENTS CACHE IN REDIS / `track`, `count`, `popNWithMetadata` and `drop` me

// Clean up then end.
await connection.del(eventsKey, nonListKey);
await connection.quit();
await connection.disconnect();
});
Loading

0 comments on commit d5202b1

Please sign in to comment.