diff --git a/connection.ts b/connection.ts index 16a354c5..5b2d0f5f 100644 --- a/connection.ts +++ b/connection.ts @@ -12,6 +12,74 @@ import type { Command, Protocol } from "./protocol/shared/protocol.ts"; import type { RedisReply, RedisValue } from "./protocol/shared/types.ts"; import { delay } from "./deps/std/async.ts"; +type TypedEventTarget = { + new (): IntermediateEventTarget; +}; + +interface IntermediateEventTarget extends EventTarget { + addEventListener( + type: K, + callback: ( + event: EventMap[K] extends Event ? EventMap[K] : never, + ) => EventMap[K] extends Event ? void : never, + options?: AddEventListenerOptions | boolean, + ): void; + + addEventListener( + type: string, + callback: EventListenerOrEventListenerObject | null, + options?: AddEventListenerOptions | boolean, + ): void; + + removeEventListener( + type: K, + callback: ( + event: EventMap[K] extends Event ? EventMap[K] : never, + ) => EventMap[K] extends Event ? void : never, + options?: EventListenerOptions | boolean, + ): void; + + removeEventListener( + type: string, + callback: EventListenerOrEventListenerObject | null, + options?: EventListenerOptions | boolean, + ): void; +} + +export type ConnectionEvent = Record; + +export type ConnectionErrorEvent = { + error: Error; +}; + +export type ConnectionReconnectingEvent = { + delay: number; +}; + +export type ConnectionEventMap = { + error: CustomEvent; + connect: CustomEvent; + reconnecting: CustomEvent; + ready: CustomEvent; + close: CustomEvent; + end: CustomEvent; +}; + +export type ConnectionEventTarget = TypedEventTarget; + +export type ConnectionEventType = + | "error" + | "connect" + | "reconnecting" + | "ready" + | "close" + | "end"; + +export type ConnectionEventArg = T extends + "error" ? Error + : T extends "reconnecting" ? number + : undefined; + export interface SendCommandOptions { /** * When this option is set, simple or bulk string replies are returned as `Uint8Array` type. @@ -28,7 +96,8 @@ export interface SendCommandOptions { inline?: boolean; } -export interface Connection { +export interface Connection extends EventTarget { + name: string | null; isClosed: boolean; isConnected: boolean; close(): void; @@ -89,7 +158,8 @@ interface PendingCommand { reject: (error: unknown) => void; } -export class RedisConnection implements Connection { +export class RedisConnection extends (EventTarget as ConnectionEventTarget) + implements Connection { name: string | null = null; private maxRetryCount = 10; @@ -120,6 +190,8 @@ export class RedisConnection implements Connection { port: number | string, private options: RedisConnectionOptions, ) { + super(); + this.hostname = hostname; this.port = port; if (options.name) { @@ -141,10 +213,13 @@ export class RedisConnection implements Connection { : await this.sendCommand("AUTH", [password], { inline: true }); } catch (error) { if (error instanceof ErrorReplyError) { - throw new AuthenticationError("Authentication failed", { + const authError = new AuthenticationError("Authentication failed", { cause: error, }); + this.fireEvent("error", authError); + throw authError; } else { + this.fireEvent("error", error as Error); throw error; } } @@ -190,7 +265,7 @@ export class RedisConnection implements Connection { return this.#protocol.readReply(returnsUint8Arrays); } - [kUnstablePipeline](commands: Array) { + [kUnstablePipeline](commands: Array): Promise { const { promise, resolve, reject } = Promise.withResolvers< RedisReply[] >(); @@ -223,8 +298,10 @@ export class RedisConnection implements Connection { this.#conn = conn; this.#protocol = this.options?.[kUnstableCreateProtocol]?.(conn) ?? new DenoStreamsProtocol(conn); + this._isClosed = false; this._isConnected = true; + this.fireEvent("connect", undefined); try { if (this.options.password != null) { @@ -234,33 +311,57 @@ export class RedisConnection implements Connection { await this.selectDb(this.options.db); } } catch (error) { - this.close(); + this.#close(); throw error; } + this.fireEvent("ready", undefined); + this.#enableHealthCheckIfNeeded(); } catch (error) { if (error instanceof AuthenticationError) { + this.fireEvent("error", error); + this.fireEvent("end", undefined); throw (error.cause ?? error); } const backoff = this.backoff(retryCount); retryCount++; if (retryCount >= this.maxRetryCount) { + this.fireEvent("error", error as Error); + this.fireEvent("end", undefined); throw error; } + this.fireEvent("reconnecting", backoff); await delay(backoff); await this.#connect(retryCount); } } close() { + this.#close(false); + } + + #close(canReconnect = false) { + const isClosedAlready = this._isClosed; + this._isClosed = true; this._isConnected = false; try { this.#conn!.close(); } catch (error) { - if (!(error instanceof Deno.errors.BadResource)) throw error; + if (!(error instanceof Deno.errors.BadResource)) { + this.fireEvent("error", error as Error); + throw error; + } + } finally { + if (!isClosedAlready) { + this.fireEvent("close", undefined); + + if (!canReconnect) { + this.fireEvent("end", undefined); + } + } } } @@ -268,8 +369,9 @@ export class RedisConnection implements Connection { try { await this.sendCommand("PING"); this._isConnected = true; - } catch (_error) { // TODO: Maybe we should log this error. - this.close(); + } catch (error) { // TODO: Maybe we should log this error. + this.fireEvent("error", error as Error); + this.#close(true); await this.connect(); await this.sendCommand("PING"); } @@ -287,12 +389,13 @@ export class RedisConnection implements Connection { !isRetriableError(error) || this.isManuallyClosedByUser() ) { + this.fireEvent("error", error as Error); return command.reject(error); } for (let i = 0; i < this.maxRetryCount; i++) { // Try to reconnect to the server and retry the command - this.close(); + this.#close(true); try { await this.connect(); const reply = await command.execute(); @@ -303,6 +406,7 @@ export class RedisConnection implements Connection { } } + this.fireEvent("error", error as Error); command.reject(error); } finally { this.commandQueue.shift(); @@ -328,7 +432,7 @@ export class RedisConnection implements Connection { try { await this.sendCommand("PING"); this._isConnected = true; - } catch { + } catch (_error) { // TODO: notify the user of an error this._isConnected = false; } finally { @@ -338,6 +442,14 @@ export class RedisConnection implements Connection { setTimeout(ping, healthCheckInterval); } + + private fireEvent( + eventType: T, + eventArg: ConnectionEventArg, + ): boolean { + const event = new CustomEvent(eventType, { detail: eventArg }); + return this.dispatchEvent(event); + } } class AuthenticationError extends Error {} diff --git a/mod.ts b/mod.ts index 2283cb8f..51e6f205 100644 --- a/mod.ts +++ b/mod.ts @@ -58,6 +58,13 @@ export type { } from "./command.ts"; export type { Connection, + ConnectionErrorEvent, + ConnectionEvent, + ConnectionEventArg, + ConnectionEventMap, + ConnectionEventTarget, + ConnectionEventType, + ConnectionReconnectingEvent, RedisConnectionOptions, SendCommandOptions, } from "./connection.ts"; diff --git a/redis.ts b/redis.ts index 41089efe..a76d73cc 100644 --- a/redis.ts +++ b/redis.ts @@ -44,7 +44,13 @@ import type { ZUnionstoreOpts, } from "./command.ts"; import { RedisConnection } from "./connection.ts"; -import type { Connection, SendCommandOptions } from "./connection.ts"; +import type { + Connection, + ConnectionEventMap, + ConnectionEventTarget, + ConnectionEventType, + SendCommandOptions, +} from "./connection.ts"; import type { RedisConnectionOptions } from "./connection.ts"; import type { CommandExecutor } from "./executor.ts"; import { DefaultExecutor } from "./executor.ts"; @@ -104,7 +110,7 @@ const binaryCommandOptions = { returnUint8Arrays: true, }; -export interface Redis extends RedisCommands { +export interface Redis extends RedisCommands, EventTarget { readonly isClosed: boolean; readonly isConnected: boolean; @@ -118,10 +124,24 @@ export interface Redis extends RedisCommands { ): Promise; connect(): Promise; close(): void; + [Symbol.dispose](): void; } -class RedisImpl implements Redis { +interface TypedEventListener { + (evt: E): void; +} + +interface TypedEventListenerObject { + handleEvent(evt: E): void; +} + +type TypedEventListenerOrEventListenerObject = + | TypedEventListener + | TypedEventListenerObject; + +class RedisImpl extends (EventTarget as ConnectionEventTarget) + implements Redis { private readonly executor: CommandExecutor; get isClosed() { @@ -133,9 +153,34 @@ class RedisImpl implements Redis { } constructor(executor: CommandExecutor) { + super(); this.executor = executor; } + override addEventListener( + type: K, + callback: + | TypedEventListenerOrEventListenerObject + | null, + options?: boolean | AddEventListenerOptions, + ): void { + const listener = callback as EventListenerOrEventListenerObject | null; + this.executor.connection.addEventListener(type, listener, options); + super.addEventListener(type, listener, options); + } + + override removeEventListener( + type: K, + callback: + | TypedEventListenerOrEventListenerObject + | null, + options?: boolean | EventListenerOptions, + ): void { + const listener = callback as EventListenerOrEventListenerObject | null; + this.executor.connection.removeEventListener(type, listener, options); + super.removeEventListener(type, listener, options); + } + sendCommand( command: string, args?: RedisValue[], diff --git a/tests/commands/connection.ts b/tests/commands/connection.ts index 365be6bb..87749fba 100644 --- a/tests/commands/connection.ts +++ b/tests/commands/connection.ts @@ -129,6 +129,67 @@ export function connectionTests( client.close(); }); + + it("fires events", async () => { + const client = await connect(getOpts()); + + let closeEventFired = false, + endEventFired = false; + + client.addEventListener("close", () => { + closeEventFired = true; + }); + client.addEventListener("end", () => { + endEventFired = true; + }); + + client.close(); + + assertEquals(closeEventFired, true); + assertEquals(endEventFired, true); + }); + + it("fires events with a lazy client", async () => { + const client = createLazyClient(getOpts()); + + let connectEventFired = false, + connectEventFiredTimes = 0, + readyEventFired = false, + readyEventFiredTimes = 0, + closeEventFired = false, + endEventFired = false; + + client.addEventListener("connect", () => { + connectEventFired = true; + connectEventFiredTimes++; + }); + client.addEventListener("ready", () => { + readyEventFired = true; + readyEventFiredTimes++; + }, { once: true }); + + client.addEventListener("close", () => { + closeEventFired = true; + }); + client.addEventListener("end", () => { + endEventFired = true; + }); + + await client.exists("foo"); + client.close(); + + await client.connect(); + await client.exists("foo"); + client.close(); + + assertEquals(connectEventFired, true); + assertEquals(readyEventFired, true); + assertEquals(closeEventFired, true); + assertEquals(endEventFired, true); + + assertEquals(connectEventFiredTimes, 2); + assertEquals(readyEventFiredTimes, 1); + }); }); describe("using", () => {