diff --git a/examples/pg/index.ts b/examples/pg/index.ts index 6473636..9a47cb7 100644 --- a/examples/pg/index.ts +++ b/examples/pg/index.ts @@ -1,9 +1,10 @@ import http from "node:http"; import { randomUUID } from "node:crypto"; import { Client } from "pg"; -import { processEvents } from "../../src/processor"; +import { EventProcessor } from "../../src/processor"; import { createProcessorClient } from "../../src/pg/client"; import dotenv from "dotenv"; +import gracefulShutdown from "http-graceful-shutdown"; dotenv.config(); const eventTypes = { @@ -19,35 +20,11 @@ const main = async () => { database: process.env.POSTGRES_DB, }); await client.connect(); - await client.query(`CREATE TABLE IF NOT EXISTS events ( - id UUID, - timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - type VARCHAR(255) NOT NULL, - data JSONB, - correlation_id UUID, - handler_results JSONB, - errors INTEGER, - backoff_until TIMESTAMPTZ, - processed_at TIMESTAMPTZ -)`); - await client.query(`CREATE TABLE IF NOT EXISTS activity ( - id UUID, - timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - ip TEXT, - ua TEXT, - method TEXT, - path TEXT, - correlation_id UUID -)`); - - const ab = new AbortController(); + await migrate(client); - const processorClient = createProcessorClient(client); - - const processorTick = () => { - if (ab.signal.aborted) return; - - processEvents(processorClient, { + const processor = EventProcessor( + createProcessorClient(client), + { ResourceSaved: { thing1: async (event) => { console.log(`${event.id} thing1 ${event.correlation_id}`); @@ -68,11 +45,10 @@ const main = async () => { return; }, }, - }).finally(() => { - setTimeout(processorTick, 5000); - }); - }; - processorTick(); + }, + { sleepTimeMs: 5000, logger: console }, + ); + processor.start(); const server = http.createServer(async (req, res) => { const correlationId = randomUUID(); @@ -120,6 +96,12 @@ const main = async () => { }); const port = process.env.PORT || 3000; server.listen(port, () => console.log(`listening on ${port}`)); + + gracefulShutdown(server, { + onShutdown: async () => { + await processor.stop(); + }, + }); }; if (require.main === module) { @@ -128,3 +110,26 @@ if (require.main === module) { process.exit(1); }); } + +const migrate = async (client: Client): Promise => { + await client.query(`CREATE TABLE IF NOT EXISTS events ( + id UUID, + timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + type VARCHAR(255) NOT NULL, + data JSONB, + correlation_id UUID, + handler_results JSONB, + errors INTEGER, + backoff_until TIMESTAMPTZ, + processed_at TIMESTAMPTZ +)`); + await client.query(`CREATE TABLE IF NOT EXISTS activity ( + id UUID, + timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + ip TEXT, + ua TEXT, + method TEXT, + path TEXT, + correlation_id UUID +)`); +}; diff --git a/examples/pg/nodemon.json b/examples/pg/nodemon.json new file mode 100644 index 0000000..18e017b --- /dev/null +++ b/examples/pg/nodemon.json @@ -0,0 +1,3 @@ +{ + "watch": ["index.ts", "../../src/processor.ts"] +} diff --git a/examples/pg/package.json b/examples/pg/package.json index d2293d8..5af636c 100644 --- a/examples/pg/package.json +++ b/examples/pg/package.json @@ -7,6 +7,7 @@ "dev": "nodemon index.ts" }, "dependencies": { + "http-graceful-shutdown": "^3.1.13", "pg": "^8.11.3" }, "devDependencies": { diff --git a/examples/pg/yarn.lock b/examples/pg/yarn.lock index 71a59d4..94f1b6f 100644 --- a/examples/pg/yarn.lock +++ b/examples/pg/yarn.lock @@ -146,7 +146,7 @@ create-require@^1.1.0: resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333" integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ== -debug@^4: +debug@^4, debug@^4.3.4: version "4.3.4" resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.4.tgz#1319f6579357f2338d3337d2cdd4914bb5dcc865" integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ== @@ -187,6 +187,13 @@ has-flag@^3.0.0: resolved "https://registry.yarnpkg.com/has-flag/-/has-flag-3.0.0.tgz#b5d454dc2199ae225699f3467e5a07f3b955bafd" integrity sha512-sKJf1+ceQBr4SMkvQnBDNDtf4TXpVhVGateu0t918bl30FnbE2m4vNLX+VWe/dpjlb+HugGYzW7uQXH98HPEYw== +http-graceful-shutdown@^3.1.13: + version "3.1.13" + resolved "https://registry.yarnpkg.com/http-graceful-shutdown/-/http-graceful-shutdown-3.1.13.tgz#cf3c8f99787d1f5ac2a6bf8a2132ff54c9ce031e" + integrity sha512-Ci5LRufQ8AtrQ1U26AevS8QoMXDOhnAHCJI3eZu1com7mZGHxREmw3dNj85ftpQokQCvak8nI2pnFS8zyM1M+Q== + dependencies: + debug "^4.3.4" + ignore-by-default@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/ignore-by-default/-/ignore-by-default-1.0.1.tgz#48ca6d72f6c6a3af00a9ad4ae6876be3889e2b09" diff --git a/package.json b/package.json index cec3bd5..f56d147 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "type": "git", "url": "git://github.com/dillonstreator/txob.git" }, - "version": "0.0.13", + "version": "0.0.14", "license": "MIT", "files": [ "dist", diff --git a/src/pg/client.ts b/src/pg/client.ts index 4352f8b..35a5c3c 100644 --- a/src/pg/client.ts +++ b/src/pg/client.ts @@ -1,55 +1,55 @@ -import { Client } from "pg"; -import { TxOBEvent, TxOBProcessorClient } from "../processor"; - -interface Querier { - query: Client["query"]; -} - -export const createProcessorClient = ( - querier: Querier, - table: string = "events", -): TxOBProcessorClient => ({ - getUnprocessedEvents: async (opts) => { - const events = await querier.query< - Pick, "id" | "errors"> - >( - `SELECT id, errors FROM ${table} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1`, - [opts.maxErrors], - ); - return events.rows; - }, - transaction: async (fn) => { - try { - await querier.query("BEGIN"); - await fn({ - getEventByIdForUpdateSkipLocked: async (eventId) => { - const event = await querier.query>( - `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 FOR UPDATE SKIP LOCKED`, - [eventId], - ); - if (event.rowCount === 0) { - return null; - } - - return event.rows[0]; - }, - updateEvent: async (event) => { - await querier.query( - `UPDATE ${table} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`, - [ - event.handler_results, - event.errors, - event.processed_at, - event.backoff_until, - event.id, - ], - ); - }, - }); - await querier.query("COMMIT"); - } catch (error) { - await querier.query("ROLLBACK").catch(() => {}); - throw error; - } - }, -}); +import { Client } from "pg"; +import { TxOBEvent, TxOBProcessorClient } from "../processor"; + +interface Querier { + query: Client["query"]; +} + +export const createProcessorClient = ( + querier: Querier, + table: string = "events", +): TxOBProcessorClient => ({ + getUnprocessedEvents: async (opts) => { + const events = await querier.query< + Pick, "id" | "errors"> + >( + `SELECT id, errors FROM ${table} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1`, + [opts.maxErrors], + ); + return events.rows; + }, + transaction: async (fn) => { + try { + await querier.query("BEGIN"); + await fn({ + getEventByIdForUpdateSkipLocked: async (eventId) => { + const event = await querier.query>( + `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 FOR UPDATE SKIP LOCKED`, + [eventId], + ); + if (event.rowCount === 0) { + return null; + } + + return event.rows[0]; + }, + updateEvent: async (event) => { + await querier.query( + `UPDATE ${table} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`, + [ + event.handler_results, + event.errors, + event.processed_at, + event.backoff_until, + event.id, + ], + ); + }, + }); + await querier.query("COMMIT"); + } catch (error) { + await querier.query("ROLLBACK").catch(() => {}); + throw error; + } + }, +}); diff --git a/src/processor.ts b/src/processor.ts index 1b0e3f0..0840a8d 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -1,5 +1,6 @@ import { retryable, RetryOpts } from "./retry"; import { getDate } from "./date"; +import EventEmitter from "node:events"; type TxOBEventHandlerResult = { processed_at?: Date; @@ -87,12 +88,9 @@ export const processEvents = async ( }; const events = await client.getUnprocessedEvents(_opts); - if (events.length === 0) { - return; - } - _opts.logger?.debug(`found ${events.length} events to process`); + // TODO: consider concurrently processing events with max concurrency configuration for (const unlockedEvent of events) { if (_opts.signal?.aborted) { return; @@ -144,6 +142,9 @@ export const processEvents = async ( eventHandlerMap = {}; } + _opts.logger?.debug(`processing event`, { eventId: lockedEvent.id }); + + // TODO: consider concurrently processing events handler with max concurrency configuration await Promise.allSettled( Object.entries(eventHandlerMap).map( async ([handlerName, handler]): Promise => { @@ -227,3 +228,141 @@ export interface Logger { warn(message?: any, ...optionalParams: any[]): void; error(message?: any, ...optionalParams: any[]): void; } + +export const EventProcessor = ( + client: TxOBProcessorClient, + handlerMap: TxOBEventHandlerMap, + opts?: Omit, "signal"> & { + sleepTimeMs?: number; + }, +) => { + return Processor( + ({ signal }) => { + return processEvents(client, handlerMap, { + ...opts, + signal, + }); + }, + { + sleepTimeMs: opts?.sleepTimeMs, + logger: opts?.logger, + }, + ); +}; + +class SignalAbortedError extends Error { + constructor() { + super("signal aborted while awaiting next processor tick"); + } +} + +const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +export const Processor = ( + fn: ({ signal }: { signal: AbortSignal }) => Promise, + opts?: { sleepTimeMs?: number; logger?: Logger }, +) => { + let state: "started" | "stopped" | "stopping" = "stopped"; + const ee = new EventEmitter(); + const ac = new AbortController(); + let shutdownCompleteEmitted = false; + const _opts = { + sleepTimeMs: opts?.sleepTimeMs ?? 5000, + }; + + return { + start: () => { + if (state !== "stopped") { + opts?.logger?.warn(`cannot start processor from '${state}'`); + return; + } + state = "started"; + opts?.logger?.debug("processor started"); + + let abortListener: ((this: AbortSignal, ev: Event) => any) | null = null; + + (async () => { + while (true) { + opts?.logger?.debug("tick"); + try { + await fn({ signal: ac.signal }); + + await Promise.race([ + sleep(_opts.sleepTimeMs), + new Promise((_, reject) => { + if (ac.signal.aborted) return reject(new SignalAbortedError()); + + abortListener = () => reject(new SignalAbortedError()); + ac.signal.addEventListener("abort", abortListener); + }), + ]); + } catch (error) { + if (error instanceof SignalAbortedError) { + opts?.logger?.debug(error.message); + } else { + opts?.logger?.error(error); + } + + break; + } finally { + if (abortListener) + ac.signal.removeEventListener("abort", abortListener); + } + } + + ee.emit("shutdownComplete"); + shutdownCompleteEmitted = true; + })(); + }, + stop: async (stopOpts?: { timeoutMs?: number }) => { + if (state !== "started") { + opts?.logger?.warn(`cannot stop processor from '${state}'`); + return; + } + state = "stopping"; + opts?.logger?.debug("processor stopping"); + + const _stopOpts = { + timeoutMs: 10000, + ...stopOpts, + }; + + let caughtErr; + try { + await Promise.race([ + new Promise((resolve) => { + if (shutdownCompleteEmitted) { + opts?.logger?.debug("shutdownCompleteEmitted caught in shutdown"); + return resolve(); + } + + ee.once("shutdownComplete", () => { + opts?.logger?.debug("shutdownComplete event caught in shutdown"); + resolve(); + }); + opts?.logger?.debug("shutdown aborting AbortController"); + ac.abort(); + }), + new Promise((_, reject) => { + sleep(_stopOpts.timeoutMs).then(() => { + reject( + new Error(`shutdown timeout ${_stopOpts.timeoutMs}ms elapsed`), + ); + }); + }), + ]); + } catch (error) { + caughtErr = error; + } + + ee.removeAllListeners("shutdownComplete"); + state = "stopped"; + opts?.logger?.debug("processor stopped"); + + if (caughtErr) { + opts?.logger?.debug("shutdown error", caughtErr); + throw caughtErr; + } + }, + }; +};