diff --git a/command.ts b/command.ts index 7565c87c..b0d48938 100644 --- a/command.ts +++ b/command.ts @@ -37,6 +37,7 @@ import type { XReadOpts, XReadReply, } from "./stream.ts"; +import type { RedisMonitor } from "./monitor.ts"; export type ACLLogMode = "RESET"; type BitopOperation = "AND" | "OR" | "XOR" | "NOT"; @@ -1287,7 +1288,7 @@ XRANGE somestream - + moduleList(): Promise; moduleLoad(path: string, ...args: string[]): Promise; moduleUnload(name: string): Promise; - monitor(): void; + monitor(): Promise; replicaof(host: string, port: number): Promise; replicaofNoOne(): Promise; role(): Promise; diff --git a/connection.ts b/connection.ts index 68b6b08d..2699f12b 100644 --- a/connection.ts +++ b/connection.ts @@ -40,6 +40,7 @@ export interface Connection extends TypedEventTarget { close(): void; connect(): Promise; reconnect(): Promise; + duplicate(): RedisConnection; sendCommand( command: string, args?: Array, @@ -139,6 +140,10 @@ export class RedisConnection this.backoff = options.backoff ?? exponentialBackoff(); } + duplicate(): RedisConnection { + return new RedisConnection(this.hostname, this.port, this.options); + } + private async authenticate( username: string | undefined, password: string, diff --git a/mod.ts b/mod.ts index 602c9697..1b2c1fd4 100644 --- a/mod.ts +++ b/mod.ts @@ -83,6 +83,7 @@ export type { SimpleString, } from "./protocol/shared/types.ts"; export type { RedisPubSubMessage, RedisSubscription } from "./pubsub.ts"; +export type { RedisMonitor, RedisMonitorLog } from "./monitor.ts"; export type { Redis, RedisConnectOptions } from "./redis.ts"; export type { StartEndCount, diff --git a/monitor.ts b/monitor.ts new file mode 100644 index 00000000..a20c444b --- /dev/null +++ b/monitor.ts @@ -0,0 +1,83 @@ +import type { CommandExecutor } from "./executor.ts"; +import { isRetriableError } from "./errors.ts"; +import { kUnstableReadReply } from "./internal/symbols.ts"; + +export interface RedisMonitorLog { + timestamp: string; + args: string[]; + source: string; + database: string; +} + +export interface RedisMonitor { + readonly isConnected: boolean; + readonly isClosed: boolean; + receive(): AsyncIterableIterator; + close(): void; +} + +export class RedisMonitorImpl implements RedisMonitor { + get isConnected(): boolean { + return this.executor.connection.isConnected; + } + + get isClosed(): boolean { + return this.executor.connection.isClosed; + } + + constructor(private executor: CommandExecutor) {} + + receive(): AsyncIterableIterator { + return this.#receive(); + } + + /** + * Non-standard return value. Dumps the received commands in an infinite flow. + * @see https://redis.io/docs/latest/commands/monitor + */ + async *#receive(): AsyncIterableIterator { + let forceReconnect = false; + const connection = this.executor.connection; + while (this.isConnected) { + try { + let reply: string; + try { + reply = await connection[kUnstableReadReply]() as typeof reply; + } catch (err) { + if (this.isClosed) { + // Connection already closed by the user. + break; + } + throw err; // Connection may have been unintentionally closed. + } + + // Reply example: 1735135615.9063666 [0 127.0.0.1:52848] "XRANGE" "foo" "-" "+" "COUNT" "3" + const len = reply.indexOf(" "); + const timestamp = reply.slice(0, len); + const argIndex = reply.indexOf('"'); + const args = reply + .slice(argIndex + 1, -1) + .split('" "') + .map((elem) => elem.replace(/\\"/g, '"')); + const [database, source] = reply.slice(len + 2, argIndex - 2).split( + " ", + ); + + yield { timestamp, args, source, database }; + } catch (error) { + if (isRetriableError(error)) { + forceReconnect = true; + } else throw error; + } finally { + if ((!this.isClosed && !this.isConnected) || forceReconnect) { + forceReconnect = false; + await connection.reconnect(); + } + } + } + } + + close() { + this.executor.connection.close(); + } +} diff --git a/redis.ts b/redis.ts index 64dba1d5..757b728f 100644 --- a/redis.ts +++ b/redis.ts @@ -104,6 +104,7 @@ import { rawstr, xidstr, } from "./stream.ts"; +import { RedisMonitorImpl } from "./monitor.ts"; const binaryCommandOptions = { returnUint8Arrays: true, @@ -1116,8 +1117,13 @@ class RedisImpl implements Redis { return this.execStatusReply("MODULE", "UNLOAD", name); } - monitor() { - throw new Error("not supported yet"); + async monitor() { + const connection = this.executor.connection.duplicate(); + await connection.connect(); + const executor = new DefaultExecutor(connection); + await executor.sendCommand("MONITOR", [], { inline: true }); + + return new RedisMonitorImpl(executor); } move(key: string, db: string) {