From 4808770333061bf51d1861c2dee5cd3834d06556 Mon Sep 17 00:00:00 2001 From: Harminder Virk Date: Fri, 28 Jul 2023 21:20:15 +0530 Subject: [PATCH] refactor: use emittery and make events typed --- package.json | 1 - src/connections/abstract_connection.ts | 124 +++++++++++++------- src/connections/io_methods.ts | 1 - src/connections/redis_cluster_connection.ts | 11 +- src/connections/redis_connection.ts | 9 +- src/redis_manager.ts | 11 +- src/types/main.ts | 34 ++++++ tests/redis_cluster_connection.spec.ts | 6 +- tests/redis_connection.spec.ts | 13 +- tests/redis_manager.spec.ts | 34 +----- tests_helpers/main.ts | 32 +++++ 11 files changed, 184 insertions(+), 92 deletions(-) create mode 100644 tests_helpers/main.ts diff --git a/package.json b/package.json index da53a52..5931107 100644 --- a/package.json +++ b/package.json @@ -64,7 +64,6 @@ "github-label-sync": "^2.3.1", "husky": "^8.0.3", "np": "^8.0.4", - "p-event": "^6.0.0", "prettier": "^3.0.0", "ts-node": "^10.9.1", "typescript": "^5.1.6" diff --git a/src/connections/abstract_connection.ts b/src/connections/abstract_connection.ts index 5be3bc3..f3ab253 100644 --- a/src/connections/abstract_connection.ts +++ b/src/connections/abstract_connection.ts @@ -7,18 +7,27 @@ * file that was distributed with this source code. */ -import { EventEmitter } from 'node:events' +import Emittery from 'emittery' import type { Redis, Cluster } from 'ioredis' import { setTimeout } from 'node:timers/promises' import * as errors from '../errors.js' -import type { HealthReportNode, PubSubChannelHandler, PubSubPatternHandler } from '../types/main.js' +import type { + PubSubOptions, + HealthReportNode, + ConnectionEvents, + PubSubChannelHandler, + PubSubPatternHandler, +} from '../types/main.js' /** * Abstract factory implements the shared functionality required by Redis cluster * and the normal Redis connections. */ -export abstract class AbstractConnection extends EventEmitter { +export abstract class AbstractConnection< + T extends Redis | Cluster, + Events extends ConnectionEvents, +> extends Emittery { /** * Reference to the main ioRedis connection */ @@ -95,32 +104,38 @@ export abstract class AbstractConnection extends Even * things properly and also notify subscribers of this class */ protected monitorConnection() { - this.ioConnection.on('connect', () => this.emit('connect', this)) - this.ioConnection.on('wait', () => this.emit('wait', this)) + this.ioConnection.on('connect', () => this.emit('connect', { connection: this })) + this.ioConnection.on('wait', () => this.emit('wait', { connection: this })) this.ioConnection.on('ready', () => { /** * We must set the error to null when server is ready for accept * commands */ this.#lastError = null - this.emit('ready', this) + this.emit('ready', { connection: this }) }) this.ioConnection.on('error', (error: any) => { this.#lastError = error - this.emit('error', error, this) + this.emit('error', { error, connection: this }) }) - this.ioConnection.on('close', () => this.emit('close', this)) - this.ioConnection.on('reconnecting', () => this.emit('reconnecting', this)) + this.ioConnection.on('close', () => this.emit('close', { connection: this })) + this.ioConnection.on('reconnecting', (waitTime: number) => + this.emit('reconnecting', { connection: this, waitTime }) + ) /** * Cluster only events */ - this.ioConnection.on('+node', (node: Redis) => this.emit('node:added', this, node)) - this.ioConnection.on('-node', (node: Redis) => this.emit('node:removed', this, node)) + this.ioConnection.on('+node', (node: Redis) => + this.emit('node:added', { connection: this, node }) + ) + this.ioConnection.on('-node', (node: Redis) => + this.emit('node:removed', { connection: this, node }) + ) this.ioConnection.on('node error', (error: any, address: string) => { - this.emit('node:error', error, address, this) + this.emit('node:error', { error, address, connection: this }) }) /** @@ -128,17 +143,20 @@ export abstract class AbstractConnection extends Even */ this.ioConnection.on('end', async () => { this.ioConnection.removeAllListeners() - this.emit('end', this) - this.removeAllListeners('connect') - this.removeAllListeners('wait') - this.removeAllListeners('ready') - this.removeAllListeners('error') - this.removeAllListeners('close') - this.removeAllListeners('reconnecting') - this.removeAllListeners('node:added') - this.removeAllListeners('node:removed') - this.removeAllListeners('node:error') - this.removeAllListeners('end') + this.emit('end', { connection: this }).finally(() => { + this.clearListeners([ + 'connect', + 'wait', + 'ready', + 'error', + 'close', + 'reconnecting', + 'node:added', + 'node:error', + 'node:removed', + 'end', + ]) + }) }) } @@ -148,15 +166,20 @@ export abstract class AbstractConnection extends Even * this class. */ protected monitorSubscriberConnection() { - this.ioSubscriberConnection!.on('connect', () => this.emit('subscriber:connect', this)) - this.ioSubscriberConnection!.on('wait', () => this.emit('subscriber:wait', this)) - this.ioSubscriberConnection!.on('ready', () => this.emit('subscriber:ready', this)) + this.ioSubscriberConnection!.on('connect', () => + this.emit('subscriber:connect', { connection: this }) + ) + this.ioSubscriberConnection!.on('ready', () => + this.emit('subscriber:ready', { connection: this }) + ) this.ioSubscriberConnection!.on('error', (error: any) => { - this.emit('subscriber:error', error, this) + this.emit('subscriber:error', { error, connection: this }) }) - this.ioSubscriberConnection!.on('close', () => this.emit('subscriber:close', this)) - this.ioSubscriberConnection!.on('reconnecting', () => - this.emit('subscriber:reconnecting', this) + this.ioSubscriberConnection!.on('close', () => + this.emit('subscriber:close', { connection: this }) + ) + this.ioSubscriberConnection!.on('reconnecting', (waitTime: number) => + this.emit('subscriber:reconnecting', { connection: this, waitTime }) ) /** @@ -165,7 +188,7 @@ export abstract class AbstractConnection extends Even */ this.ioSubscriberConnection!.on('end', async () => { this.ioSubscriberConnection!.removeAllListeners() - this.emit('subscriber:end', this) + this.emit('subscriber:end', { connection: this }) /** * Cleanup subscriptions @@ -174,13 +197,14 @@ export abstract class AbstractConnection extends Even this.psubscriptions.clear() this.ioSubscriberConnection = undefined - this.removeAllListeners('subscriber:connect') - this.removeAllListeners('subscriber:wait') - this.removeAllListeners('subscriber:ready') - this.removeAllListeners('subscriber:error') - this.removeAllListeners('subscriber:close') - this.removeAllListeners('subscriber:reconnecting') - this.removeAllListeners('subscriber:end') + this.clearListeners([ + 'subscriber:connect', + 'subscriber:ready', + 'subscriber:error', + 'subscriber:close', + 'subscriber:reconnecting', + 'subscriber:end', + ]) }) } @@ -243,7 +267,7 @@ export abstract class AbstractConnection extends Even * Subscribe to a given channel to receive Redis pub/sub events. A * new subscriber connection will be created/managed automatically. */ - subscribe(channel: string, handler: PubSubChannelHandler): void { + subscribe(channel: string, handler: PubSubChannelHandler, options?: PubSubOptions): void { /** * Make the subscriber connection. The method results in a noop when * subscriber connection already exists. @@ -264,11 +288,17 @@ export abstract class AbstractConnection extends Even */ this.ioSubscriberConnection!.subscribe(channel) .then((count) => { - this.emit('subscription:ready', count, this) + if (options?.onSubscription) { + options?.onSubscription(count as number) + } + this.emit('subscription:ready', { count: count as number, connection: this }) this.subscriptions.set(channel, handler) }) .catch((error) => { - this.emit('subscription:error', error, this) + if (options?.onError) { + options?.onError(error) + } + this.emit('subscription:error', { error, connection: this }) }) } @@ -283,7 +313,7 @@ export abstract class AbstractConnection extends Even /** * Make redis subscription for a pattern */ - psubscribe(pattern: string, handler: PubSubPatternHandler): void { + psubscribe(pattern: string, handler: PubSubPatternHandler, options?: PubSubOptions): void { /** * Make the subscriber connection. The method results in a noop when * subscriber connection already exists. @@ -304,11 +334,17 @@ export abstract class AbstractConnection extends Even */ this.ioSubscriberConnection!.psubscribe(pattern) .then((count) => { - this.emit('psubscription:ready', count, this) + if (options?.onSubscription) { + options?.onSubscription(count as number) + } + this.emit('psubscription:ready', { count: count as number, connection: this }) this.psubscriptions.set(pattern, handler) }) .catch((error) => { - this.emit('psubscription:error', error, this) + if (options?.onError) { + options?.onError(error) + } + this.emit('psubscription:error', { error, connection: this }) }) } diff --git a/src/connections/io_methods.ts b/src/connections/io_methods.ts index e01c51c..793bea7 100644 --- a/src/connections/io_methods.ts +++ b/src/connections/io_methods.ts @@ -372,7 +372,6 @@ export const redisMethods = [ 'copy', 'createBuiltinCommand', 'dbsize', - 'debug', 'decr', 'decrby', 'del', diff --git a/src/connections/redis_cluster_connection.ts b/src/connections/redis_cluster_connection.ts index 2940ba1..1673acd 100644 --- a/src/connections/redis_cluster_connection.ts +++ b/src/connections/redis_cluster_connection.ts @@ -12,14 +12,21 @@ import Redis, { type Cluster, type NodeRole } from 'ioredis' import debug from '../debug.js' import { baseMethods } from './io_methods.js' import { AbstractConnection } from './abstract_connection.js' -import type { IORedisBaseCommands, RedisClusterConnectionConfig } from '../types/main.js' +import type { + ConnectionEvents, + IORedisBaseCommands, + RedisClusterConnectionConfig, +} from '../types/main.js' /** * Redis cluster connection exposes the API to run Redis commands using `ioredis` as the * underlying client. The class abstracts the need of creating and managing multiple * pub/sub connections by hand, since it handles that internally by itself. */ -export class RedisClusterConnection extends AbstractConnection { +export class RedisClusterConnection extends AbstractConnection< + Cluster, + ConnectionEvents +> { #hosts: RedisClusterConnectionConfig['clusters'] #config: RedisClusterConnectionConfig['clusterOptions'] diff --git a/src/connections/redis_connection.ts b/src/connections/redis_connection.ts index 8c3337f..e64922f 100644 --- a/src/connections/redis_connection.ts +++ b/src/connections/redis_connection.ts @@ -12,7 +12,11 @@ import { Redis, RedisOptions } from 'ioredis' import debug from '../debug.js' import { redisMethods } from './io_methods.js' import { AbstractConnection } from './abstract_connection.js' -import { IORedisConnectionCommands, RedisConnectionConfig } from '../types/main.js' +import type { + ConnectionEvents, + RedisConnectionConfig, + IORedisConnectionCommands, +} from '../types/main.js' /** * Redis connection exposes the API to run Redis commands using `ioredis` as the @@ -20,7 +24,7 @@ import { IORedisConnectionCommands, RedisConnectionConfig } from '../types/main. * multiple pub/sub connections by hand, since it handles that internally * by itself. */ -export class RedisConnection extends AbstractConnection { +export class RedisConnection extends AbstractConnection> { #config: RedisOptions /** @@ -73,6 +77,7 @@ export class RedisConnection extends AbstractConnection { * class and also extending its TypeScript types */ export interface RedisConnection extends IORedisConnectionCommands {} + redisMethods.forEach((method) => { ;(RedisConnection.prototype as any)[method] = function redisConnectionProxyFn(...args: any[]) { return this.ioConnection[method](...args) diff --git a/src/redis_manager.ts b/src/redis_manager.ts index 1569a58..a6bcf5c 100644 --- a/src/redis_manager.ts +++ b/src/redis_manager.ts @@ -55,8 +55,11 @@ class RedisManager extends Emitter /** * The default error reporter we use to log redis errors */ - #errorReporter = function logRedisError(this: RedisManager, error: any) { - this.#logger.fatal({ err: error }, 'Redis connection failure') + #errorReporter = function logRedisError( + this: RedisManager, + data: { error: any } + ) { + this.#logger.fatal({ err: data.error }, 'Redis connection failure') }.bind(this) /** @@ -112,7 +115,7 @@ class RedisManager extends Emitter this.#shouldLogRedisErrors = false Object.keys(this.activeConnections).forEach((name) => { debug('removing error reporter from %s connection', name) - this.activeConnections[name]?.removeListener('error', this.#errorReporter) + this.activeConnections[name]?.off('error', this.#errorReporter) }) return this } @@ -172,7 +175,7 @@ class RedisManager extends Emitter /** * Remove connection from the list of tracked connections */ - connection.on('end', ($connection) => { + connection.on('end', ({ connection: $connection }) => { debug('%s connection closed. Removing from tracked connections list', name) delete this.activeConnections[$connection.connectionName] }) diff --git a/src/types/main.ts b/src/types/main.ts index 17b5bd5..813b2ad 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -23,6 +23,40 @@ export type PubSubPatternHandler = ( data: T ) => Promise | void +/** + * Options accepted during subscribe + */ +export type PubSubOptions = { + onError(error: any): void + onSubscription(count: number): void +} + +/** + * List of connection events + */ +export type ConnectionEvents = { + 'connect': { connection: T } + 'wait': { connection: T } + 'ready': { connection: T } + 'error': { error: any; connection: T } + 'close': { connection: T } + 'reconnecting': { connection: T; waitTime: number } + 'end': { connection: T } + 'subscriber:connect': { connection: T } + 'subscriber:ready': { connection: T } + 'subscriber:error': { error: any; connection: T } + 'subscriber:close': { connection: T } + 'subscriber:reconnecting': { connection: T; waitTime: number } + 'subscriber:end': { connection: T } + 'node:added': { connection: T; node: Redis } + 'node:removed': { connection: T; node: Redis } + 'node:error': { error: any; address: string; connection: T } + 'subscription:ready': { connection: T; count: number } + 'subscription:error': { connection: T; error: any } + 'psubscription:ready': { connection: T; count: number } + 'psubscription:error': { connection: T; error: any } +} + /** * Shape of the report node for the redis connection report */ diff --git a/tests/redis_cluster_connection.spec.ts b/tests/redis_cluster_connection.spec.ts index 057a745..95e2e3c 100644 --- a/tests/redis_cluster_connection.spec.ts +++ b/tests/redis_cluster_connection.spec.ts @@ -7,8 +7,8 @@ * file that was distributed with this source code. */ -import { pEvent } from 'p-event' import { test } from '@japa/runner' +import { pEvent } from '../tests_helpers/main.js' import RedisClusterConnection from '../src/connections/redis_cluster_connection.js' const nodes = process.env.REDIS_CLUSTER_PORTS!.split(',').map((port) => { @@ -89,8 +89,8 @@ test.group('Redis cluster factory', () => { cleanup(() => connection.quit()) connection.on('error', () => {}) - const [error] = await pEvent(connection, 'node:error', { multiArgs: true }) - assert.equal(error.message, 'Connection is closed.') + const response = await pEvent(connection, 'node:error') + assert.equal(response!.error.message, 'Connection is closed.') }) test('access cluster nodes', async ({ assert, cleanup }) => { diff --git a/tests/redis_connection.spec.ts b/tests/redis_connection.spec.ts index 8cb77ad..e5d3c64 100644 --- a/tests/redis_connection.spec.ts +++ b/tests/redis_connection.spec.ts @@ -7,8 +7,9 @@ * file that was distributed with this source code. */ -import { pEvent } from 'p-event' import { test } from '@japa/runner' + +import { pEvent } from '../tests_helpers/main.js' import RedisConnection from '../src/connections/redis_connection.js' test.group('Redis connection', () => { @@ -39,8 +40,8 @@ test.group('Redis connection', () => { const connection = new RedisConnection('main', { port: 4444 }) cleanup(() => connection.disconnect()) - const [error] = await pEvent(connection, 'error', { multiArgs: true }) - assert.equal(error.message, 'connect ECONNREFUSED 127.0.0.1:4444') + const response = await pEvent(connection, 'error') + assert.equal(response!.error.message, 'connect ECONNREFUSED 127.0.0.1:4444') }) test('cleanup listeners on quit', async ({ assert }) => { @@ -279,12 +280,12 @@ test.group('Redis connection', () => { test('emit error when unable to make subscriber connection', async ({ assert, cleanup }) => { const connection = new RedisConnection('main', { port: 4444 }) - await pEvent(connection, 'error', { multiArgs: true }) + await pEvent(connection, 'error') cleanup(() => connection.disconnect()) connection.subscribe('foo', () => {}) - const [error] = await pEvent(connection, 'subscriber:error', { multiArgs: true }) - assert.equal(error.message, 'connect ECONNREFUSED 127.0.0.1:4444') + const response = await pEvent(connection, 'subscriber:error') + assert.equal(response!.error.message, 'connect ECONNREFUSED 127.0.0.1:4444') }) test('cleanup subscribers listeners on quit', async ({ assert }) => { diff --git a/tests/redis_manager.spec.ts b/tests/redis_manager.spec.ts index c079380..b27a8e4 100644 --- a/tests/redis_manager.spec.ts +++ b/tests/redis_manager.spec.ts @@ -7,10 +7,9 @@ * file that was distributed with this source code. */ -import { pEvent } from 'p-event' import { test } from '@japa/runner' -import type { Connection } from '../src/types/main.js' +import { pEvent } from '../tests_helpers/main.js' import { RedisManagerFactory } from '../factories/redis_manager.js' import RedisConnection from '../src/connections/redis_connection.js' import RedisClusterConnection from '../src/connections/redis_cluster_connection.js' @@ -254,11 +253,7 @@ test.group('Redis Manager', () => { }, }).create() - const [connection] = await Promise.all([ - pEvent<'connection', Connection>(redis, 'connection'), - redis.connection(), - ]) - + const [connection] = await Promise.all([pEvent(redis, 'connection'), redis.connection()]) assert.strictEqual(connection, redis.connection()) }) @@ -277,13 +272,7 @@ test.group('Redis Manager', () => { }) const redis = manager.create() - - /** - * pEvent throws an exception when the error event is emitted. We are - * supressing that, because our error reporter should handle - * it - */ - await pEvent(redis.connection(), 'end', { rejectionEvents: [] }) + await pEvent(redis.connection(), 'end') const errorLog = JSON.parse(manager.logs[0]) assert.equal(errorLog.level, 60) @@ -311,12 +300,7 @@ test.group('Redis Manager', () => { connection.on('error', () => {}) }) - /** - * pEvent throws an exception when the error event is emitted. We are - * supressing that, because our error reporter should handle - * it - */ - await pEvent(redis.connection(), 'end', { rejectionEvents: [] }) + await pEvent(redis.connection(), 'end') assert.lengthOf(manager.logs, 0) }) @@ -339,15 +323,7 @@ test.group('Redis Manager', () => { connection.on('error', () => {}) }) - /** - * pEvent throws an exception when the error event is emitted. We are - * supressing that, because our error reporter should handle - * it - */ - await Promise.all([ - pEvent(redis.connection(), 'end', { rejectionEvents: [] }), - redis.doNotLogErrors(), - ]) + await Promise.all([pEvent(redis.connection(), 'end'), redis.doNotLogErrors()]) assert.lengthOf(manager.logs, 0) }) diff --git a/tests_helpers/main.ts b/tests_helpers/main.ts new file mode 100644 index 0000000..150c7a8 --- /dev/null +++ b/tests_helpers/main.ts @@ -0,0 +1,32 @@ +/* + * @adonisjs/redis + * + * (c) AdonisJS + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +import Emittery from 'emittery' + +/** + * Promisify an event + */ +export function pEvent( + emitter: Emittery, + event: K, + timeout: number = 500 +) { + return new Promise((resolve) => { + function handler(data: T[K]) { + emitter.off(event, handler) + resolve(data) + } + + setTimeout(() => { + emitter.off(event, handler) + resolve(null) + }, timeout) + emitter.on(event, handler) + }) +}