From 28ead060aa467c1558b82b253c0f2f3f9bf10f40 Mon Sep 17 00:00:00 2001 From: yahiro <321700+yahiro07@users.noreply.github.com> Date: Sat, 16 Mar 2024 16:22:37 +0900 Subject: [PATCH] fix: fix concurrent pipeline invocation results (#429) Co-authored-by: yahiro --- connection.ts | 58 +++++++++++++++++++------------------- tests/commands/pipeline.ts | 31 ++++++++++++++++++++ 2 files changed, 60 insertions(+), 29 deletions(-) diff --git a/connection.ts b/connection.ts index 633f26b1..615a91a3 100644 --- a/connection.ts +++ b/connection.ts @@ -1,6 +1,3 @@ -import { Protocol as DenoStreamsProtocol } from "./protocol/deno_streams/mod.ts"; -import type { RedisReply, RedisValue } from "./protocol/shared/types.ts"; -import type { Command, Protocol } from "./protocol/shared/protocol.ts"; import type { Backoff } from "./backoff.ts"; import { exponentialBackoff } from "./backoff.ts"; import { ErrorReplyError, isRetriableError } from "./errors.ts"; @@ -10,6 +7,9 @@ import { kUnstableReadReply, kUnstableWriteCommand, } from "./internal/symbols.ts"; +import { Protocol as DenoStreamsProtocol } from "./protocol/deno_streams/mod.ts"; +import type { Command, Protocol } from "./protocol/shared/protocol.ts"; +import type { RedisReply, RedisValue } from "./protocol/shared/types.ts"; import { delay } from "./vendor/https/deno.land/std/async/delay.ts"; export interface SendCommandOptions { @@ -73,11 +73,9 @@ export interface RedisConnectionOptions { export const kEmptyRedisArgs: Array = []; interface PendingCommand { - name: string; - args: RedisValue[]; + execute: () => Promise; resolve: (reply: RedisReply) => void; reject: (error: unknown) => void; - returnUint8Arrays?: boolean; } export class RedisConnection implements Connection { @@ -148,22 +146,29 @@ export class RedisConnection implements Connection { await this.sendCommand("SELECT", [db]); } + private enqueueCommand( + command: PendingCommand, + ) { + this.commandQueue.push(command); + if (this.commandQueue.length === 1) { + this.processCommandQueue(); + } + } + sendCommand( command: string, args?: Array, options?: SendCommandOptions, ): Promise { const { promise, resolve, reject } = Promise.withResolvers(); - this.commandQueue.push({ - name: command, - args: args ?? kEmptyRedisArgs, - resolve, - reject, - returnUint8Arrays: options?.returnUint8Arrays, - }); - if (this.commandQueue.length === 1) { - this.processCommandQueue(); - } + const execute = () => + this.#protocol.sendCommand( + command, + args ?? kEmptyRedisArgs, + options?.returnUint8Arrays, + ); + this.enqueueCommand({ execute, resolve, reject }); + return promise; } @@ -172,7 +177,12 @@ export class RedisConnection implements Connection { } [kUnstablePipeline](commands: Array) { - return this.#protocol.pipeline(commands); + const { promise, resolve, reject } = Promise.withResolvers< + RedisReply[] + >(); + const execute = () => this.#protocol.pipeline(commands); + this.enqueueCommand({ execute, resolve, reject } as PendingCommand); + return promise; } [kUnstableWriteCommand](command: Command): Promise { @@ -256,11 +266,7 @@ export class RedisConnection implements Connection { if (!command) return; try { - const reply = await this.#protocol.sendCommand( - command.name, - command.args, - command.returnUint8Arrays, - ); + const reply = await command.execute(); command.resolve(reply); } catch (error) { if ( @@ -275,13 +281,7 @@ export class RedisConnection implements Connection { this.close(); try { await this.connect(); - - const reply = await this.#protocol.sendCommand( - command.name, - command.args, - command.returnUint8Arrays, - ); - + const reply = await command.execute(); return command.resolve(reply); } catch { // TODO: use `AggregateError`? const backoff = this.backoff(i); diff --git a/tests/commands/pipeline.ts b/tests/commands/pipeline.ts index 994a6545..8b459b7c 100644 --- a/tests/commands/pipeline.ts +++ b/tests/commands/pipeline.ts @@ -117,6 +117,37 @@ export function pipelineTests( } }); + it("pipeline in concurrent, avoid redundant response mixup", async () => { + { + const opts = getOpts(); + const client = await connect(opts); + + const randomValues = new Array(10) + .fill(0) + .map(() => new Array(10).fill(0).map(() => Math.random().toString())); + + for (let i = 0; i < 10; i++) { + const key = `list_${i}`; + const values = randomValues[i]; + await client.del(key); + await client.rpush(key, ...values); + } + + // deno-lint-ignore no-inner-declarations + async function task() { + const tx = client.pipeline(); + for (let i = 0; i < 10; i++) { + tx.lrange(`list_${i}`, 0, -1); + } + return await tx.flush(); + } + + const res = await Promise.all([task(), task()]); + assertEquals(res, [randomValues, randomValues]); + client.close(); + } + }); + it("error while pipeline", async () => { const opts = getOpts(); const client = await connect(opts);