Skip to content

Commit

Permalink
Add unit test for async generator
Browse files Browse the repository at this point in the history
  • Loading branch information
mustard-mh committed Nov 8, 2023
1 parent 0ef6e8e commit 10f8b46
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 47 deletions.
1 change: 1 addition & 0 deletions components/gitpod-protocol/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"analytics-node": "^6.0.0",
"configcat-node": "^8.0.0",
"cookie": "^0.4.2",
"event-iterator": "^2.0.0",
"express": "^4.17.3",
"google-protobuf": "^3.19.1",
"inversify": "^6.0.1",
Expand Down
128 changes: 128 additions & 0 deletions components/gitpod-protocol/src/generate-async-generator.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/**
* Copyright (c) 2023 Gitpod GmbH. All rights reserved.
* Licensed under the GNU Affero General Public License (AGPL).
* See License.AGPL.txt in the project root for license information.
*/

import { suite, test } from "@testdeck/mocha";
import * as chai from "chai";

import { generateAsyncGenerator } from "./generate-async-generator";
import { Disposable } from "./util/disposable";

const expect = chai.expect;

function watchWith(times: number, listener: (value: number) => void): Disposable {
let i = 0;
const cancel = setInterval(() => {
if (i < times) {
listener(i++);
}
}, 100);
return {
dispose: () => {
console.log("clean");
clearInterval(cancel);
},
};
}

const error = new Error("Test error");
function watchIterator(
resultRef: { isDisposed: boolean; result: number[] },
option: { errorAfter?: number; times: number; abortAfterMs?: number; setupError?: boolean },
) {
const abortController = new AbortController();
setTimeout(() => {
abortController.abort();
}, option.abortAfterMs ?? 600);
return generateAsyncGenerator<number>(
(sink) => {
try {
if (option.setupError) {
throw error;
}
const dispose = watchWith(option.times, (v) => {
if (option.errorAfter && option.errorAfter === v) {
sink.fail(error);
return;
}
sink.push(v);
});
return () => {
resultRef.isDisposed = true;
dispose.dispose();
};
} catch (e) {
sink.fail(e as any as Error);
}
},
{ signal: abortController.signal },
);
}

@suite
class TestGenerateAsyncGenerator {
@test public async "happy path"() {
const ref: { isDisposed: boolean; result: number[] } = { isDisposed: false, result: [] };
const it = watchIterator(ref, { times: 5 });
try {
for await (const v of it) {
ref.result.push(v);
}
expect.fail("should throw error");
} catch (e) {
expect(e.message).to.be.equal("Abort error");
expect(ref.result.length).to.be.equal(5);
expect(ref.isDisposed).to.be.equal(true);
}
}

@test public async "should be stopped after abort signal is triggered"() {
const ref: { isDisposed: boolean; result: number[] } = { isDisposed: false, result: [] };
const it = watchIterator(ref, { times: 5, abortAfterMs: 120 });
try {
for await (const v of it) {
ref.result.push(v);
}
expect.fail("should throw error");
} catch (e) {
expect(e.message).to.be.equal("Abort error");
expect(ref.result[0]).to.be.equal(0);
expect(ref.result.length).to.be.equal(1);
expect(ref.isDisposed).to.be.equal(true);
}
}

@test public async "should throw error if setup throws"() {
const ref: { isDisposed: boolean; result: number[] } = { isDisposed: false, result: [] };
const it = watchIterator(ref, { times: 5, setupError: true });
try {
for await (const v of it) {
ref.result.push(v);
}
expect.fail("should throw error");
} catch (e) {
expect(e).to.be.equal(error);
expect(ref.result.length).to.be.equal(0);
expect(ref.isDisposed).to.be.equal(false);
}
}

@test public async "should propagate errors from sink.next"() {
const ref: { isDisposed: boolean; result: number[] } = { isDisposed: false, result: [] };
const it = watchIterator(ref, { times: 5, errorAfter: 2 });
try {
for await (const v of it) {
ref.result.push(v);
}
expect.fail("should throw error");
} catch (e) {
expect(e).to.be.equal(error);
expect(ref.result.length).to.be.equal(2);
expect(ref.isDisposed).to.be.equal(true);
}
}
}

module.exports = new TestGenerateAsyncGenerator(); // Only to circumvent no usage warning :-/
66 changes: 20 additions & 46 deletions components/gitpod-protocol/src/generate-async-generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,58 +4,32 @@
* See License.AGPL.txt in the project root for license information.
*/

