Skip to content

Commit

Permalink
add generic processor with graceful stop and specialized event processor
Browse files Browse the repository at this point in the history
  • Loading branch information
dillonstreator committed Dec 22, 2023
1 parent 83195c1 commit 157dbef
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 95 deletions.
73 changes: 39 additions & 34 deletions examples/pg/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import http from "node:http";
import { randomUUID } from "node:crypto";
import { Client } from "pg";
import { processEvents } from "../../src/processor";
import { EventProcessor } from "../../src/processor";
import { createProcessorClient } from "../../src/pg/client";
import dotenv from "dotenv";
import gracefulShutdown from "http-graceful-shutdown";
dotenv.config();

const eventTypes = {
Expand All @@ -19,35 +20,11 @@ const main = async () => {
database: process.env.POSTGRES_DB,
});
await client.connect();
await client.query(`CREATE TABLE IF NOT EXISTS events (
id UUID,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
type VARCHAR(255) NOT NULL,
data JSONB,
correlation_id UUID,
handler_results JSONB,
errors INTEGER,
backoff_until TIMESTAMPTZ,
processed_at TIMESTAMPTZ
)`);
await client.query(`CREATE TABLE IF NOT EXISTS activity (
id UUID,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
ip TEXT,
ua TEXT,
method TEXT,
path TEXT,
correlation_id UUID
)`);

const ab = new AbortController();
await migrate(client);

const processorClient = createProcessorClient<EventType>(client);

const processorTick = () => {
if (ab.signal.aborted) return;

processEvents(processorClient, {
const processor = EventProcessor(
createProcessorClient<EventType>(client),
{
ResourceSaved: {
thing1: async (event) => {
console.log(`${event.id} thing1 ${event.correlation_id}`);
Expand All @@ -68,11 +45,10 @@ const main = async () => {
return;
},
},
}).finally(() => {
setTimeout(processorTick, 5000);
});
};
processorTick();
},
{ sleepTimeMs: 5000, logger: console },
);
processor.start();

const server = http.createServer(async (req, res) => {
const correlationId = randomUUID();
Expand Down Expand Up @@ -120,6 +96,12 @@ const main = async () => {
});
const port = process.env.PORT || 3000;
server.listen(port, () => console.log(`listening on ${port}`));

gracefulShutdown(server, {
onShutdown: async () => {
await processor.stop();
},
});
};

if (require.main === module) {
Expand All @@ -128,3 +110,26 @@ if (require.main === module) {
process.exit(1);
});
}

