Skip to content

Commit

Permalink
test processor and only break out of processor when shut down
Browse files Browse the repository at this point in the history
  • Loading branch information
dillonstreator committed Dec 22, 2023
1 parent 157dbef commit 98d92bf
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 7 deletions.
1 change: 1 addition & 0 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
- name: Remove examples dir
run: |
rm -rf examples
rm -rf src/mongodb
- name: Run the tests
run: yarn test:ci
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<img src="https://codecov.io/gh/dillonstreator/txob/graph/badge.svg?token=E9M7G67VLL"/>
</a>
<a aria-label="NPM version" href="https://www.npmjs.com/package/txob">
<img alt="" src="https://badgen.net/npm/v/txob?t=1702855065">
<img alt="" src="https://badgen.net/npm/v/txob">
</a>
<a aria-label="License" href="https://github.com/dillonstreator/txob/blob/main/LICENSE">
<img alt="" src="https://badgen.net/npm/license/txob">
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
"version": "0.0.14",
"version": "0.0.15",
"license": "MIT",
"files": [
"dist",
Expand Down
41 changes: 40 additions & 1 deletion src/processor.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { describe, it, expect, vi, afterEach } from "vitest";
import { TxOBEvent, processEvents } from "./processor";
import { Processor, TxOBEvent, processEvents } from "./processor";
import { sleep } from "./sleep";

const mockTxClient = {
getEventByIdForUpdateSkipLocked: vi.fn(),
Expand Down Expand Up @@ -245,3 +246,41 @@ describe("processEvents", () => {
});
});
});

describe("Processor", () => {
it("should shutdown gracefully", async () => {
let calls = 0;
let aborted = false;
const processor = Processor(
({ signal }) => {
calls++;
return new Promise((r) => {
signal.addEventListener("abort", () => {
aborted = true;
r();
});
});
},
{ sleepTimeMs: 0 },
);
processor.start();

await processor.stop();

expect(calls).toBe(1);
expect(aborted).toBe(true);
});
it("should respect shutdown timeout and throw", async () => {
const processor = Processor(() => sleep(100), { sleepTimeMs: 0 });
processor.start();

const start = Date.now();
try {
await processor.stop({ timeoutMs: 10});
} catch (error) {
expect(error.message).toBe('shutdown timeout 10ms elapsed')
}
const diff = Date.now() - start;
expect(diff).toBeLessThan(50);
});
});
6 changes: 2 additions & 4 deletions src/processor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { retryable, RetryOpts } from "./retry";
import { getDate } from "./date";
import EventEmitter from "node:events";
import { sleep } from "./sleep";

type TxOBEventHandlerResult = {
processed_at?: Date;
Expand Down Expand Up @@ -256,8 +257,6 @@ class SignalAbortedError extends Error {
}
}

const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));

export const Processor = (
fn: ({ signal }: { signal: AbortSignal }) => Promise<void>,
opts?: { sleepTimeMs?: number; logger?: Logger },
Expand Down Expand Up @@ -299,11 +298,10 @@ export const Processor = (
} catch (error) {
if (error instanceof SignalAbortedError) {
opts?.logger?.debug(error.message);
break;
} else {
opts?.logger?.error(error);
}

break;
} finally {
if (abortListener)
ac.signal.removeEventListener("abort", abortListener);
Expand Down
2 changes: 2 additions & 0 deletions src/sleep.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const sleep = (ms: number) =>
new Promise<void>((r) => setTimeout(r, ms));

0 comments on commit 98d92bf

Please sign in to comment.