Skip to content

Commit

Permalink
fix: fix concurrent pipeline invocation results (#429)
Browse files Browse the repository at this point in the history
Co-authored-by: yahiro <[email protected]>
  • Loading branch information
yahiro07 and yahiro07 authored Mar 16, 2024
1 parent 849c98e commit 28ead06
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 29 deletions.
58 changes: 29 additions & 29 deletions connection.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import { Protocol as DenoStreamsProtocol } from "./protocol/deno_streams/mod.ts";
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
import type { Command, Protocol } from "./protocol/shared/protocol.ts";
import type { Backoff } from "./backoff.ts";
import { exponentialBackoff } from "./backoff.ts";
import { ErrorReplyError, isRetriableError } from "./errors.ts";
Expand All @@ -10,6 +7,9 @@ import {
kUnstableReadReply,
kUnstableWriteCommand,
} from "./internal/symbols.ts";
import { Protocol as DenoStreamsProtocol } from "./protocol/deno_streams/mod.ts";
import type { Command, Protocol } from "./protocol/shared/protocol.ts";
import type { RedisReply, RedisValue } from "./protocol/shared/types.ts";
import { delay } from "./vendor/https/deno.land/std/async/delay.ts";

export interface SendCommandOptions {
Expand Down Expand Up @@ -73,11 +73,9 @@ export interface RedisConnectionOptions {
export const kEmptyRedisArgs: Array<RedisValue> = [];

interface PendingCommand {
name: string;
args: RedisValue[];
execute: () => Promise<RedisReply>;
resolve: (reply: RedisReply) => void;
reject: (error: unknown) => void;
returnUint8Arrays?: boolean;
}

export class RedisConnection implements Connection {
Expand Down Expand Up @@ -148,22 +146,29 @@ export class RedisConnection implements Connection {
await this.sendCommand("SELECT", [db]);
}

private enqueueCommand(
command: PendingCommand,
) {
this.commandQueue.push(command);
if (this.commandQueue.length === 1) {
this.processCommandQueue();
}
}

sendCommand(
command: string,
args?: Array<RedisValue>,
options?: SendCommandOptions,
): Promise<RedisReply> {
const { promise, resolve, reject } = Promise.withResolvers<RedisReply>();
this.commandQueue.push({
name: command,
args: args ?? kEmptyRedisArgs,
resolve,
reject,
returnUint8Arrays: options?.returnUint8Arrays,
});
if (this.commandQueue.length === 1) {
this.processCommandQueue();
}
const execute = () =>
this.#protocol.sendCommand(
command,
args ?? kEmptyRedisArgs,
options?.returnUint8Arrays,
);
this.enqueueCommand({ execute, resolve, reject });

return promise;
}

Expand All @@ -172,7 +177,12 @@ export class RedisConnection implements Connection {
}

[kUnstablePipeline](commands: Array<Command>) {
return this.#protocol.pipeline(commands);
const { promise, resolve, reject } = Promise.withResolvers<
RedisReply[]
>();
const execute = () => this.#protocol.pipeline(commands);
this.enqueueCommand({ execute, resolve, reject } as PendingCommand);
return promise;
}

[kUnstableWriteCommand](command: Command): Promise<void> {
Expand Down Expand Up @@ -256,11 +266,7 @@ export class RedisConnection implements Connection {
if (!command) return;

try {
const reply = await this.#protocol.sendCommand(
command.name,
command.args,
command.returnUint8Arrays,
);
const reply = await command.execute();
command.resolve(reply);
} catch (error) {
if (
Expand All @@ -275,13 +281,7 @@ export class RedisConnection implements Connection {
this.close();
try {
await this.connect();

const reply = await this.#protocol.sendCommand(
command.name,
command.args,
command.returnUint8Arrays,
);

const reply = await command.execute();
return command.resolve(reply);
} catch { // TODO: use `AggregateError`?
const backoff = this.backoff(i);
Expand Down
31 changes: 31 additions & 0 deletions tests/commands/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,37 @@ export function pipelineTests(
}
});

it("pipeline in concurrent, avoid redundant response mixup", async () => {
{
const opts = getOpts();
const client = await connect(opts);

const randomValues = new Array(10)
.fill(0)
.map(() => new Array(10).fill(0).map(() => Math.random().toString()));

for (let i = 0; i < 10; i++) {
const key = `list_${i}`;
const values = randomValues[i];
await client.del(key);
await client.rpush(key, ...values);
}

// deno-lint-ignore no-inner-declarations
async function task() {
const tx = client.pipeline();
for (let i = 0; i < 10; i++) {
tx.lrange(`list_${i}`, 0, -1);
}
return await tx.flush();
}

const res = await Promise.all([task(), task()]);
assertEquals(res, [randomValues, randomValues]);
client.close();
}
});

it("error while pipeline", async () => {
const opts = getOpts();
const client = await connect(opts);
Expand Down

0 comments on commit 28ead06

Please sign in to comment.