const migrate = async (client: Client): Promise<void> => {
await client.query(`CREATE TABLE IF NOT EXISTS events (
id UUID,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
type VARCHAR(255) NOT NULL,
data JSONB,
correlation_id UUID,
handler_results JSONB,
errors INTEGER,
backoff_until TIMESTAMPTZ,
processed_at TIMESTAMPTZ
)`);
await client.query(`CREATE TABLE IF NOT EXISTS activity (
id UUID,
timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
ip TEXT,
ua TEXT,
method TEXT,
path TEXT,
correlation_id UUID
)`);
};
3 changes: 3 additions & 0 deletions examples/pg/nodemon.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"watch": ["index.ts", "../../src/processor.ts"]
}
1 change: 1 addition & 0 deletions examples/pg/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"dev": "nodemon index.ts"
},
"dependencies": {
"http-graceful-shutdown": "^3.1.13",
"pg": "^8.11.3"
},
"devDependencies": {
Expand Down
9 changes: 8 additions & 1 deletion examples/pg/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ create-require@^1.1.0:
resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333"
integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==

debug@^4:
debug@^4, debug@^4.3.4:
version "4.3.4"
resolved "https://registry.yarnpkg.com/debug/-/debug-4.3.4.tgz#1319f6579357f2338d3337d2cdd4914bb5dcc865"
integrity sha512-PRWFHuSU3eDtQJPvnNY7Jcket1j0t5OuOsFzPPzsekD52Zl8qUfFIPEiswXqIvHWGVHOgX+7G/vCNNhehwxfkQ==
Expand Down Expand Up @@ -187,6 +187,13 @@ has-flag@^3.0.0:
resolved "https://registry.yarnpkg.com/has-flag/-/has-flag-3.0.0.tgz#b5d454dc2199ae225699f3467e5a07f3b955bafd"
integrity sha512-sKJf1+ceQBr4SMkvQnBDNDtf4TXpVhVGateu0t918bl30FnbE2m4vNLX+VWe/dpjlb+HugGYzW7uQXH98HPEYw==

http-graceful-shutdown@^3.1.13:
version "3.1.13"
resolved "https://registry.yarnpkg.com/http-graceful-shutdown/-/http-graceful-shutdown-3.1.13.tgz#cf3c8f99787d1f5ac2a6bf8a2132ff54c9ce031e"
integrity sha512-Ci5LRufQ8AtrQ1U26AevS8QoMXDOhnAHCJI3eZu1com7mZGHxREmw3dNj85ftpQokQCvak8nI2pnFS8zyM1M+Q==
dependencies:
debug "^4.3.4"

ignore-by-default@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/ignore-by-default/-/ignore-by-default-1.0.1.tgz#48ca6d72f6c6a3af00a9ad4ae6876be3889e2b09"
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
"version": "0.0.13",
"version": "0.0.14",
"license": "MIT",
"files": [
"dist",
Expand Down
110 changes: 55 additions & 55 deletions src/pg/client.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,55 @@
import { Client } from "pg";
import { TxOBEvent, TxOBProcessorClient } from "../processor";

interface Querier {
query: Client["query"];
}

export const createProcessorClient = <EventType extends string>(
querier: Querier,
table: string = "events",
): TxOBProcessorClient<EventType> => ({
getUnprocessedEvents: async (opts) => {
const events = await querier.query<
Pick<TxOBEvent<EventType>, "id" | "errors">
>(
`SELECT id, errors FROM ${table} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1`,
[opts.maxErrors],
);
return events.rows;
},
transaction: async (fn) => {
try {
await querier.query("BEGIN");
await fn({
getEventByIdForUpdateSkipLocked: async (eventId) => {
const event = await querier.query<TxOBEvent<EventType>>(
`SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 FOR UPDATE SKIP LOCKED`,
[eventId],
);
if (event.rowCount === 0) {
return null;
}

return event.rows[0];
},
updateEvent: async (event) => {
await querier.query(
`UPDATE ${table} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`,
[
event.handler_results,
event.errors,
event.processed_at,
event.backoff_until,
event.id,
],
);
},
});
await querier.query("COMMIT");
} catch (error) {
await querier.query("ROLLBACK").catch(() => {});
throw error;
}
},
});
import { Client } from "pg";
import { TxOBEvent, TxOBProcessorClient } from "../processor";

interface Querier {
query: Client["query"];
}

export const createProcessorClient = <EventType extends string>(
querier: Querier,
table: string = "events",
): TxOBProcessorClient<EventType> => ({
getUnprocessedEvents: async (opts) => {
const events = await querier.query<
Pick<TxOBEvent<EventType>, "id" | "errors">
>(
`SELECT id, errors FROM ${table} WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1`,
[opts.maxErrors],
);
return events.rows;
},
transaction: async (fn) => {
try {
await querier.query("BEGIN");
await fn({
getEventByIdForUpdateSkipLocked: async (eventId) => {
const event = await querier.query<TxOBEvent<EventType>>(
`SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 FOR UPDATE SKIP LOCKED`,
[eventId],
);
if (event.rowCount === 0) {
return null;
}

return event.rows[0];
},
updateEvent: async (event) => {
await querier.query(
`UPDATE ${table} SET handler_results = $1, errors = $2, processed_at = $3, backoff_until = $4 WHERE id = $5`,
[
event.handler_results,
event.errors,
event.processed_at,
event.backoff_until,
event.id,
],
);
},
});
await querier.query("COMMIT");
} catch (error) {
await querier.query("ROLLBACK").catch(() => {});
throw error;
}
},
});
Loading

0 comments on commit 157dbef

Please sign in to comment.