Skip to content

Commit

Permalink
BREAKING: remove reader and writer from Connection
Browse files Browse the repository at this point in the history
  • Loading branch information
uki00a committed Oct 18, 2023
1 parent b0540e8 commit b9cabb3
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 19 deletions.
29 changes: 23 additions & 6 deletions connection.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { sendCommand } from "./protocol/mod.ts";
import type { RedisReply, RedisValue } from "./protocol/mod.ts";
import { readReply, sendCommand, sendCommands } from "./protocol/mod.ts";
import type { Command, RedisReply, RedisValue } from "./protocol/mod.ts";
import type { Backoff } from "./backoff.ts";
import { exponentialBackoff } from "./backoff.ts";
import { ErrorReplyError, isRetriableError } from "./errors.ts";
import { kUnstablePipeline, kUnstableReadReply } from "./internal/symbols.ts";
import { BufReader } from "./vendor/https/deno.land/std/io/buf_reader.ts";
import { BufWriter } from "./vendor/https/deno.land/std/io/buf_writer.ts";
import {
Expand All @@ -22,8 +23,6 @@ export interface SendCommandOptions {
}

export interface Connection {
reader: BufReader;
writer: BufWriter;
isClosed: boolean;
isConnected: boolean;
close(): void;
Expand All @@ -34,6 +33,16 @@ export interface Connection {
args?: Array<RedisValue>,
options?: SendCommandOptions,
): Promise<RedisReply>;
/**
* @private
*/
[kUnstableReadReply](returnsUint8Arrays?: boolean): Promise<RedisReply>;
/**
* @private
*/
[kUnstablePipeline](
commands: Array<Command>,
): Promise<Array<RedisReply | ErrorReplyError>>;
}

export interface RedisConnectionOptions {
Expand Down Expand Up @@ -64,8 +73,8 @@ interface Command {

export class RedisConnection implements Connection {
name: string | null = null;
reader!: BufReader;
writer!: BufWriter;
private reader!: BufReader;
private writer!: BufWriter;
private closer!: Closer;
private maxRetryCount = 10;

Expand Down Expand Up @@ -149,6 +158,14 @@ export class RedisConnection implements Connection {
return promise;
}

[kUnstableReadReply](returnsUint8Arrays?: boolean): Promise<RedisReply> {
return readReply(this.reader, returnsUint8Arrays);
}

[kUnstablePipeline](commands: Array<Command>) {
return sendCommands(this.writer, this.reader, commands);
}

/**
* Connect to Redis server
*/
Expand Down
9 changes: 9 additions & 0 deletions internal/symbols.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* @private
*/
export const kUnstableReadReply = Symbol("deno-redis.readReply");

/**
* @private
*/
export const kUnstablePipeline = Symbol("deno-redis.pipeline");
3 changes: 2 additions & 1 deletion pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
sendCommands,
} from "./protocol/mod.ts";
import { create, Redis } from "./redis.ts";
import { kUnstablePipeline } from "./internal/symbols.ts";
import {
Deferred,
deferred,
Expand Down Expand Up @@ -92,7 +93,7 @@ export class PipelineExecutor implements CommandExecutor {
private dequeue(): void {
const [e] = this.queue;
if (!e) return;
sendCommands(this.connection.writer, this.connection.reader, e.commands)
this.connection[kUnstablePipeline](e.commands)
.then(e.d.resolve)
.catch(e.d.reject)
.finally(() => {
Expand Down
16 changes: 9 additions & 7 deletions protocol/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,22 @@ export async function sendCommand(
return readReply(reader, returnUint8Arrays);
}

export interface Command {
command: string;
args: RedisValue[];
returnUint8Arrays?: boolean;
}

export async function sendCommands(
writer: BufWriter,
reader: BufReader,
commands: {
command: string;
args: RedisValue[];
returnUint8Arrays?: boolean;
}[],
): Promise<unknown[]> {
commands: Command[],
): Promise<(RedisReply | ErrorReplyError)[]> {
for (const { command, args } of commands) {
await writeRequest(writer, command, args);
}
await writer.flush();
const ret: unknown[] = [];
const ret: (RedisReply | ErrorReplyError)[] = [];
for (let i = 0; i < commands.length; i++) {
try {
const rep = await readReply(reader, commands[i].returnUint8Arrays);
Expand Down
1 change: 1 addition & 0 deletions protocol/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ export type {

export { okReply, readArrayReply, readReply } from "./reply.ts";

export type { Command } from "./command.ts";
export { sendCommand, sendCommands } from "./command.ts";
7 changes: 2 additions & 5 deletions pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { InvalidStateError } from "./errors.ts";
import type { Binary } from "./protocol/mod.ts";
import { readArrayReply } from "./protocol/mod.ts";
import { decoder } from "./protocol/_util.ts";
import { kUnstableReadReply } from "./internal/symbols.ts";

type DefaultMessageType = string;
type ValidMessageType = string | string[];
Expand Down Expand Up @@ -96,11 +97,7 @@ class RedisSubscriptionImpl<
T,
];
try {
// TODO: `readArrayReply` should not be called directly here
rep = (await readArrayReply(
connection.reader,
binaryMode,
)) as typeof rep;
rep = await connection[kUnstableReadReply](binaryMode) as typeof rep;
} catch (err) {
if (err instanceof Deno.errors.BadResource) {
// Connection already closed.
Expand Down

0 comments on commit b9cabb3

Please sign in to comment.