diff --git a/README.md b/README.md
index d486542..101f0ab 100644
--- a/README.md
+++ b/README.md
@@ -5,7 +5,7 @@
-
+
diff --git a/package.json b/package.json
index 9e61f37..354743f 100644
--- a/package.json
+++ b/package.json
@@ -15,7 +15,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
- "version": "0.0.18",
+ "version": "0.0.19",
"license": "MIT",
"files": [
"dist",
diff --git a/src/mongodb/client.ts b/src/mongodb/client.ts
index 7e00e93..4e9cfe4 100644
--- a/src/mongodb/client.ts
+++ b/src/mongodb/client.ts
@@ -13,7 +13,7 @@ export const createProcessorClient = (
db: string,
collection: string = "events",
): TxOBProcessorClient => ({
- getReadyToProcessEvents: async (opts) => {
+ findReadyToProcessEvents: async (opts) => {
const events = (await mongo
.db(db)
.collection(collection)
@@ -26,7 +26,7 @@ export const createProcessorClient = (
transaction: async (fn) => {
await mongo.withSession(async (session): Promise => {
await fn({
- getReadyToProcessEventByIdForUpdateSkipLocked: async (
+ findReadyToProcessEventByIdForUpdateSkipLocked: async (
eventId,
opts,
) => {
diff --git a/src/pg/client.test.ts b/src/pg/client.test.ts
index e4c4d0f..7012397 100644
--- a/src/pg/client.test.ts
+++ b/src/pg/client.test.ts
@@ -7,12 +7,12 @@ describe("createProcessorClient", () => {
query: vi.fn(),
};
const client = createProcessorClient(pgClient);
- expect(typeof client.getReadyToProcessEvents).toBe("function");
+ expect(typeof client.findReadyToProcessEvents).toBe("function");
expect(typeof client.transaction).toBe("function");
});
});
-describe("getReadyToProcessEvents", () => {
+describe("findReadyToProcessEvents", () => {
it("should execute the correct query", async () => {
const rows = [1, 2, 3];
const pgClient = {
@@ -26,7 +26,7 @@ describe("getReadyToProcessEvents", () => {
maxErrors: 10,
};
const client = createProcessorClient(pgClient);
- const result = await client.getReadyToProcessEvents(opts);
+ const result = await client.findReadyToProcessEvents(opts);
expect(pgClient.query).toHaveBeenCalledOnce();
expect(pgClient.query).toHaveBeenCalledWith(
"SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1",
@@ -62,7 +62,7 @@ describe("transaction", () => {
expect(pgClient.query).toHaveBeenNthCalledWith(2, "ROLLBACK");
});
- describe("getReadyToProcessEventByIdForUpdateSkipLocked", () => {
+ describe("findReadyToProcessEventByIdForUpdateSkipLocked", () => {
it("should execute the correct query", async () => {
const rows = [1, 2, 3];
const pgClient = {
@@ -77,7 +77,7 @@ describe("transaction", () => {
const client = createProcessorClient(pgClient);
let result: any;
await client.transaction(async (txClient) => {
- result = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 });
+ result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 });
});
expect(pgClient.query).toHaveBeenCalledTimes(3);
@@ -102,7 +102,7 @@ describe("transaction", () => {
const client = createProcessorClient(pgClient);
let result: any;
await client.transaction(async (txClient) => {
- result = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 });
+ result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 });
});
expect(pgClient.query).toHaveBeenCalledTimes(3);
diff --git a/src/pg/client.ts b/src/pg/client.ts
index 2b79664..41c5792 100644
--- a/src/pg/client.ts
+++ b/src/pg/client.ts
@@ -5,14 +5,14 @@ interface Querier {
query: Client["query"];
}
-// TODO: leverage the signal option that comes in on options for `getReadyToProcessEvents` and `getReadyToProcessEventByIdForUpdateSkipLocked`
+// TODO: leverage the signal option that comes in on options for `findReadyToProcessEvents` and `findReadyToProcessEventByIdForUpdateSkipLocked`
// to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774
export const createProcessorClient = (
querier: Querier,
table: string = "events",
): TxOBProcessorClient => ({
- getReadyToProcessEvents: async (opts) => {
+ findReadyToProcessEvents: async (opts) => {
const events = await querier.query<
Pick, "id" | "errors">
>(
@@ -25,7 +25,7 @@ export const createProcessorClient = (
try {
await querier.query("BEGIN");
await fn({
- getReadyToProcessEventByIdForUpdateSkipLocked: async (eventId, opts) => {
+ findReadyToProcessEventByIdForUpdateSkipLocked: async (eventId, opts) => {
const event = await querier.query>(
`SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`,
[eventId, opts.maxErrors],
diff --git a/src/processor.test.ts b/src/processor.test.ts
index 437a258..039e7e3 100644
--- a/src/processor.test.ts
+++ b/src/processor.test.ts
@@ -9,11 +9,11 @@ import {
import { sleep } from "./sleep";
const mockTxClient = {
- getReadyToProcessEventByIdForUpdateSkipLocked: vi.fn(),
+ findReadyToProcessEventByIdForUpdateSkipLocked: vi.fn(),
updateEvent: vi.fn(),
};
const mockClient = {
- getReadyToProcessEvents: vi.fn(),
+ findReadyToProcessEvents: vi.fn(),
transaction: vi.fn(async (fn) => fn(mockTxClient)),
};
@@ -37,12 +37,12 @@ describe("processEvents", () => {
backoff: () => now,
};
const handlerMap = {};
- mockClient.getReadyToProcessEvents.mockImplementation(() => []);
+ mockClient.findReadyToProcessEvents.mockImplementation(() => []);
processEvents(mockClient, handlerMap, opts);
- expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce();
- expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledWith(opts);
+ expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
+ expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts);
expect(mockClient.transaction).not.toHaveBeenCalled();
- expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
+ expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
expect(mockTxClient.updateEvent).not.toHaveBeenCalled();
});
@@ -113,8 +113,8 @@ describe("processEvents", () => {
processed_at: now,
};
const events = [evt1, evt2, evt3, evt4];
- mockClient.getReadyToProcessEvents.mockImplementation(() => events);
- mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => {
+ mockClient.findReadyToProcessEvents.mockImplementation(() => events);
+ mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => {
if (id === evt3.id) return null;
return events.find((e) => e.id === id);
@@ -128,8 +128,8 @@ describe("processEvents", () => {
await processEvents(mockClient, handlerMap, opts);
- expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce();
- expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledWith(opts);
+ expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
+ expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts);
expect(mockClient.transaction).toHaveBeenCalledTimes(3);
@@ -143,7 +143,7 @@ describe("processEvents", () => {
});
expect(handlerMap.evtType1.handler3).not.toHaveBeenCalled();
- expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
+ expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
3,
);
@@ -213,8 +213,8 @@ describe("processEvents", () => {
errors: 1,
};
const events = [evt1];
- mockClient.getReadyToProcessEvents.mockImplementation(() => events);
- mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => {
+ mockClient.findReadyToProcessEvents.mockImplementation(() => events);
+ mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => {
return events.find((e) => e.id === id);
});
mockTxClient.updateEvent.mockImplementation(() => {
@@ -223,8 +223,8 @@ describe("processEvents", () => {
await processEvents(mockClient, handlerMap, opts);
- expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce();
- expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledWith(opts);
+ expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
+ expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts);
expect(mockClient.transaction).toHaveBeenCalledTimes(1);
@@ -232,7 +232,7 @@ describe("processEvents", () => {
expect(handlerMap.evtType1.handler1).toHaveBeenCalledWith(evt1, {
signal: undefined,
});
- expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
+ expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes(
1,
);
@@ -320,15 +320,15 @@ describe("EventProcessor", () => {
backoff: () => now,
};
const handlerMap = {};
- mockClient.getReadyToProcessEvents.mockImplementation(() => []);
+ mockClient.findReadyToProcessEvents.mockImplementation(() => []);
const processor = EventProcessor(mockClient, handlerMap, opts);
processor.start();
await processor.stop();
- expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce();
+ expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce();
expect(mockClient.transaction).not.toHaveBeenCalled();
- expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
+ expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled();
expect(mockTxClient.updateEvent).not.toHaveBeenCalled();
});
});
diff --git a/src/processor.ts b/src/processor.ts
index 8d33cb3..16b8f42 100644
--- a/src/processor.ts
+++ b/src/processor.ts
@@ -42,7 +42,7 @@ type TxOBProcessorClientOpts = {
};
export interface TxOBProcessorClient {
- getReadyToProcessEvents(
+ findReadyToProcessEvents(
opts: TxOBProcessorClientOpts,
): Promise, "id" | "errors">[]>;
transaction(
@@ -53,7 +53,7 @@ export interface TxOBProcessorClient {
}
export interface TxOBTransactionProcessorClient {
- getReadyToProcessEventByIdForUpdateSkipLocked(
+ findReadyToProcessEventByIdForUpdateSkipLocked(
eventId: TxOBEvent["id"],
opts: TxOBProcessorClientOpts,
): Promise | null>;
@@ -89,7 +89,7 @@ export const processEvents = async (
...opts,
};
- const events = await client.getReadyToProcessEvents(_opts);
+ const events = await client.findReadyToProcessEvents(_opts);
_opts.logger?.debug(`found ${events.length} events to process`);
// TODO: consider concurrently processing events with max concurrency configuration
@@ -99,9 +99,9 @@ export const processEvents = async (
}
if (unlockedEvent.errors >= _opts.maxErrors) {
// Potential issue with client configuration on finding unprocessed events
- // Events with maximum allowed errors should not be returned from `getReadyToProcessEvents`
+ // Events with maximum allowed errors should not be returned from `findReadyToProcessEvents`
_opts.logger?.warn(
- "unexpected event with max errors returned from `getReadyToProcessEvents`",
+ "unexpected event with max errors returned from `findReadyToProcessEvents`",
{
eventId: unlockedEvent.id,
errors: unlockedEvent.errors,
@@ -113,7 +113,7 @@ export const processEvents = async (
try {
await client.transaction(async (txClient) => {
- const lockedEvent = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked(
+ const lockedEvent = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(
unlockedEvent.id,
{ signal: _opts.signal, maxErrors: _opts.maxErrors },
);
@@ -125,8 +125,8 @@ export const processEvents = async (
}
// While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time
- // that this processor found the event with `getReadyToProcessEvents` and called `getReadyToProcessEventByIdForUpdateSkipLocked`
- // `getReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources
+ // that this processor found the event with `findReadyToProcessEvents` and called `findReadyToProcessEventByIdForUpdateSkipLocked`
+ // `findReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources
if (lockedEvent.processed_at) {
_opts.logger?.debug("skipping already processed event", {
eventId: lockedEvent.id,