Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
keroxp authored May 2, 2020
1 parent 4468929 commit 85ad630
Show file tree
Hide file tree
Showing 16 changed files with 228 additions and 203 deletions.
2 changes: 1 addition & 1 deletion .denov
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.41.0
v0.42.0
20 changes: 11 additions & 9 deletions command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,17 @@ export type RedisCommands = {
>;
command_count(): Promise<Integer>;
command_getkeys(): Promise<BulkString[]>;
command_info(...command_names: string[]): Promise<[[
BulkString,
Integer,
BulkString[],
Integer,
Integer,
Integer,
[BulkString[]],
] | BulkNil]>;
command_info(...command_names: string[]): Promise<[
[
BulkString,
Integer,
BulkString[],
Integer,
Integer,
Integer,
[BulkString[]],
] | BulkNil,
]>;
config_get(parameter: string): Promise<BulkString[]>;
config_rewrite(): Promise<Status>;
config_set(parameter: string, value: string): Promise<Status>;
Expand Down
20 changes: 11 additions & 9 deletions io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {
BufReader,
BufWriter,
} from "./vendor/https/deno.land/std/io/bufio.ts";
import Buffer = Deno.Buffer;
import { ErrorReplyError } from "./errors.ts";
import {
deferred,
Expand Down Expand Up @@ -65,8 +64,8 @@ export async function sendCommand(

export async function readReply(reader: BufReader): Promise<RedisRawReply> {
const res = await reader.peek(1);
if (res === Deno.EOF) {
throw Deno.EOF;
if (res === null) {
throw new Error("EOF");
}
switch (res[0]) {
case IntegerReplyCode:
Expand All @@ -83,17 +82,18 @@ export async function readReply(reader: BufReader): Promise<RedisRawReply> {
throw new Error("Invalid state");
}

const decoder = new TextDecoder();
export async function readLine(reader: BufReader): Promise<string> {
let buf = new Uint8Array(1024);
let loc = 0;
let d: number | Deno.EOF;
while ((d = await reader.readByte()) && d !== Deno.EOF) {
let d: number | null = null;
while ((d = await reader.readByte()) && d !== null) {
if (d === "\r".charCodeAt(0)) {
const d1 = await reader.readByte();
if (d1 === "\n".charCodeAt(0)) {
buf[loc++] = d;
buf[loc++] = d1;
return new Buffer(buf.subarray(0, loc)).toString();
return decoder.decode(new Deno.Buffer(buf.subarray(0, loc)).bytes());
}
}
buf[loc++] = d;
Expand Down Expand Up @@ -131,7 +131,9 @@ export async function readBulkReply(reader: BufReader): Promise<Bulk> {
}
const dest = new Uint8Array(size + 2);
await reader.readFull(dest);
return new Buffer(dest.subarray(0, dest.length - 2)).toString();
return decoder.decode(
new Deno.Buffer(dest.subarray(0, dest.length - 2)).bytes(),
);
}

export async function readArrayReply(reader: BufReader): Promise<any[]> {
Expand All @@ -140,8 +142,8 @@ export async function readArrayReply(reader: BufReader): Promise<any[]> {
const result: any[] = [];
for (let i = 0; i < argCount; i++) {
const res = await reader.peek(1);
if (res === Deno.EOF) {
throw Deno.EOF;
if (res === null) {
throw new Error("EOF");
}
switch (res[0]) {
case SimpleStringCode:
Expand Down
2 changes: 1 addition & 1 deletion modules-lock.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"https://deno.land/std": {
"version": "@v0.41.0",
"version": "@v0.42.0",
"modules": [
"/util/async.ts",
"/testing/asserts.ts",
Expand Down
2 changes: 1 addition & 1 deletion modules.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"https://deno.land/std": {
"version": "@v0.41.0",
"version": "@v0.42.0",
"modules": [
"/util/async.ts",
"/testing/asserts.ts",
Expand Down
4 changes: 1 addition & 3 deletions pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import {
createRequest,
readReply,
RedisRawReply,
StatusReply,
CommandFunc,
CommandExecutor,
} from "./io.ts";
import { ErrorReplyError } from "./errors.ts";
Expand Down Expand Up @@ -100,7 +98,7 @@ export function createRedisPipeline(
return Object.assign(fakeRedis, executor, { enqueue, flush });
}

function dummyReadWriteCloser(): Deno.ReadWriteCloser {
function dummyReadWriteCloser(): Deno.Reader & Deno.Writer & Deno.Closer {
return {
close() {},
async read(p) {
Expand Down
205 changes: 108 additions & 97 deletions pipeline_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,105 +6,116 @@ const addr = {
port: 6379,
};

test(async function testPipeline() {
const redis = await connect(addr);
const pl = redis.pipeline();
await Promise.all([
pl.ping(),
pl.ping(),
pl.set("set1", "value1"),
pl.set("set2", "value2"),
pl.mget("set1", "set2"),
pl.del("set1"),
pl.del("set2"),
]);
const ret = await pl.flush();
assertEquals(ret, [
["status", "PONG"],
["status", "PONG"],
["status", "OK"],
["status", "OK"],
["array", ["value1", "value2"]],
["integer", 1],
["integer", 1],
]);
redis.close();
test({
name: "testPipeline",
fn: async function testPipeline() {
const redis = await connect(addr);
const pl = redis.pipeline();
await Promise.all([
pl.ping(),
pl.ping(),
pl.set("set1", "value1"),
pl.set("set2", "value2"),
pl.mget("set1", "set2"),
pl.del("set1"),
pl.del("set2"),
]);
const ret = await pl.flush();
assertEquals(ret, [
["status", "PONG"],
["status", "PONG"],
["status", "OK"],
["status", "OK"],
["array", ["value1", "value2"]],
["integer", 1],
["integer", 1],
]);
redis.close();
},
});

test(async function testTx() {
const redis = await connect(addr);
const tx1 = redis.tx();
const tx2 = redis.tx();
const tx3 = redis.tx();
await redis.del("key");
await Promise.all<any>([
tx1.get("key"),
tx1.incr("key"),
tx1.incr("key"),
tx1.incr("key"),
tx1.get("key"),
//
tx2.get("key"),
tx2.incr("key"),
tx2.incr("key"),
tx2.incr("key"),
tx2.get("key"),
//
tx3.get("key"),
tx3.incr("key"),
tx3.incr("key"),
tx3.incr("key"),
tx3.get("key"),
]);
const rep1 = await tx1.flush();
const rep2 = await tx2.flush();
const rep3 = await tx3.flush();
assertEquals(
parseInt(rep1[4][1] as string),
parseInt(rep1[0][1] as string) + 3,
);
assertEquals(
parseInt(rep2[4][1] as string),
parseInt(rep2[0][1] as string) + 3,
);
assertEquals(
parseInt(rep3[4][1] as string),
parseInt(rep3[0][1] as string) + 3,
);
redis.close();
test({
name: "testTx",
fn: async function testTx() {
const redis = await connect(addr);
const tx1 = redis.tx();
const tx2 = redis.tx();
const tx3 = redis.tx();
await redis.del("key");
await Promise.all<any>([
tx1.get("key"),
tx1.incr("key"),
tx1.incr("key"),
tx1.incr("key"),
tx1.get("key"),
//
tx2.get("key"),
tx2.incr("key"),
tx2.incr("key"),
tx2.incr("key"),
tx2.get("key"),
//
tx3.get("key"),
tx3.incr("key"),
tx3.incr("key"),
tx3.incr("key"),
tx3.get("key"),
]);
const rep1 = await tx1.flush();
const rep2 = await tx2.flush();
const rep3 = await tx3.flush();
assertEquals(
parseInt(rep1[4][1] as string),
parseInt(rep1[0][1] as string) + 3,
);
assertEquals(
parseInt(rep2[4][1] as string),
parseInt(rep2[0][1] as string) + 3,
);
assertEquals(
parseInt(rep3[4][1] as string),
parseInt(rep3[0][1] as string) + 3,
);
redis.close();
},
});

test("pipeline in concurrent", async () => {
const redis = await connect(addr);
const tx = redis.pipeline();
let promises: Promise<any>[] = [];
await redis.del("a", "b", "c");
for (const key of ["a", "b", "c"]) {
promises.push(tx.set(key, key));
}
promises.push(tx.flush());
for (const key of ["a", "b", "c"]) {
promises.push(tx.get(key));
}
promises.push(tx.flush());
const res = await Promise.all(promises);
assertEquals(res, [
"OK", // set(a)
"OK", // set(b)
"OK", // set(c)
[
["status", "OK"],
["status", "OK"],
["status", "OK"],
], // flush()
"OK", // get(a)
"OK", // get(b)
"OK", // get(c)
[
["bulk", "a"],
["bulk", "b"],
["bulk", "c"],
], // flush()
]);
redis.close();
test({
name: "pipeline in concurrent",
async fn() {
{
const redis = await connect(addr);
const tx = redis.pipeline();
let promises: Promise<any>[] = [];
await redis.del("a", "b", "c");
for (const key of ["a", "b", "c"]) {
promises.push(tx.set(key, key));
}
promises.push(tx.flush());
for (const key of ["a", "b", "c"]) {
promises.push(tx.get(key));
}
promises.push(tx.flush());
const res = await Promise.all(promises);
assertEquals(res, [
"OK", // set(a)
"OK", // set(b)
"OK", // set(c)
[
["status", "OK"],
["status", "OK"],
["status", "OK"],
], // flush()
"OK", // get(a)
"OK", // get(b)
"OK", // get(c)
[
["bulk", "a"],
["bulk", "b"],
["bulk", "c"],
], // flush()
]);
redis.close();
}
},
});
Loading

0 comments on commit 85ad630

Please sign in to comment.