diff --git a/src/storages/inRedis/RedisAdapter.ts b/src/storages/inRedis/RedisAdapter.ts index 7beb2988..6f965ea7 100644 --- a/src/storages/inRedis/RedisAdapter.ts +++ b/src/storages/inRedis/RedisAdapter.ts @@ -8,7 +8,7 @@ import { timeout } from '../../utils/promise/timeout'; const LOG_PREFIX = 'storage:redis-adapter: '; // If we ever decide to fully wrap every method, there's a Commander.getBuiltinCommands from ioredis. -const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'pipeline', 'expire', 'mget', 'lrange', 'ltrim', 'hset']; +const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'expire', 'mget', 'lrange', 'ltrim', 'hset', 'pipelineExec']; // Not part of the settings since it'll vary on each storage. We should be removing storage specific logic from elsewhere. const DEFAULT_OPTIONS = { @@ -52,10 +52,16 @@ export class RedisAdapter extends ioredis { this._setDisconnectWrapper(); } + pipelineExec(commands?: (string | number)[][]): Promise> { + // @ts-ignore + return this.pipeline(commands).exec(); + } + _listenToEvents() { this.once('ready', () => { const commandsCount = this._notReadyCommandsQueue ? this._notReadyCommandsQueue.length : 0; this.log.info(LOG_PREFIX + `Redis connection established. Queued commands: ${commandsCount}.`); + this._notReadyCommandsQueue && this._notReadyCommandsQueue.forEach(queued => { this.log.info(LOG_PREFIX + `Executing queued ${queued.name} command.`); queued.command().then(queued.resolve).catch(queued.reject); @@ -71,29 +77,28 @@ export class RedisAdapter extends ioredis { _setTimeoutWrappers() { const instance: Record = this; - METHODS_TO_PROMISE_WRAP.forEach(method => { - const originalMethod = instance[method]; + METHODS_TO_PROMISE_WRAP.forEach(methodName => { + const originalMethod = instance[methodName]; - instance[method] = function () { + instance[methodName] = function () { const params = arguments; function commandWrapper() { - instance.log.debug(LOG_PREFIX + `Executing ${method}.`); - // Return original method + instance.log.debug(`${LOG_PREFIX}Executing ${methodName}.`); const result = originalMethod.apply(instance, params); if (thenable(result)) { // For handling pending commands on disconnect, add to the set and remove once finished. // On sync commands there's no need, only thenables. instance._runningCommands.add(result); - const cleanUpRunningCommandsCb = function () { + const cleanUpRunningCommandsCb = () => { instance._runningCommands.delete(result); }; // Both success and error remove from queue. result.then(cleanUpRunningCommandsCb, cleanUpRunningCommandsCb); return timeout(instance._options.operationTimeout, result).catch(err => { - instance.log.error(LOG_PREFIX + `${method} operation threw an error or exceeded configured timeout of ${instance._options.operationTimeout}ms. Message: ${err}`); + instance.log.error(`${LOG_PREFIX}${methodName} operation threw an error or exceeded configured timeout of ${instance._options.operationTimeout}ms. Message: ${err}`); // Handling is not the adapter responsibility. throw err; }); @@ -108,7 +113,7 @@ export class RedisAdapter extends ioredis { resolve: res, reject: rej, command: commandWrapper, - name: method.toUpperCase() + name: methodName.toUpperCase() }); }); } else { @@ -124,7 +129,7 @@ export class RedisAdapter extends ioredis { instance.disconnect = function disconnect(...params: []) { - setTimeout(function deferedDisconnect() { + setTimeout(function deferredDisconnect() { if (instance._runningCommands.size > 0) { instance.log.info(LOG_PREFIX + `Attempting to disconnect but there are ${instance._runningCommands.size} commands still waiting for resolution. Defering disconnection until those finish.`);