type Sink<T> = {
next: (value: T | void) => void;
};
import { EventIterator } from "event-iterator";
import { Queue } from "event-iterator/lib/event-iterator";

/**
* Generates an asynchronous generator that yields values based on the provided setup function.
*
* the setup function that takes a sink and returns a cleanup function.
* setup sink object has a `next` method that accepts a value to be pushed to the generator.
* the setup function that takes a queue and returns a cleanup function.
* `queue.next` method that accepts a value to be pushed to the generator.
*
* remember that setup callback MUST wrap with try catch and use `queue.fail` to propagate error
*
* Iterator will always at least end with throw an `Abort error`
*/
export async function* generateAsyncGenerator<T>(
setup: (sink: Sink<T>) => () => void,
export function generateAsyncGenerator<T>(
setup: (queue: Queue<T>) => (() => void) | void,
opts: { signal: AbortSignal },
): AsyncGenerator<T | void, void, unknown> {
const queue: T[] = [];

let resolveNext: ((value: T | void) => void) | null = null;

const sink: Sink<T> = {
next: (value: T | void) => {
if (resolveNext) {
resolveNext(value);
resolveNext = null;
} else {
if (value) {
queue.push(value);
}
) {
return new EventIterator<T>((queue) => {
opts.signal.addEventListener("abort", () => {
queue.fail(new Error("Abort error"));
});
const dispose = setup(queue);
return () => {
if (dispose) {
dispose();
}
},
};

let isStopped = false;
opts.signal.addEventListener("abort", () => {
isStopped = true;
sink.next();
};
});

const cleanup = setup(sink);

try {
while (!isStopped) {
if (queue.length) {
yield queue.shift();
} else {
yield new Promise<T | void>((resolve) => {
resolveNext = resolve;
});
}
}
// ignore error since code in `try` scope will not throw an error
// unless caller use it.throw, then it will throw to itself
} finally {
cleanup();
}
}
12 changes: 11 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3695,7 +3695,7 @@
"@types/node" "*"
form-data "^3.0.0"

"@types/node@*", "@types/node@>=12.12.47", "@types/node@>=13.7.0", "@types/node@^16.11.0", "@types/node@^16.11.6":
"@types/node@*", "@types/node@>=12.12.47", "@types/node@>=13.7.0", "@types/node@^16.11.0":
version "16.11.6"
resolved "https://registry.npmjs.org/@types/node/-/node-16.11.6.tgz"
integrity sha512-ua7PgUoeQFjmWPcoo9khiPum3Pd60k4/2ZGXt18sm2Slk0W0xZTqt5Y0Ny1NyBiN1EVQ/+FaF9NcY4Qe6rwk5w==
Expand All @@ -3710,6 +3710,11 @@
resolved "https://registry.npmjs.org/@types/node/-/node-14.17.32.tgz"
integrity sha512-JcII3D5/OapPGx+eJ+Ik1SQGyt6WvuqdRfh9jUwL6/iHGjmyOriBDciBUu7lEIBTL2ijxwrR70WUnw5AEDmFvQ==

"@types/node@^16.11.6":
version "16.18.61"
resolved "https://registry.yarnpkg.com/@types/node/-/node-16.18.61.tgz#5ea47e3018348bf3bbbe646b396ba5e720310be1"
integrity sha512-k0N7BqGhJoJzdh6MuQg1V1ragJiXTh8VUBAZTWjJ9cUq23SG0F0xavOwZbhiP4J3y20xd6jxKx+xNUhkMAi76Q==

"@types/node@^18.18.8":
version "18.18.8"
resolved "https://registry.yarnpkg.com/@types/node/-/node-18.18.8.tgz#2b285361f2357c8c8578ec86b5d097c7f464cfd6"
Expand Down Expand Up @@ -7425,6 +7430,11 @@ etag@~1.8.1:
resolved "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz"
integrity sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc=

event-iterator@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/event-iterator/-/event-iterator-2.0.0.tgz#10f06740cc1e9fd6bc575f334c2bc1ae9d2dbf62"
integrity sha512-KGft0ldl31BZVV//jj+IAIGCxkvvUkkON+ScH6zfoX+l+omX6001ggyRSpI0Io2Hlro0ThXotswCtfzS8UkIiQ==

event-target-shim@^5.0.0:
version "5.0.1"
resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-5.0.1.tgz#5d4d3ebdf9583d63a5333ce2deb7480ab2b05789"
Expand Down

0 comments on commit 10f8b46

Please sign in to comment.