Skip to content

Commit

Permalink
refactor: use emittery and make events typed
Browse files Browse the repository at this point in the history
  • Loading branch information
thetutlage committed Jul 28, 2023
1 parent 0845d18 commit 4808770
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 92 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
"github-label-sync": "^2.3.1",
"husky": "^8.0.3",
"np": "^8.0.4",
"p-event": "^6.0.0",
"prettier": "^3.0.0",
"ts-node": "^10.9.1",
"typescript": "^5.1.6"
Expand Down
124 changes: 80 additions & 44 deletions src/connections/abstract_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,27 @@
* file that was distributed with this source code.
*/

import { EventEmitter } from 'node:events'
import Emittery from 'emittery'
import type { Redis, Cluster } from 'ioredis'
import { setTimeout } from 'node:timers/promises'

import * as errors from '../errors.js'
import type { HealthReportNode, PubSubChannelHandler, PubSubPatternHandler } from '../types/main.js'
import type {
PubSubOptions,
HealthReportNode,
ConnectionEvents,
PubSubChannelHandler,
PubSubPatternHandler,
} from '../types/main.js'

/**
* Abstract factory implements the shared functionality required by Redis cluster
* and the normal Redis connections.
*/
export abstract class AbstractConnection<T extends Redis | Cluster> extends EventEmitter {
export abstract class AbstractConnection<
T extends Redis | Cluster,
Events extends ConnectionEvents<any>,
> extends Emittery<Events> {
/**
* Reference to the main ioRedis connection
*/
Expand Down Expand Up @@ -95,50 +104,59 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
* things properly and also notify subscribers of this class
*/
protected monitorConnection() {
this.ioConnection.on('connect', () => this.emit('connect', this))
this.ioConnection.on('wait', () => this.emit('wait', this))
this.ioConnection.on('connect', () => this.emit('connect', { connection: this }))
this.ioConnection.on('wait', () => this.emit('wait', { connection: this }))
this.ioConnection.on('ready', () => {
/**
* We must set the error to null when server is ready for accept
* commands
*/
this.#lastError = null
this.emit('ready', this)
this.emit('ready', { connection: this })
})

this.ioConnection.on('error', (error: any) => {
this.#lastError = error
this.emit('error', error, this)
this.emit('error', { error, connection: this })
})

this.ioConnection.on('close', () => this.emit('close', this))
this.ioConnection.on('reconnecting', () => this.emit('reconnecting', this))
this.ioConnection.on('close', () => this.emit('close', { connection: this }))
this.ioConnection.on('reconnecting', (waitTime: number) =>
this.emit('reconnecting', { connection: this, waitTime })
)

/**
* Cluster only events
*/
this.ioConnection.on('+node', (node: Redis) => this.emit('node:added', this, node))
this.ioConnection.on('-node', (node: Redis) => this.emit('node:removed', this, node))
this.ioConnection.on('+node', (node: Redis) =>
this.emit('node:added', { connection: this, node })
)
this.ioConnection.on('-node', (node: Redis) =>
this.emit('node:removed', { connection: this, node })
)
this.ioConnection.on('node error', (error: any, address: string) => {
this.emit('node:error', error, address, this)
this.emit('node:error', { error, address, connection: this })
})

/**
* On end, we must cleanup client and self listeners
*/
this.ioConnection.on('end', async () => {
this.ioConnection.removeAllListeners()
this.emit('end', this)
this.removeAllListeners('connect')
this.removeAllListeners('wait')
this.removeAllListeners('ready')
this.removeAllListeners('error')
this.removeAllListeners('close')
this.removeAllListeners('reconnecting')
this.removeAllListeners('node:added')
this.removeAllListeners('node:removed')
this.removeAllListeners('node:error')
this.removeAllListeners('end')
this.emit('end', { connection: this }).finally(() => {
this.clearListeners([
'connect',
'wait',
'ready',
'error',
'close',
'reconnecting',
'node:added',
'node:error',
'node:removed',
'end',
])
})
})
}

Expand All @@ -148,15 +166,20 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
* this class.
*/
protected monitorSubscriberConnection() {
this.ioSubscriberConnection!.on('connect', () => this.emit('subscriber:connect', this))
this.ioSubscriberConnection!.on('wait', () => this.emit('subscriber:wait', this))
this.ioSubscriberConnection!.on('ready', () => this.emit('subscriber:ready', this))
this.ioSubscriberConnection!.on('connect', () =>
this.emit('subscriber:connect', { connection: this })
)
this.ioSubscriberConnection!.on('ready', () =>
this.emit('subscriber:ready', { connection: this })
)
this.ioSubscriberConnection!.on('error', (error: any) => {
this.emit('subscriber:error', error, this)
this.emit('subscriber:error', { error, connection: this })
})
this.ioSubscriberConnection!.on('close', () => this.emit('subscriber:close', this))
this.ioSubscriberConnection!.on('reconnecting', () =>
this.emit('subscriber:reconnecting', this)
this.ioSubscriberConnection!.on('close', () =>
this.emit('subscriber:close', { connection: this })
)
this.ioSubscriberConnection!.on('reconnecting', (waitTime: number) =>
this.emit('subscriber:reconnecting', { connection: this, waitTime })
)

/**
Expand All @@ -165,7 +188,7 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
*/
this.ioSubscriberConnection!.on('end', async () => {
this.ioSubscriberConnection!.removeAllListeners()
this.emit('subscriber:end', this)
this.emit('subscriber:end', { connection: this })

/**
* Cleanup subscriptions
Expand All @@ -174,13 +197,14 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
this.psubscriptions.clear()

this.ioSubscriberConnection = undefined
this.removeAllListeners('subscriber:connect')
this.removeAllListeners('subscriber:wait')
this.removeAllListeners('subscriber:ready')
this.removeAllListeners('subscriber:error')
this.removeAllListeners('subscriber:close')
this.removeAllListeners('subscriber:reconnecting')
this.removeAllListeners('subscriber:end')
this.clearListeners([
'subscriber:connect',
'subscriber:ready',
'subscriber:error',
'subscriber:close',
'subscriber:reconnecting',
'subscriber:end',
])
})
}

Expand Down Expand Up @@ -243,7 +267,7 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
* Subscribe to a given channel to receive Redis pub/sub events. A
* new subscriber connection will be created/managed automatically.
*/
subscribe(channel: string, handler: PubSubChannelHandler): void {
subscribe(channel: string, handler: PubSubChannelHandler, options?: PubSubOptions): void {
/**
* Make the subscriber connection. The method results in a noop when
* subscriber connection already exists.
Expand All @@ -264,11 +288,17 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
*/
this.ioSubscriberConnection!.subscribe(channel)
.then((count) => {
this.emit('subscription:ready', count, this)
if (options?.onSubscription) {
options?.onSubscription(count as number)
}
this.emit('subscription:ready', { count: count as number, connection: this })
this.subscriptions.set(channel, handler)
})
.catch((error) => {
this.emit('subscription:error', error, this)
if (options?.onError) {
options?.onError(error)
}
this.emit('subscription:error', { error, connection: this })
})
}

Expand All @@ -283,7 +313,7 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
/**
* Make redis subscription for a pattern
*/
psubscribe(pattern: string, handler: PubSubPatternHandler): void {
psubscribe(pattern: string, handler: PubSubPatternHandler, options?: PubSubOptions): void {
/**
* Make the subscriber connection. The method results in a noop when
* subscriber connection already exists.
Expand All @@ -304,11 +334,17 @@ export abstract class AbstractConnection<T extends Redis | Cluster> extends Even
*/
this.ioSubscriberConnection!.psubscribe(pattern)
.then((count) => {
this.emit('psubscription:ready', count, this)
if (options?.onSubscription) {
options?.onSubscription(count as number)
}
this.emit('psubscription:ready', { count: count as number, connection: this })
this.psubscriptions.set(pattern, handler)
})
.catch((error) => {
this.emit('psubscription:error', error, this)
if (options?.onError) {
options?.onError(error)
}
this.emit('psubscription:error', { error, connection: this })
})
}

Expand Down
1 change: 0 additions & 1 deletion src/connections/io_methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ export const redisMethods = [
'copy',
'createBuiltinCommand',
'dbsize',
'debug',
'decr',
'decrby',
'del',
Expand Down
11 changes: 9 additions & 2 deletions src/connections/redis_cluster_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,21 @@ import Redis, { type Cluster, type NodeRole } from 'ioredis'
import debug from '../debug.js'
import { baseMethods } from './io_methods.js'
import { AbstractConnection } from './abstract_connection.js'
import type { IORedisBaseCommands, RedisClusterConnectionConfig } from '../types/main.js'
import type {
ConnectionEvents,
IORedisBaseCommands,
RedisClusterConnectionConfig,
} from '../types/main.js'

/**
* Redis cluster connection exposes the API to run Redis commands using `ioredis` as the
* underlying client. The class abstracts the need of creating and managing multiple
* pub/sub connections by hand, since it handles that internally by itself.
*/
export class RedisClusterConnection extends AbstractConnection<Cluster> {
export class RedisClusterConnection extends AbstractConnection<
Cluster,
ConnectionEvents<RedisClusterConnection>
> {
#hosts: RedisClusterConnectionConfig['clusters']
#config: RedisClusterConnectionConfig['clusterOptions']

Expand Down
9 changes: 7 additions & 2 deletions src/connections/redis_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@ import { Redis, RedisOptions } from 'ioredis'
import debug from '../debug.js'
import { redisMethods } from './io_methods.js'
import { AbstractConnection } from './abstract_connection.js'
import { IORedisConnectionCommands, RedisConnectionConfig } from '../types/main.js'
import type {
ConnectionEvents,
RedisConnectionConfig,
IORedisConnectionCommands,
} from '../types/main.js'

/**
* Redis connection exposes the API to run Redis commands using `ioredis` as the
* underlying client. The class abstracts the need of creating and managing
* multiple pub/sub connections by hand, since it handles that internally
* by itself.
*/
export class RedisConnection extends AbstractConnection<Redis> {
export class RedisConnection extends AbstractConnection<Redis, ConnectionEvents<RedisConnection>> {
#config: RedisOptions

/**
Expand Down Expand Up @@ -73,6 +77,7 @@ export class RedisConnection extends AbstractConnection<Redis> {
* class and also extending its TypeScript types
*/
export interface RedisConnection extends IORedisConnectionCommands {}

redisMethods.forEach((method) => {
;(RedisConnection.prototype as any)[method] = function redisConnectionProxyFn(...args: any[]) {
return this.ioConnection[method](...args)
Expand Down
11 changes: 7 additions & 4 deletions src/redis_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ class RedisManager<ConnectionsList extends RedisConnectionsList> extends Emitter
/**
* The default error reporter we use to log redis errors
*/
#errorReporter = function logRedisError(this: RedisManager<ConnectionsList>, error: any) {
this.#logger.fatal({ err: error }, 'Redis connection failure')
#errorReporter = function logRedisError(
this: RedisManager<ConnectionsList>,
data: { error: any }
) {
this.#logger.fatal({ err: data.error }, 'Redis connection failure')
}.bind(this)

/**
Expand Down Expand Up @@ -112,7 +115,7 @@ class RedisManager<ConnectionsList extends RedisConnectionsList> extends Emitter
this.#shouldLogRedisErrors = false
Object.keys(this.activeConnections).forEach((name) => {
debug('removing error reporter from %s connection', name)
this.activeConnections[name]?.removeListener('error', this.#errorReporter)
this.activeConnections[name]?.off('error', this.#errorReporter)
})
return this
}
Expand Down Expand Up @@ -172,7 +175,7 @@ class RedisManager<ConnectionsList extends RedisConnectionsList> extends Emitter
/**
* Remove connection from the list of tracked connections
*/
connection.on('end', ($connection) => {
connection.on('end', ({ connection: $connection }) => {
debug('%s connection closed. Removing from tracked connections list', name)
delete this.activeConnections[$connection.connectionName]
})
Expand Down
34 changes: 34 additions & 0 deletions src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,40 @@ export type PubSubPatternHandler<T extends any = string> = (
data: T
) => Promise<void> | void

/**
* Options accepted during subscribe
*/
export type PubSubOptions = {
onError(error: any): void
onSubscription(count: number): void
}

/**
* List of connection events
*/
export type ConnectionEvents<T extends any> = {
'connect': { connection: T }
'wait': { connection: T }
'ready': { connection: T }
'error': { error: any; connection: T }
'close': { connection: T }
'reconnecting': { connection: T; waitTime: number }
'end': { connection: T }
'subscriber:connect': { connection: T }
'subscriber:ready': { connection: T }
'subscriber:error': { error: any; connection: T }
'subscriber:close': { connection: T }
'subscriber:reconnecting': { connection: T; waitTime: number }
'subscriber:end': { connection: T }
'node:added': { connection: T; node: Redis }
'node:removed': { connection: T; node: Redis }
'node:error': { error: any; address: string; connection: T }
'subscription:ready': { connection: T; count: number }
'subscription:error': { connection: T; error: any }
'psubscription:ready': { connection: T; count: number }
'psubscription:error': { connection: T; error: any }
}

/**
* Shape of the report node for the redis connection report
*/
Expand Down
6 changes: 3 additions & 3 deletions tests/redis_cluster_connection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
* file that was distributed with this source code.
*/

import { pEvent } from 'p-event'
import { test } from '@japa/runner'
import { pEvent } from '../tests_helpers/main.js'
import RedisClusterConnection from '../src/connections/redis_cluster_connection.js'

const nodes = process.env.REDIS_CLUSTER_PORTS!.split(',').map((port) => {
Expand Down Expand Up @@ -89,8 +89,8 @@ test.group('Redis cluster factory', () => {
cleanup(() => connection.quit())

connection.on('error', () => {})
const [error] = await pEvent(connection, 'node:error', { multiArgs: true })
assert.equal(error.message, 'Connection is closed.')
const response = await pEvent(connection, 'node:error')
assert.equal(response!.error.message, 'Connection is closed.')
})

test('access cluster nodes', async ({ assert, cleanup }) => {
Expand Down
Loading

0 comments on commit 4808770

Please sign in to comment.