From d643d4ec1123e321c106220f97d2a9e90cb60a7a Mon Sep 17 00:00:00 2001 From: dillonstreator Date: Wed, 24 Apr 2024 16:24:35 -0500 Subject: [PATCH] feat: add a sentinel error to signal the processor to stop attempting to re-execute the handler on subsequent ticks --- examples/pg/processor.ts | 7 ++- src/processor.test.ts | 99 ++++++++++++++++++++++++++++++++++++++++ src/processor.ts | 49 +++++++++++++++++--- 3 files changed, 147 insertions(+), 8 deletions(-) diff --git a/examples/pg/processor.ts b/examples/pg/processor.ts index bb4ec4f..762fa35 100644 --- a/examples/pg/processor.ts +++ b/examples/pg/processor.ts @@ -1,5 +1,8 @@ import { Client } from "pg"; -import { EventProcessor } from "../../src/processor"; +import { + ErrorUnprocessableEventHandler, + EventProcessor, +} from "../../src/processor"; import { createProcessorClient } from "../../src/pg/client"; import { migrate, type EventType } from "./index"; import dotenv from "dotenv"; @@ -29,6 +32,8 @@ let processor: ReturnType | undefined = undefined; thing2: async (event) => { console.log(`${event.id} thing2 ${event.correlation_id}`); if (Math.random() > 0.9) throw new Error("some issue"); + if (Math.random() > 0.6) + throw new ErrorUnprocessableEventHandler(new Error("parent error")); return; }, diff --git a/src/processor.test.ts b/src/processor.test.ts index 3c26f1d..f122c45 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -3,6 +3,7 @@ import { EventProcessor, Processor, TxOBEvent, + ErrorUnprocessableEventHandler, defaultBackoff, processEvents, } from "./processor"; @@ -262,6 +263,104 @@ describe("processEvents", () => { processed_at: now, }); }); + + it("respects ErrorUnprocessableEventHandler sentinel error to stop handler processing", async () => { + const opts = { + maxErrors: 5, + backoff: vi.fn(), + }; + const err = new Error("some error"); + const errUnprocessable = new ErrorUnprocessableEventHandler( + new Error("err1"), + ); + const handlerMap = { + evtType1: { + handler1: vi.fn(() => Promise.reject(errUnprocessable)), + handler2: vi.fn(() => Promise.reject(errUnprocessable)), + }, + }; + const evt1: TxOBEvent = { + type: "evtType1", + id: "1", + timestamp: now, + data: {}, + correlation_id: "abc123", + handler_results: { + handler1: { + errors: [{ error: err.message, timestamp: now }], + }, + handler2: { + processed_at: now, + }, + handler3: { + unprocessable_at: now, + errors: [ + { + error: errUnprocessable.message, + timestamp: now, + }, + ], + }, + }, + errors: 1, + }; + const events = [evt1]; + mockClient.getEventsToProcess.mockImplementation(() => events); + mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => { + return events.find((e) => e.id === id); + }); + mockTxClient.updateEvent.mockImplementation(() => { + return Promise.resolve(); + }); + + await processEvents(mockClient, handlerMap, opts); + + expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce(); + expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts); + + expect(mockClient.transaction).toHaveBeenCalledTimes(1); + + expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce(); + expect(handlerMap.evtType1.handler1).toHaveBeenCalledWith(evt1, { + signal: undefined, + }); + expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( + 1, + ); + + expect(mockTxClient.updateEvent).toHaveBeenCalledTimes(1); + expect(mockTxClient.updateEvent).toHaveBeenCalledWith({ + backoff_until: null, + correlation_id: "abc123", + data: {}, + errors: 1, + handler_results: { + handler1: { + unprocessable_at: now, + errors: [ + { error: err.message, timestamp: now }, + { error: errUnprocessable.message, timestamp: now }, + ], + }, + handler2: { + processed_at: now, + }, + handler3: { + unprocessable_at: now, + errors: [ + { + error: errUnprocessable.message, + timestamp: now, + }, + ], + }, + }, + id: "1", + timestamp: now, + type: "evtType1", + processed_at: now, + }); + }); }); describe("defaultBackoff", () => { diff --git a/src/processor.ts b/src/processor.ts index 106bf83..8f107a3 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -5,6 +5,7 @@ import { sleep } from "./sleep"; type TxOBEventHandlerResult = { processed_at?: Date; + unprocessable_at?: Date; errors?: { error: unknown; timestamp: Date }[]; }; @@ -38,7 +39,7 @@ export type TxOBEventHandlerMap = Record< type TxOBProcessorClientOpts = { signal?: AbortSignal; - maxErrors: number + maxErrors: number; }; export interface TxOBProcessorClient { @@ -180,6 +181,15 @@ export const processEvents = async ( }); return; } + if (handlerResults.unprocessable_at) { + _opts.logger?.debug("handler unprocessable", { + eventId: lockedEvent.id, + type: lockedEvent.type, + handlerName, + correlationId: lockedEvent.correlation_id, + }); + return; + } handlerResults.errors ??= []; @@ -200,11 +210,20 @@ export const processEvents = async ( error, correlationId: lockedEvent.correlation_id, }); - errored = true; - handlerResults.errors?.push({ - error: (error as Error)?.message ?? error, - timestamp: getDate(), - }); + + if (error instanceof ErrorUnprocessableEventHandler) { + handlerResults.unprocessable_at = getDate(); + handlerResults.errors?.push({ + error: error.message ?? error, + timestamp: getDate(), + }); + } else { + errored = true; + handlerResults.errors?.push({ + error: (error as Error)?.message ?? error, + timestamp: getDate(), + }); + } } lockedEvent.handler_results[handlerName] = handlerResults; @@ -255,6 +274,21 @@ export const processEvents = async ( } }; +/** + * ErrorUnprocessableEventHandler can be thrown by an event handler to indicate that the event handler is unprocessable. + * It wraps the original error that caused the handler to be unprocessable. + * This error will signal the processor to stop processing the event handler and mark the event handler as unprocessable. + */ +export class ErrorUnprocessableEventHandler extends Error { + error: Error; + + constructor(error: Error) { + const message = `unprocessable event handler: ${error.message}`; + super(message); + this.error = error; + } +} + export interface Logger { debug(message?: unknown, ...optionalParams: unknown[]): void; info(message?: unknown, ...optionalParams: unknown[]): void; @@ -310,7 +344,8 @@ export const Processor = ( state = "started"; opts?.logger?.debug("processor started"); - let abortListener: ((this: AbortSignal, ev: Event) => unknown) | null = null; + let abortListener: ((this: AbortSignal, ev: Event) => unknown) | null = + null; (async () => { while (true) {