From 98d92bf56fc976ce565446542c6f63c0d826945d Mon Sep 17 00:00:00 2001 From: dillonstreator Date: Thu, 21 Dec 2023 21:37:20 -0600 Subject: [PATCH] test processor and only break out of processor when shut down --- .github/workflows/coverage.yml | 1 + README.md | 2 +- package.json | 2 +- src/processor.test.ts | 41 +++++++++++++++++++++++++++++++++- src/processor.ts | 6 ++--- src/sleep.ts | 2 ++ 6 files changed, 47 insertions(+), 7 deletions(-) create mode 100644 src/sleep.ts diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index acc05e9..2e5fd1b 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -25,6 +25,7 @@ jobs: - name: Remove examples dir run: | rm -rf examples + rm -rf src/mongodb - name: Run the tests run: yarn test:ci diff --git a/README.md b/README.md index aaad800..a8519de 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ - + diff --git a/package.json b/package.json index f56d147..25d6f63 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "type": "git", "url": "git://github.com/dillonstreator/txob.git" }, - "version": "0.0.14", + "version": "0.0.15", "license": "MIT", "files": [ "dist", diff --git a/src/processor.test.ts b/src/processor.test.ts index ff52f58..6bf87a8 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -1,5 +1,6 @@ import { describe, it, expect, vi, afterEach } from "vitest"; -import { TxOBEvent, processEvents } from "./processor"; +import { Processor, TxOBEvent, processEvents } from "./processor"; +import { sleep } from "./sleep"; const mockTxClient = { getEventByIdForUpdateSkipLocked: vi.fn(), @@ -245,3 +246,41 @@ describe("processEvents", () => { }); }); }); + +describe("Processor", () => { + it("should shutdown gracefully", async () => { + let calls = 0; + let aborted = false; + const processor = Processor( + ({ signal }) => { + calls++; + return new Promise((r) => { + signal.addEventListener("abort", () => { + aborted = true; + r(); + }); + }); + }, + { sleepTimeMs: 0 }, + ); + processor.start(); + + await processor.stop(); + + expect(calls).toBe(1); + expect(aborted).toBe(true); + }); + it("should respect shutdown timeout and throw", async () => { + const processor = Processor(() => sleep(100), { sleepTimeMs: 0 }); + processor.start(); + + const start = Date.now(); + try { + await processor.stop({ timeoutMs: 10}); + } catch (error) { + expect(error.message).toBe('shutdown timeout 10ms elapsed') + } + const diff = Date.now() - start; + expect(diff).toBeLessThan(50); + }); +}); diff --git a/src/processor.ts b/src/processor.ts index 0840a8d..e34d44b 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -1,6 +1,7 @@ import { retryable, RetryOpts } from "./retry"; import { getDate } from "./date"; import EventEmitter from "node:events"; +import { sleep } from "./sleep"; type TxOBEventHandlerResult = { processed_at?: Date; @@ -256,8 +257,6 @@ class SignalAbortedError extends Error { } } -const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); - export const Processor = ( fn: ({ signal }: { signal: AbortSignal }) => Promise, opts?: { sleepTimeMs?: number; logger?: Logger }, @@ -299,11 +298,10 @@ export const Processor = ( } catch (error) { if (error instanceof SignalAbortedError) { opts?.logger?.debug(error.message); + break; } else { opts?.logger?.error(error); } - - break; } finally { if (abortListener) ac.signal.removeEventListener("abort", abortListener); diff --git a/src/sleep.ts b/src/sleep.ts new file mode 100644 index 0000000..a566479 --- /dev/null +++ b/src/sleep.ts @@ -0,0 +1,2 @@ +export const sleep = (ms: number) => + new Promise((r) => setTimeout(r, ms));