Skip to content

Commit

Permalink
feat: add a sentinel error to signal the processor to stop attempting…
Browse files Browse the repository at this point in the history
… to re-execute the handler on subsequent ticks
  • Loading branch information
dillonstreator committed Apr 24, 2024
1 parent f46bc93 commit d643d4e
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 8 deletions.
7 changes: 6 additions & 1 deletion examples/pg/processor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { Client } from "pg";
import { EventProcessor } from "../../src/processor";
import {
ErrorUnprocessableEventHandler,
EventProcessor,
} from "../../src/processor";
import { createProcessorClient } from "../../src/pg/client";
import { migrate, type EventType } from "./index";
import dotenv from "dotenv";
Expand Down Expand Up @@ -29,6 +32,8 @@ let processor: ReturnType<typeof EventProcessor> | undefined = undefined;
thing2: async (event) => {
console.log(`${event.id} thing2 ${event.correlation_id}`);
if (Math.random() > 0.9) throw new Error("some issue");
if (Math.random() > 0.6)
throw new ErrorUnprocessableEventHandler(new Error("parent error"));

return;
},
Expand Down
99 changes: 99 additions & 0 deletions src/processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
EventProcessor,
Processor,
TxOBEvent,
ErrorUnprocessableEventHandler,
defaultBackoff,
processEvents,
} from "./processor";
Expand Down Expand Up @@ -262,6 +263,104 @@ describe("processEvents", () => {
processed_at: now,
});
});

it("respects ErrorUnprocessableEventHandler sentinel error to stop handler processing", async () => {
const opts = {
maxErrors: 5,
backoff: vi.fn(),
};
const err = new Error("some error");
const errUnprocessable = new ErrorUnprocessableEventHandler(
new Error("err1"),
);
const handlerMap = {
evtType1: {
handler1: vi.fn(() => Promise.reject(errUnprocessable)),
handler2: vi.fn(() => Promise.reject(errUnprocessable)),
},
};
const evt1: TxOBEvent<keyof typeof handlerMap> = {
type: "evtType1",
id: "1",
timestamp: now,
data: {},
correlation_id: "abc123",
handler_results: {
handler1: {
errors: [{ error: err.message, timestamp: now }],
},
handler2: {
processed_at: now,
},
handler3: {
unprocessable_at: now,
errors: [
{
error: errUnprocessable.message,
timestamp: now,
},
],
},
},
errors: 1,
};
const events = [evt1];
mockClient.getEventsToProcess.mockImplementation(() => events);
mockTxClient.getEventByIdForUpdateSkipLocked.mockImplementation((id) => {
return events.find((e) => e.id === id);
});
mockTxClient.updateEvent.mockImplementation(() => {
return Promise.resolve();
});

await processEvents(mockClient, handlerMap, opts);

expect(mockClient.getEventsToProcess).toHaveBeenCalledOnce();
expect(mockClient.getEventsToProcess).toHaveBeenCalledWith(opts);

expect(mockClient.transaction).toHaveBeenCalledTimes(1);

expect(handlerMap.evtType1.handler1).toHaveBeenCalledOnce();
expect(handlerMap.evtType1.handler1).toHaveBeenCalledWith(evt1, {
signal: undefined,
});
expect(mockTxClient.getEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
1,
);

expect(mockTxClient.updateEvent).toHaveBeenCalledTimes(1);
expect(mockTxClient.updateEvent).toHaveBeenCalledWith({
backoff_until: null,
correlation_id: "abc123",
data: {},
errors: 1,
handler_results: {
handler1: {
unprocessable_at: now,
errors: [
{ error: err.message, timestamp: now },
{ error: errUnprocessable.message, timestamp: now },
],
},
handler2: {
processed_at: now,
},
handler3: {
unprocessable_at: now,
errors: [
{
error: errUnprocessable.message,
timestamp: now,
},
],
},
},
id: "1",
timestamp: now,
type: "evtType1",
processed_at: now,
});
});
});

describe("defaultBackoff", () => {
Expand Down
49 changes: 42 additions & 7 deletions src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { sleep } from "./sleep";

type TxOBEventHandlerResult = {
processed_at?: Date;
unprocessable_at?: Date;
errors?: { error: unknown; timestamp: Date }[];
};

Expand Down Expand Up @@ -38,7 +39,7 @@ export type TxOBEventHandlerMap<TxOBEventType extends string> = Record<

type TxOBProcessorClientOpts = {
signal?: AbortSignal;
maxErrors: number
maxErrors: number;
};

export interface TxOBProcessorClient<TxOBEventType extends string> {
Expand Down Expand Up @@ -180,6 +181,15 @@ export const processEvents = async <TxOBEventType extends string>(
});
return;
}
if (handlerResults.unprocessable_at) {
_opts.logger?.debug("handler unprocessable", {
eventId: lockedEvent.id,
type: lockedEvent.type,
handlerName,
correlationId: lockedEvent.correlation_id,
});
return;
}

handlerResults.errors ??= [];

Expand All @@ -200,11 +210,20 @@ export const processEvents = async <TxOBEventType extends string>(
error,
correlationId: lockedEvent.correlation_id,
});
errored = true;
handlerResults.errors?.push({
error: (error as Error)?.message ?? error,
timestamp: getDate(),
});

if (error instanceof ErrorUnprocessableEventHandler) {
handlerResults.unprocessable_at = getDate();
handlerResults.errors?.push({
error: error.message ?? error,
timestamp: getDate(),
});
} else {
errored = true;
handlerResults.errors?.push({
error: (error as Error)?.message ?? error,
timestamp: getDate(),
});
}
}

lockedEvent.handler_results[handlerName] = handlerResults;
Expand Down Expand Up @@ -255,6 +274,21 @@ export const processEvents = async <TxOBEventType extends string>(
}
};

/**
* ErrorUnprocessableEventHandler can be thrown by an event handler to indicate that the event handler is unprocessable.
* It wraps the original error that caused the handler to be unprocessable.
* This error will signal the processor to stop processing the event handler and mark the event handler as unprocessable.
*/
export class ErrorUnprocessableEventHandler extends Error {
error: Error;

constructor(error: Error) {
const message = `unprocessable event handler: ${error.message}`;
super(message);
this.error = error;
}
}

export interface Logger {
debug(message?: unknown, ...optionalParams: unknown[]): void;
info(message?: unknown, ...optionalParams: unknown[]): void;
Expand Down Expand Up @@ -310,7 +344,8 @@ export const Processor = (
state = "started";
opts?.logger?.debug("processor started");

let abortListener: ((this: AbortSignal, ev: Event) => unknown) | null = null;
let abortListener: ((this: AbortSignal, ev: Event) => unknown) | null =
null;

(async () => {
while (true) {
Expand Down

0 comments on commit d643d4e

Please sign in to comment.