Skip to content

Commit

Permalink
Wrap pipeline Redis operations
Browse files Browse the repository at this point in the history
  • Loading branch information
EmilianoSanchez committed Nov 29, 2023
1 parent 9f2fdc0 commit 8bf298b
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions src/storages/inRedis/RedisAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { timeout } from '../../utils/promise/timeout';
const LOG_PREFIX = 'storage:redis-adapter: ';

// If we ever decide to fully wrap every method, there's a Commander.getBuiltinCommands from ioredis.
const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'pipeline', 'expire', 'mget', 'lrange', 'ltrim', 'hset'];
const METHODS_TO_PROMISE_WRAP = ['set', 'exec', 'del', 'get', 'keys', 'sadd', 'srem', 'sismember', 'smembers', 'incr', 'rpush', 'expire', 'mget', 'lrange', 'ltrim', 'hset', 'pipelineExec'];

// Not part of the settings since it'll vary on each storage. We should be removing storage specific logic from elsewhere.
const DEFAULT_OPTIONS = {
Expand Down Expand Up @@ -52,10 +52,16 @@ export class RedisAdapter extends ioredis {
this._setDisconnectWrapper();
}

pipelineExec(commands?: (string | number)[][]): Promise<Array<[Error | null, any]>> {
// @ts-ignore
return this.pipeline(commands).exec();
}

_listenToEvents() {
this.once('ready', () => {
const commandsCount = this._notReadyCommandsQueue ? this._notReadyCommandsQueue.length : 0;
this.log.info(LOG_PREFIX + `Redis connection established. Queued commands: ${commandsCount}.`);

this._notReadyCommandsQueue && this._notReadyCommandsQueue.forEach(queued => {
this.log.info(LOG_PREFIX + `Executing queued ${queued.name} command.`);
queued.command().then(queued.resolve).catch(queued.reject);
Expand All @@ -71,29 +77,28 @@ export class RedisAdapter extends ioredis {
_setTimeoutWrappers() {
const instance: Record<string, any> = this;

METHODS_TO_PROMISE_WRAP.forEach(method => {
const originalMethod = instance[method];
METHODS_TO_PROMISE_WRAP.forEach(methodName => {
const originalMethod = instance[methodName];

instance[method] = function () {
instance[methodName] = function () {
const params = arguments;

function commandWrapper() {
instance.log.debug(LOG_PREFIX + `Executing ${method}.`);
// Return original method
instance.log.debug(`${LOG_PREFIX}Executing ${methodName}.`);
const result = originalMethod.apply(instance, params);

if (thenable(result)) {
// For handling pending commands on disconnect, add to the set and remove once finished.
// On sync commands there's no need, only thenables.
instance._runningCommands.add(result);
const cleanUpRunningCommandsCb = function () {
const cleanUpRunningCommandsCb = () => {
instance._runningCommands.delete(result);
};
// Both success and error remove from queue.
result.then(cleanUpRunningCommandsCb, cleanUpRunningCommandsCb);

return timeout(instance._options.operationTimeout, result).catch(err => {
instance.log.error(LOG_PREFIX + `${method} operation threw an error or exceeded configured timeout of ${instance._options.operationTimeout}ms. Message: ${err}`);
instance.log.error(`${LOG_PREFIX}${methodName} operation threw an error or exceeded configured timeout of ${instance._options.operationTimeout}ms. Message: ${err}`);
// Handling is not the adapter responsibility.
throw err;
});
Expand All @@ -108,7 +113,7 @@ export class RedisAdapter extends ioredis {
resolve: res,
reject: rej,
command: commandWrapper,
name: method.toUpperCase()
name: methodName.toUpperCase()
});
});
} else {
Expand All @@ -124,7 +129,7 @@ export class RedisAdapter extends ioredis {

instance.disconnect = function disconnect(...params: []) {

setTimeout(function deferedDisconnect() {
setTimeout(function deferredDisconnect() {
if (instance._runningCommands.size > 0) {
instance.log.info(LOG_PREFIX + `Attempting to disconnect but there are ${instance._runningCommands.size} commands still waiting for resolution. Defering disconnection until those finish.`);

Expand Down

0 comments on commit 8bf298b

Please sign in to comment.