diff --git a/connection.ts b/connection.ts index e1297d3e..5bb56111 100644 --- a/connection.ts +++ b/connection.ts @@ -1,22 +1,15 @@ -import { - readReply, - sendCommand, - sendCommands, -} from "./protocol/deno_streams/mod.ts"; +import { Protocol as DenoStreamsProtocol } from "./protocol/deno_streams/mod.ts"; import type { RedisReply, RedisValue } from "./protocol/shared/types.ts"; -import type { Command } from "./protocol/deno_streams/command.ts"; +import type { Command, Protocol } from "./protocol/shared/protocol.ts"; import type { Backoff } from "./backoff.ts"; import { exponentialBackoff } from "./backoff.ts"; import { ErrorReplyError, isRetriableError } from "./errors.ts"; import { kUnstablePipeline, kUnstableReadReply } from "./internal/symbols.ts"; -import { BufReader } from "./vendor/https/deno.land/std/io/buf_reader.ts"; -import { BufWriter } from "./vendor/https/deno.land/std/io/buf_writer.ts"; import { Deferred, deferred, } from "./vendor/https/deno.land/std/async/deferred.ts"; import { delay } from "./vendor/https/deno.land/std/async/delay.ts"; -type Closer = Deno.Closer; export interface SendCommandOptions { /** @@ -78,9 +71,6 @@ interface PendingCommand { export class RedisConnection implements Connection { name: string | null = null; - private reader!: BufReader; - private writer!: BufWriter; - private closer!: Closer; private maxRetryCount = 10; private readonly hostname: string; @@ -90,6 +80,8 @@ export class RedisConnection implements Connection { private backoff: Backoff; private commandQueue: PendingCommand[] = []; + #conn!: Deno.Conn; + #protocol!: Protocol; get isClosed(): boolean { return this._isClosed; @@ -164,11 +156,11 @@ export class RedisConnection implements Connection { } [kUnstableReadReply](returnsUint8Arrays?: boolean): Promise { - return readReply(this.reader, returnsUint8Arrays); + return this.#protocol.readReply(returnsUint8Arrays); } [kUnstablePipeline](commands: Array) { - return sendCommands(this.writer, this.reader, commands); + return this.#protocol.pipeline(commands); } /** @@ -188,9 +180,8 @@ export class RedisConnection implements Connection { ? await Deno.connectTls(dialOpts) : await Deno.connect(dialOpts); - this.closer = conn; - this.reader = new BufReader(conn); - this.writer = new BufWriter(conn); + this.#conn = conn; + this.#protocol = new DenoStreamsProtocol(conn); this._isClosed = false; this._isConnected = true; @@ -226,16 +217,13 @@ export class RedisConnection implements Connection { this._isClosed = true; this._isConnected = false; try { - this.closer!.close(); + this.#conn!.close(); } catch (error) { if (!(error instanceof Deno.errors.BadResource)) throw error; } } async reconnect(): Promise { - if (!this.reader.peek(1)) { - throw new Error("Client is closed."); - } try { await this.sendCommand("PING"); this._isConnected = true; @@ -251,9 +239,7 @@ export class RedisConnection implements Connection { if (!command) return; try { - const reply = await sendCommand( - this.writer, - this.reader, + const reply = await this.#protocol.sendCommand( command.name, command.args, command.returnUint8Arrays, @@ -273,9 +259,7 @@ export class RedisConnection implements Connection { try { await this.connect(); - const reply = await sendCommand( - this.writer, - this.reader, + const reply = await this.#protocol.sendCommand( command.name, command.args, command.returnUint8Arrays, diff --git a/internal/symbols.ts b/internal/symbols.ts index 2c51f1ca..1ea4d7b3 100644 --- a/internal/symbols.ts +++ b/internal/symbols.ts @@ -7,3 +7,8 @@ export const kUnstableReadReply = Symbol("deno-redis.readReply"); * @private */ export const kUnstablePipeline = Symbol("deno-redis.pipeline"); + +/** + * @private + */ +export const kUnstableProtocol = Symbol("deno-redis.protocol"); diff --git a/protocol/deno_streams/command.ts b/protocol/deno_streams/command.ts index 4e74f9d0..d9646c97 100644 --- a/protocol/deno_streams/command.ts +++ b/protocol/deno_streams/command.ts @@ -4,6 +4,7 @@ import { readReply } from "./reply.ts"; import { ErrorReplyError } from "../../errors.ts"; import { encoder } from "../../internal/encoding.ts"; import type { RedisReply, RedisValue } from "../shared/types.ts"; +import type { Command } from "../shared/protocol.ts"; const CRLF = encoder.encode("\r\n"); const ArrayCode = encoder.encode("*"); @@ -98,12 +99,6 @@ export async function sendCommand( return readReply(reader, returnUint8Arrays); } -export interface Command { - command: string; - args: RedisValue[]; - returnUint8Arrays?: boolean; -} - export async function sendCommands( writer: BufWriter, reader: BufReader, diff --git a/protocol/deno_streams/mod.ts b/protocol/deno_streams/mod.ts index 1a9d9042..e9968636 100644 --- a/protocol/deno_streams/mod.ts +++ b/protocol/deno_streams/mod.ts @@ -1,3 +1,40 @@ -export { readArrayReply, readReply } from "./reply.ts"; +import { BufReader } from "../../vendor/https/deno.land/std/io/buf_reader.ts"; +import { BufWriter } from "../../vendor/https/deno.land/std/io/buf_writer.ts"; +import { readReply } from "./reply.ts"; +import { sendCommand, sendCommands } from "./command.ts"; -export { sendCommand, sendCommands } from "./command.ts"; +import type { Command, Protocol as BaseProtocol } from "../shared/protocol.ts"; +import { RedisReply, RedisValue } from "../shared/types.ts"; +import { ErrorReplyError } from "../../errors.ts"; + +export class Protocol implements BaseProtocol { + #reader: BufReader; + #writer: BufWriter; + + constructor(conn: Deno.Conn) { + this.#reader = new BufReader(conn); + this.#writer = new BufWriter(conn); + } + + sendCommand( + command: string, + args: RedisValue[], + returnsUint8Arrays?: boolean | undefined, + ): Promise { + return sendCommand( + this.#writer, + this.#reader, + command, + args, + returnsUint8Arrays, + ); + } + + readReply(returnsUint8Arrays?: boolean): Promise { + return readReply(this.#reader, returnsUint8Arrays); + } + + pipeline(commands: Command[]): Promise> { + return sendCommands(this.#writer, this.#reader, commands); + } +} diff --git a/protocol/shared/protocol.ts b/protocol/shared/protocol.ts new file mode 100644 index 00000000..7c203501 --- /dev/null +++ b/protocol/shared/protocol.ts @@ -0,0 +1,20 @@ +import type { RedisReply, RedisValue } from "./types.ts"; +import type { ErrorReplyError } from "../../errors.ts"; + +export interface Command { + command: string; + args: RedisValue[]; + returnUint8Arrays?: boolean; +} + +export interface Protocol { + sendCommand( + command: string, + args: Array, + returnsUint8Arrays?: boolean, + ): Promise; + readReply(returnsUint8Array?: boolean): Promise; + pipeline( + commands: Array, + ): Promise>; +}