diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
index acc05e9..2e5fd1b 100644
--- a/.github/workflows/coverage.yml
+++ b/.github/workflows/coverage.yml
@@ -25,6 +25,7 @@ jobs:
- name: Remove examples dir
run: |
rm -rf examples
+ rm -rf src/mongodb
- name: Run the tests
run: yarn test:ci
diff --git a/README.md b/README.md
index aaad800..a8519de 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
-
+
diff --git a/package.json b/package.json
index f56d147..25d6f63 100644
--- a/package.json
+++ b/package.json
@@ -13,7 +13,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
- "version": "0.0.14",
+ "version": "0.0.15",
"license": "MIT",
"files": [
"dist",
diff --git a/src/processor.test.ts b/src/processor.test.ts
index ff52f58..6bf87a8 100644
--- a/src/processor.test.ts
+++ b/src/processor.test.ts
@@ -1,5 +1,6 @@
import { describe, it, expect, vi, afterEach } from "vitest";
-import { TxOBEvent, processEvents } from "./processor";
+import { Processor, TxOBEvent, processEvents } from "./processor";
+import { sleep } from "./sleep";
const mockTxClient = {
getEventByIdForUpdateSkipLocked: vi.fn(),
@@ -245,3 +246,41 @@ describe("processEvents", () => {
});
});
});
+
+describe("Processor", () => {
+ it("should shutdown gracefully", async () => {
+ let calls = 0;
+ let aborted = false;
+ const processor = Processor(
+ ({ signal }) => {
+ calls++;
+ return new Promise((r) => {
+ signal.addEventListener("abort", () => {
+ aborted = true;
+ r();
+ });
+ });
+ },
+ { sleepTimeMs: 0 },
+ );
+ processor.start();
+
+ await processor.stop();
+
+ expect(calls).toBe(1);
+ expect(aborted).toBe(true);
+ });
+ it("should respect shutdown timeout and throw", async () => {
+ const processor = Processor(() => sleep(100), { sleepTimeMs: 0 });
+ processor.start();
+
+ const start = Date.now();
+ try {
+ await processor.stop({ timeoutMs: 10});
+ } catch (error) {
+ expect(error.message).toBe('shutdown timeout 10ms elapsed')
+ }
+ const diff = Date.now() - start;
+ expect(diff).toBeLessThan(50);
+ });
+});
diff --git a/src/processor.ts b/src/processor.ts
index 0840a8d..e34d44b 100644
--- a/src/processor.ts
+++ b/src/processor.ts
@@ -1,6 +1,7 @@
import { retryable, RetryOpts } from "./retry";
import { getDate } from "./date";
import EventEmitter from "node:events";
+import { sleep } from "./sleep";
type TxOBEventHandlerResult = {
processed_at?: Date;
@@ -256,8 +257,6 @@ class SignalAbortedError extends Error {
}
}
-const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
-
export const Processor = (
fn: ({ signal }: { signal: AbortSignal }) => Promise,
opts?: { sleepTimeMs?: number; logger?: Logger },
@@ -299,11 +298,10 @@ export const Processor = (
} catch (error) {
if (error instanceof SignalAbortedError) {
opts?.logger?.debug(error.message);
+ break;
} else {
opts?.logger?.error(error);
}
-
- break;
} finally {
if (abortListener)
ac.signal.removeEventListener("abort", abortListener);
diff --git a/src/sleep.ts b/src/sleep.ts
new file mode 100644
index 0000000..a566479
--- /dev/null
+++ b/src/sleep.ts
@@ -0,0 +1,2 @@
+export const sleep = (ms: number) =>
+ new Promise((r) => setTimeout(r, ms));