Skip to content

Commit

Permalink
feat: add connection event listener support similar to node-redis (#475)
Browse files Browse the repository at this point in the history
  • Loading branch information
ardabeyazoglu authored Dec 8, 2024
1 parent fbf2a23 commit 31e34e2
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 13 deletions.
132 changes: 122 additions & 10 deletions connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,74 @@ import type { Command, Protocol } from "./protocol/shared/protocol.ts";
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
import { delay } from "./deps/std/async.ts";

type TypedEventTarget<EventMap extends object> = {
new (): IntermediateEventTarget<EventMap>;
};

interface IntermediateEventTarget<EventMap> extends EventTarget {
addEventListener<K extends keyof EventMap>(
type: K,
callback: (
event: EventMap[K] extends Event ? EventMap[K] : never,
) => EventMap[K] extends Event ? void : never,
options?: AddEventListenerOptions | boolean,
): void;

addEventListener(
type: string,
callback: EventListenerOrEventListenerObject | null,
options?: AddEventListenerOptions | boolean,
): void;

removeEventListener<K extends keyof EventMap>(
type: K,
callback: (
event: EventMap[K] extends Event ? EventMap[K] : never,
) => EventMap[K] extends Event ? void : never,
options?: EventListenerOptions | boolean,
): void;

removeEventListener(
type: string,
callback: EventListenerOrEventListenerObject | null,
options?: EventListenerOptions | boolean,
): void;
}

export type ConnectionEvent = Record<string, unknown>;

export type ConnectionErrorEvent = {
error: Error;
};

export type ConnectionReconnectingEvent = {
delay: number;
};

export type ConnectionEventMap = {
error: CustomEvent<ConnectionErrorEvent>;
connect: CustomEvent<ConnectionEvent>;
reconnecting: CustomEvent<ConnectionReconnectingEvent>;
ready: CustomEvent<ConnectionEvent>;
close: CustomEvent<ConnectionEvent>;
end: CustomEvent<ConnectionEvent>;
};

export type ConnectionEventTarget = TypedEventTarget<ConnectionEventMap>;

export type ConnectionEventType =
| "error"
| "connect"
| "reconnecting"
| "ready"
| "close"
| "end";

export type ConnectionEventArg<T extends ConnectionEventType> = T extends
"error" ? Error
: T extends "reconnecting" ? number
: undefined;

export interface SendCommandOptions {
/**
* When this option is set, simple or bulk string replies are returned as `Uint8Array` type.
Expand All @@ -28,7 +96,8 @@ export interface SendCommandOptions {
inline?: boolean;
}

export interface Connection {
export interface Connection extends EventTarget {
name: string | null;
isClosed: boolean;
isConnected: boolean;
close(): void;
Expand Down Expand Up @@ -89,7 +158,8 @@ interface PendingCommand {
reject: (error: unknown) => void;
}

export class RedisConnection implements Connection {
export class RedisConnection extends (EventTarget as ConnectionEventTarget)
implements Connection {
name: string | null = null;
private maxRetryCount = 10;

Expand Down Expand Up @@ -120,6 +190,8 @@ export class RedisConnection implements Connection {
port: number | string,
private options: RedisConnectionOptions,
) {
super();

this.hostname = hostname;
this.port = port;
if (options.name) {
Expand All @@ -141,10 +213,13 @@ export class RedisConnection implements Connection {
: await this.sendCommand("AUTH", [password], { inline: true });
} catch (error) {
if (error instanceof ErrorReplyError) {
throw new AuthenticationError("Authentication failed", {
const authError = new AuthenticationError("Authentication failed", {
cause: error,
});
this.fireEvent("error", authError);
throw authError;
} else {
this.fireEvent("error", error as Error);
throw error;
}
}
Expand Down Expand Up @@ -190,7 +265,7 @@ export class RedisConnection implements Connection {
return this.#protocol.readReply(returnsUint8Arrays);
}

[kUnstablePipeline](commands: Array<Command>) {
[kUnstablePipeline](commands: Array<Command>): Promise<RedisReply[]> {
const { promise, resolve, reject } = Promise.withResolvers<
RedisReply[]
>();
Expand Down Expand Up @@ -223,8 +298,10 @@ export class RedisConnection implements Connection {
this.#conn = conn;
this.#protocol = this.options?.[kUnstableCreateProtocol]?.(conn) ??
new DenoStreamsProtocol(conn);

this._isClosed = false;
this._isConnected = true;
this.fireEvent("connect", undefined);

try {
if (this.options.password != null) {
Expand All @@ -234,42 +311,67 @@ export class RedisConnection implements Connection {
await this.selectDb(this.options.db);
}
} catch (error) {
this.close();
this.#close();
throw error;
}

this.fireEvent("ready", undefined);

this.#enableHealthCheckIfNeeded();
} catch (error) {
if (error instanceof AuthenticationError) {
this.fireEvent("error", error);
this.fireEvent("end", undefined);
throw (error.cause ?? error);
}

const backoff = this.backoff(retryCount);
retryCount++;
if (retryCount >= this.maxRetryCount) {
this.fireEvent("error", error as Error);
this.fireEvent("end", undefined);
throw error;
}
this.fireEvent("reconnecting", backoff);
await delay(backoff);
await this.#connect(retryCount);
}
}

close() {
this.#close(false);
}

#close(canReconnect = false) {
const isClosedAlready = this._isClosed;

this._isClosed = true;
this._isConnected = false;
try {
this.#conn!.close();
} catch (error) {
if (!(error instanceof Deno.errors.BadResource)) throw error;
if (!(error instanceof Deno.errors.BadResource)) {
this.fireEvent("error", error as Error);
throw error;
}
} finally {
if (!isClosedAlready) {
this.fireEvent("close", undefined);

if (!canReconnect) {
this.fireEvent("end", undefined);
}
}
}
}

async reconnect(): Promise<void> {
try {
await this.sendCommand("PING");
this._isConnected = true;
} catch (_error) { // TODO: Maybe we should log this error.
this.close();
} catch (error) { // TODO: Maybe we should log this error.
this.fireEvent("error", error as Error);
this.#close(true);
await this.connect();
await this.sendCommand("PING");
}
Expand All @@ -287,12 +389,13 @@ export class RedisConnection implements Connection {
!isRetriableError(error) ||
this.isManuallyClosedByUser()
) {
this.fireEvent("error", error as Error);
return command.reject(error);
}

for (let i = 0; i < this.maxRetryCount; i++) {
// Try to reconnect to the server and retry the command
this.close();
this.#close(true);
try {
await this.connect();
const reply = await command.execute();
Expand All @@ -303,6 +406,7 @@ export class RedisConnection implements Connection {
}
}

this.fireEvent("error", error as Error);
command.reject(error);
} finally {
this.commandQueue.shift();
Expand All @@ -328,7 +432,7 @@ export class RedisConnection implements Connection {
try {
await this.sendCommand("PING");
this._isConnected = true;
} catch {
} catch (_error) {
// TODO: notify the user of an error
this._isConnected = false;
} finally {
Expand All @@ -338,6 +442,14 @@ export class RedisConnection implements Connection {

setTimeout(ping, healthCheckInterval);
}

private fireEvent<T extends ConnectionEventType>(
eventType: T,
eventArg: ConnectionEventArg<T>,
): boolean {
const event = new CustomEvent(eventType, { detail: eventArg });
return this.dispatchEvent(event);
}
}

class AuthenticationError extends Error {}
Expand Down
7 changes: 7 additions & 0 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ export type {
} from "./command.ts";
export type {
Connection,
ConnectionErrorEvent,
ConnectionEvent,
ConnectionEventArg,
ConnectionEventMap,
ConnectionEventTarget,
ConnectionEventType,
ConnectionReconnectingEvent,
RedisConnectionOptions,
SendCommandOptions,
} from "./connection.ts";
Expand Down
51 changes: 48 additions & 3 deletions redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,13 @@ import type {
ZUnionstoreOpts,
} from "./command.ts";
import { RedisConnection } from "./connection.ts";
import type { Connection, SendCommandOptions } from "./connection.ts";
import type {
Connection,
ConnectionEventMap,
ConnectionEventTarget,
ConnectionEventType,
SendCommandOptions,
} from "./connection.ts";
import type { RedisConnectionOptions } from "./connection.ts";
import type { CommandExecutor } from "./executor.ts";
import { DefaultExecutor } from "./executor.ts";
Expand Down Expand Up @@ -104,7 +110,7 @@ const binaryCommandOptions = {
returnUint8Arrays: true,
};

export interface Redis extends RedisCommands {
export interface Redis extends RedisCommands, EventTarget {
readonly isClosed: boolean;
readonly isConnected: boolean;

Expand All @@ -118,10 +124,24 @@ export interface Redis extends RedisCommands {
): Promise<RedisReply>;
connect(): Promise<void>;
close(): void;

[Symbol.dispose](): void;
}

class RedisImpl implements Redis {
interface TypedEventListener<E extends Event> {
(evt: E): void;
}

interface TypedEventListenerObject<E extends Event> {
handleEvent(evt: E): void;
}

type TypedEventListenerOrEventListenerObject<E extends Event> =
| TypedEventListener<E>
| TypedEventListenerObject<E>;

class RedisImpl extends (EventTarget as ConnectionEventTarget)
implements Redis {
private readonly executor: CommandExecutor;

get isClosed() {
Expand All @@ -133,9 +153,34 @@ class RedisImpl implements Redis {
}

constructor(executor: CommandExecutor) {
super();
this.executor = executor;
}

override addEventListener<K extends ConnectionEventType>(
type: K,
callback:
| TypedEventListenerOrEventListenerObject<ConnectionEventMap[K]>
| null,
options?: boolean | AddEventListenerOptions,
): void {
const listener = callback as EventListenerOrEventListenerObject | null;
this.executor.connection.addEventListener(type, listener, options);
super.addEventListener(type, listener, options);
}

override removeEventListener<K extends ConnectionEventType>(
type: K,
callback:
| TypedEventListenerOrEventListenerObject<ConnectionEventMap[K]>
| null,
options?: boolean | EventListenerOptions,
): void {
const listener = callback as EventListenerOrEventListenerObject | null;
this.executor.connection.removeEventListener(type, listener, options);
super.removeEventListener(type, listener, options);
}

sendCommand(
command: string,
args?: RedisValue[],
Expand Down
Loading

0 comments on commit 31e34e2

Please sign in to comment.