diff --git a/src/processor.test.ts b/src/processor.test.ts index 6bf87a8..784405a 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -1,5 +1,10 @@ import { describe, it, expect, vi, afterEach } from "vitest"; -import { Processor, TxOBEvent, processEvents } from "./processor"; +import { + Processor, + TxOBEvent, + defaultBackoff, + processEvents, +} from "./processor"; import { sleep } from "./sleep"; const mockTxClient = { @@ -95,7 +100,18 @@ describe("processEvents", () => { handler_results: {}, errors: 0, }; - const events = [evt1, evt2, evt3]; + // skip processed + const evt4: TxOBEvent = { + type: "evtType1", + id: "4", + timestamp: now, + data: {}, + correlation_id: "xyz123", + handler_results: {}, + errors: 0, + processed_at: now, + }; + const events = [evt1, evt2, evt3, evt4]; mockClient.getUnprocessedEvents.mockImplementation(() => events); mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => { if (id === evt3.id) return null; @@ -114,7 +130,7 @@ describe("processEvents", () => { expect(mockClient.getUnprocessedEvents).toHaveBeenCalledOnce(); expect(mockClient.getUnprocessedEvents).toHaveBeenCalledWith(opts); - expect(mockClient.transaction).toHaveBeenCalledTimes(2); + expect(mockClient.transaction).toHaveBeenCalledTimes(3); expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce(); expect(handlerMap.evtType1.handler1).toHaveBeenCalledWith(evt1, { @@ -127,7 +143,7 @@ describe("processEvents", () => { expect(handlerMap.evtType1.handler3).not.toHaveBeenCalled(); expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( - 2, + 3, ); expect(opts.backoff).toHaveBeenCalledOnce(); @@ -247,6 +263,17 @@ describe("processEvents", () => { }); }); +describe("defaultBackoff", () => { + it("should calculate a backoff", () => { + const backoff = defaultBackoff(3); + const actual = backoff.getTime(); + const expected = Date.now() + 1000 * 2 ** 3; + const diff = Math.abs(actual - expected); + + expect(diff).lessThanOrEqual(1); + }); +}); + describe("Processor", () => { it("should shutdown gracefully", async () => { let calls = 0; @@ -276,9 +303,9 @@ describe("Processor", () => { const start = Date.now(); try { - await processor.stop({ timeoutMs: 10}); + await processor.stop({ timeoutMs: 10 }); } catch (error) { - expect(error.message).toBe('shutdown timeout 10ms elapsed') + expect(error.message).toBe("shutdown timeout 10ms elapsed"); } const diff = Date.now() - start; expect(diff).toBeLessThan(50); diff --git a/src/processor.ts b/src/processor.ts index e34d44b..bd996df 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -59,7 +59,7 @@ export interface TxOBTransactionProcessorClient { updateEvent(event: TxOBEvent): Promise; } -const defaultBackoff = (errorCount: number): Date => { +export const defaultBackoff = (errorCount: number): Date => { const baseDelayMs = 1000; const maxDelayMs = 1000 * 60; const backoffMs = Math.min(baseDelayMs * 2 ** errorCount, maxDelayMs); diff --git a/src/sleep.ts b/src/sleep.ts index a566479..f4aec8d 100644 --- a/src/sleep.ts +++ b/src/sleep.ts @@ -1,2 +1,2 @@ -export const sleep = (ms: number) => - new Promise((r) => setTimeout(r, ms)); +export const sleep = (ms: number) => + new Promise((r) => setTimeout(r, ms));