Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement monitor command #483

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import type {
XReadOpts,
XReadReply,
} from "./stream.ts";
import type { RedisMonitor } from "./monitor.ts";

export type ACLLogMode = "RESET";
type BitopOperation = "AND" | "OR" | "XOR" | "NOT";
Expand Down Expand Up @@ -1287,7 +1288,7 @@ XRANGE somestream - +
moduleList(): Promise<BulkString[]>;
moduleLoad(path: string, ...args: string[]): Promise<SimpleString>;
moduleUnload(name: string): Promise<SimpleString>;
monitor(): void;
monitor(): Promise<RedisMonitor>;
replicaof(host: string, port: number): Promise<SimpleString>;
replicaofNoOne(): Promise<SimpleString>;
role(): Promise<RoleReply>;
Expand Down
5 changes: 5 additions & 0 deletions connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export interface Connection extends TypedEventTarget<ConnectionEventMap> {
close(): void;
connect(): Promise<void>;
reconnect(): Promise<void>;
duplicate(): RedisConnection;
sendCommand(
command: string,
args?: Array<RedisValue>,
Expand Down Expand Up @@ -139,6 +140,10 @@ export class RedisConnection
this.backoff = options.backoff ?? exponentialBackoff();
}

duplicate(): RedisConnection {
return new RedisConnection(this.hostname, this.port, this.options);
}

private async authenticate(
username: string | undefined,
password: string,
Expand Down
1 change: 1 addition & 0 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export type {
SimpleString,
} from "./protocol/shared/types.ts";
export type { RedisPubSubMessage, RedisSubscription } from "./pubsub.ts";
export type { RedisMonitor, RedisMonitorLog } from "./monitor.ts";
export type { Redis, RedisConnectOptions } from "./redis.ts";
export type {
StartEndCount,
Expand Down
83 changes: 83 additions & 0 deletions monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import type { CommandExecutor } from "./executor.ts";
import { isRetriableError } from "./errors.ts";
import { kUnstableReadReply } from "./internal/symbols.ts";

export interface RedisMonitorLog {
timestamp: string;
args: string[];
source: string;
database: string;
}

export interface RedisMonitor {
readonly isConnected: boolean;
readonly isClosed: boolean;
receive(): AsyncIterableIterator<RedisMonitorLog>;
close(): void;
}

export class RedisMonitorImpl implements RedisMonitor {
get isConnected(): boolean {
return this.executor.connection.isConnected;
}

get isClosed(): boolean {
return this.executor.connection.isClosed;
}

constructor(private executor: CommandExecutor) {}

receive(): AsyncIterableIterator<RedisMonitorLog> {
return this.#receive();
}

/**
* Non-standard return value. Dumps the received commands in an infinite flow.
* @see https://redis.io/docs/latest/commands/monitor
*/
async *#receive(): AsyncIterableIterator<RedisMonitorLog> {
let forceReconnect = false;
const connection = this.executor.connection;
while (this.isConnected) {
try {
let reply: string;
try {
reply = await connection[kUnstableReadReply]() as typeof reply;
} catch (err) {
if (this.isClosed) {
// Connection already closed by the user.
break;
}
throw err; // Connection may have been unintentionally closed.
}

// Reply example: 1735135615.9063666 [0 127.0.0.1:52848] "XRANGE" "foo" "-" "+" "COUNT" "3"
const len = reply.indexOf(" ");
const timestamp = reply.slice(0, len);
const argIndex = reply.indexOf('"');
const args = reply
.slice(argIndex + 1, -1)
.split('" "')
.map((elem) => elem.replace(/\\"/g, '"'));
const [database, source] = reply.slice(len + 2, argIndex - 2).split(
" ",
);

yield { timestamp, args, source, database };
} catch (error) {
if (isRetriableError(error)) {
forceReconnect = true;
} else throw error;
} finally {
if ((!this.isClosed && !this.isConnected) || forceReconnect) {
forceReconnect = false;
await connection.reconnect();
}
}
}
}

close() {
this.executor.connection.close();
}
}
10 changes: 8 additions & 2 deletions redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ import {
rawstr,
xidstr,
} from "./stream.ts";
import { RedisMonitorImpl } from "./monitor.ts";

const binaryCommandOptions = {
returnUint8Arrays: true,
Expand Down Expand Up @@ -1116,8 +1117,13 @@ class RedisImpl implements Redis {
return this.execStatusReply("MODULE", "UNLOAD", name);
}

monitor() {
throw new Error("not supported yet");
async monitor() {
const connection = this.executor.connection.duplicate();
Copy link
Member

@uki00a uki00a Dec 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better not to duplicate the connection here for the following reasons:

  1. to keep consistency with subscribe()
  2. node-redis also works that way

What do you think about this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been using ioredis historically (which also seems to be 1.5+ times more popular than node-redis on npm).

Unlike node-redis, it does duplicate the connection by default on monitor() which I think was rational because all other commands become unavailable then.

The only positive side of using a single connection I can see is to cover the case when one would want to create redis connection only to perform monitoring and nothing else.

But this comes with a cost, making it dangerous, as it easily leads to unexpected behavior, blocking execution of all other redis commands. For those who are not experienced with redis much or for those coming from ioredis, it would become unclear, why some redis commands just get stuck and take forever to execute; in a large codebase it would be especially challenging to debug. It's also unclear whether the commands you call are accumulated and executed once you terminate the monitor (which can only be done manually), or they are just ignored entirely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would advocate for the duplication of connection as it make it simpler and doesn't "break all things", as well as eliminates potential issues and unforeseen behavior.

That said, what about brining an argument (option) to the monitor() that could make it use the same connection?
To cover the case when one needs to have redis only for monitoring and doesn't need a regular working redis instance to use the rest of the API.

Copy link
Member

@uki00a uki00a Dec 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ArturSharapov I'm in favor of adding an option to the monitor() 👍

We are planning to support RESP3 in the future, and it would be ideal to have an option of not duplicating a connection to monitor().

I have two suggestions:

  • Add reuseConnection option to monitor().
  • Make Connection#duplicate a private API for now
    • I'll implement a connection pool in this library and would like to keep Connection#duplicate private until then
    • e.g.)
      [kUnstableReadReply](returnsUint8Arrays?: boolean): Promise<RedisReply>;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call 👍

await connection.connect();
const executor = new DefaultExecutor(connection);
await executor.sendCommand("MONITOR", [], { inline: true });

return new RedisMonitorImpl(executor);
}

move(key: string, db: string) {
Expand Down
Loading