Skip to content

Commit

Permalink
chore: implement DenoBasedProtocol
Browse files Browse the repository at this point in the history
  • Loading branch information
uki00a committed Oct 24, 2023
1 parent d8c2934 commit 13d3f6b
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 35 deletions.
38 changes: 11 additions & 27 deletions connection.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
import {
readReply,
sendCommand,
sendCommands,
} from "./protocol/deno_streams/mod.ts";
import { Protocol as DenoStreamsProtocol } from "./protocol/deno_streams/mod.ts";
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
import type { Command } from "./protocol/deno_streams/command.ts";
import type { Command, Protocol } from "./protocol/shared/protocol.ts";
import type { Backoff } from "./backoff.ts";
import { exponentialBackoff } from "./backoff.ts";
import { ErrorReplyError, isRetriableError } from "./errors.ts";
import { kUnstablePipeline, kUnstableReadReply } from "./internal/symbols.ts";
import { BufReader } from "./vendor/https/deno.land/std/io/buf_reader.ts";
import { BufWriter } from "./vendor/https/deno.land/std/io/buf_writer.ts";
import {
Deferred,
deferred,
} from "./vendor/https/deno.land/std/async/deferred.ts";
import { delay } from "./vendor/https/deno.land/std/async/delay.ts";
type Closer = Deno.Closer;

export interface SendCommandOptions {
/**
Expand Down Expand Up @@ -78,9 +71,6 @@ interface PendingCommand {

export class RedisConnection implements Connection {
name: string | null = null;
private reader!: BufReader;
private writer!: BufWriter;
private closer!: Closer;
private maxRetryCount = 10;

private readonly hostname: string;
Expand All @@ -90,6 +80,8 @@ export class RedisConnection implements Connection {
private backoff: Backoff;

private commandQueue: PendingCommand[] = [];
#conn!: Deno.Conn;
#protocol!: Protocol;

get isClosed(): boolean {
return this._isClosed;
Expand Down Expand Up @@ -164,11 +156,11 @@ export class RedisConnection implements Connection {
}

[kUnstableReadReply](returnsUint8Arrays?: boolean): Promise<RedisReply> {
return readReply(this.reader, returnsUint8Arrays);
return this.#protocol.readReply(returnsUint8Arrays);
}

[kUnstablePipeline](commands: Array<Command>) {
return sendCommands(this.writer, this.reader, commands);
return this.#protocol.pipeline(commands);
}

/**
Expand All @@ -188,9 +180,8 @@ export class RedisConnection implements Connection {
? await Deno.connectTls(dialOpts)
: await Deno.connect(dialOpts);

this.closer = conn;
this.reader = new BufReader(conn);
this.writer = new BufWriter(conn);
this.#conn = conn;
this.#protocol = new DenoStreamsProtocol(conn);
this._isClosed = false;
this._isConnected = true;

Expand Down Expand Up @@ -226,16 +217,13 @@ export class RedisConnection implements Connection {
this._isClosed = true;
this._isConnected = false;
try {
this.closer!.close();
this.#conn!.close();
} catch (error) {
if (!(error instanceof Deno.errors.BadResource)) throw error;
}
}

async reconnect(): Promise<void> {
if (!this.reader.peek(1)) {
throw new Error("Client is closed.");
}
try {
await this.sendCommand("PING");
this._isConnected = true;
Expand All @@ -251,9 +239,7 @@ export class RedisConnection implements Connection {
if (!command) return;

try {
const reply = await sendCommand(
this.writer,
this.reader,
const reply = await this.#protocol.sendCommand(
command.name,
command.args,
command.returnUint8Arrays,
Expand All @@ -273,9 +259,7 @@ export class RedisConnection implements Connection {
try {
await this.connect();

const reply = await sendCommand(
this.writer,
this.reader,
const reply = await this.#protocol.sendCommand(
command.name,
command.args,
command.returnUint8Arrays,
Expand Down
5 changes: 5 additions & 0 deletions internal/symbols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ export const kUnstableReadReply = Symbol("deno-redis.readReply");
* @private
*/
export const kUnstablePipeline = Symbol("deno-redis.pipeline");

/**
* @private
*/
export const kUnstableProtocol = Symbol("deno-redis.protocol");
7 changes: 1 addition & 6 deletions protocol/deno_streams/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { readReply } from "./reply.ts";
import { ErrorReplyError } from "../../errors.ts";
import { encoder } from "../../internal/encoding.ts";
import type { RedisReply, RedisValue } from "../shared/types.ts";
import type { Command } from "../shared/protocol.ts";

const CRLF = encoder.encode("\r\n");
const ArrayCode = encoder.encode("*");
Expand Down Expand Up @@ -98,12 +99,6 @@ export async function sendCommand(
return readReply(reader, returnUint8Arrays);
}

export interface Command {
command: string;
args: RedisValue[];
returnUint8Arrays?: boolean;
}

export async function sendCommands(
writer: BufWriter,
reader: BufReader,
Expand Down
41 changes: 39 additions & 2 deletions protocol/deno_streams/mod.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,40 @@
export { readArrayReply, readReply } from "./reply.ts";
import { BufReader } from "../../vendor/https/deno.land/std/io/buf_reader.ts";
import { BufWriter } from "../../vendor/https/deno.land/std/io/buf_writer.ts";
import { readReply } from "./reply.ts";
import { sendCommand, sendCommands } from "./command.ts";

export { sendCommand, sendCommands } from "./command.ts";
import type { Command, Protocol as BaseProtocol } from "../shared/protocol.ts";
import { RedisReply, RedisValue } from "../shared/types.ts";
import { ErrorReplyError } from "../../errors.ts";

export class Protocol implements BaseProtocol {
#reader: BufReader;
#writer: BufWriter;

constructor(conn: Deno.Conn) {
this.#reader = new BufReader(conn);
this.#writer = new BufWriter(conn);
}

sendCommand(
command: string,
args: RedisValue[],
returnsUint8Arrays?: boolean | undefined,
): Promise<RedisReply> {
return sendCommand(
this.#writer,
this.#reader,
command,
args,
returnsUint8Arrays,
);
}

readReply(returnsUint8Arrays?: boolean): Promise<RedisReply> {
return readReply(this.#reader, returnsUint8Arrays);
}

pipeline(commands: Command[]): Promise<Array<RedisReply | ErrorReplyError>> {
return sendCommands(this.#writer, this.#reader, commands);
}
}
20 changes: 20 additions & 0 deletions protocol/shared/protocol.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import type { RedisReply, RedisValue } from "./types.ts";
import type { ErrorReplyError } from "../../errors.ts";

export interface Command {
command: string;
args: RedisValue[];
returnUint8Arrays?: boolean;
}

export interface Protocol {
sendCommand(
command: string,
args: Array<RedisValue>,
returnsUint8Arrays?: boolean,
): Promise<RedisReply>;
readReply(returnsUint8Array?: boolean): Promise<RedisReply>;
pipeline(
commands: Array<Command>,
): Promise<Array<RedisReply | ErrorReplyError>>;
}

0 comments on commit 13d3f6b

Please sign in to comment.