From 794f2167caf3cfc0972ec59f05fa37303f1e1ac8 Mon Sep 17 00:00:00 2001 From: dillonstreator Date: Tue, 20 Feb 2024 18:59:18 -0600 Subject: [PATCH] getReady -> findReady --- README.md | 2 +- package.json | 2 +- src/mongodb/client.ts | 4 ++-- src/pg/client.test.ts | 12 ++++++------ src/pg/client.ts | 6 +++--- src/processor.test.ts | 38 +++++++++++++++++++------------------- src/processor.ts | 16 ++++++++-------- 7 files changed, 40 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index d486542..101f0ab 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ - + diff --git a/package.json b/package.json index 9e61f37..354743f 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "type": "git", "url": "git://github.com/dillonstreator/txob.git" }, - "version": "0.0.18", + "version": "0.0.19", "license": "MIT", "files": [ "dist", diff --git a/src/mongodb/client.ts b/src/mongodb/client.ts index 7e00e93..4e9cfe4 100644 --- a/src/mongodb/client.ts +++ b/src/mongodb/client.ts @@ -13,7 +13,7 @@ export const createProcessorClient = ( db: string, collection: string = "events", ): TxOBProcessorClient => ({ - getReadyToProcessEvents: async (opts) => { + findReadyToProcessEvents: async (opts) => { const events = (await mongo .db(db) .collection(collection) @@ -26,7 +26,7 @@ export const createProcessorClient = ( transaction: async (fn) => { await mongo.withSession(async (session): Promise => { await fn({ - getReadyToProcessEventByIdForUpdateSkipLocked: async ( + findReadyToProcessEventByIdForUpdateSkipLocked: async ( eventId, opts, ) => { diff --git a/src/pg/client.test.ts b/src/pg/client.test.ts index e4c4d0f..7012397 100644 --- a/src/pg/client.test.ts +++ b/src/pg/client.test.ts @@ -7,12 +7,12 @@ describe("createProcessorClient", () => { query: vi.fn(), }; const client = createProcessorClient(pgClient); - expect(typeof client.getReadyToProcessEvents).toBe("function"); + expect(typeof client.findReadyToProcessEvents).toBe("function"); expect(typeof client.transaction).toBe("function"); }); }); -describe("getReadyToProcessEvents", () => { +describe("findReadyToProcessEvents", () => { it("should execute the correct query", async () => { const rows = [1, 2, 3]; const pgClient = { @@ -26,7 +26,7 @@ describe("getReadyToProcessEvents", () => { maxErrors: 10, }; const client = createProcessorClient(pgClient); - const result = await client.getReadyToProcessEvents(opts); + const result = await client.findReadyToProcessEvents(opts); expect(pgClient.query).toHaveBeenCalledOnce(); expect(pgClient.query).toHaveBeenCalledWith( "SELECT id, errors FROM events WHERE processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $1", @@ -62,7 +62,7 @@ describe("transaction", () => { expect(pgClient.query).toHaveBeenNthCalledWith(2, "ROLLBACK"); }); - describe("getReadyToProcessEventByIdForUpdateSkipLocked", () => { + describe("findReadyToProcessEventByIdForUpdateSkipLocked", () => { it("should execute the correct query", async () => { const rows = [1, 2, 3]; const pgClient = { @@ -77,7 +77,7 @@ describe("transaction", () => { const client = createProcessorClient(pgClient); let result: any; await client.transaction(async (txClient) => { - result = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 }); + result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 6 }); }); expect(pgClient.query).toHaveBeenCalledTimes(3); @@ -102,7 +102,7 @@ describe("transaction", () => { const client = createProcessorClient(pgClient); let result: any; await client.transaction(async (txClient) => { - result = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 }); + result = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked(eventId, { maxErrors: 5 }); }); expect(pgClient.query).toHaveBeenCalledTimes(3); diff --git a/src/pg/client.ts b/src/pg/client.ts index 2b79664..41c5792 100644 --- a/src/pg/client.ts +++ b/src/pg/client.ts @@ -5,14 +5,14 @@ interface Querier { query: Client["query"]; } -// TODO: leverage the signal option that comes in on options for `getReadyToProcessEvents` and `getReadyToProcessEventByIdForUpdateSkipLocked` +// TODO: leverage the signal option that comes in on options for `findReadyToProcessEvents` and `findReadyToProcessEventByIdForUpdateSkipLocked` // to cancel queries if/when supported by `pg` https://github.com/brianc/node-postgres/issues/2774 export const createProcessorClient = ( querier: Querier, table: string = "events", ): TxOBProcessorClient => ({ - getReadyToProcessEvents: async (opts) => { + findReadyToProcessEvents: async (opts) => { const events = await querier.query< Pick, "id" | "errors"> >( @@ -25,7 +25,7 @@ export const createProcessorClient = ( try { await querier.query("BEGIN"); await fn({ - getReadyToProcessEventByIdForUpdateSkipLocked: async (eventId, opts) => { + findReadyToProcessEventByIdForUpdateSkipLocked: async (eventId, opts) => { const event = await querier.query>( `SELECT id, timestamp, type, data, correlation_id, handler_results, errors, backoff_until, processed_at FROM ${table} WHERE id = $1 AND processed_at IS NULL AND (backoff_until IS NULL OR backoff_until < NOW()) AND errors < $2 FOR UPDATE SKIP LOCKED`, [eventId, opts.maxErrors], diff --git a/src/processor.test.ts b/src/processor.test.ts index 437a258..039e7e3 100644 --- a/src/processor.test.ts +++ b/src/processor.test.ts @@ -9,11 +9,11 @@ import { import { sleep } from "./sleep"; const mockTxClient = { - getReadyToProcessEventByIdForUpdateSkipLocked: vi.fn(), + findReadyToProcessEventByIdForUpdateSkipLocked: vi.fn(), updateEvent: vi.fn(), }; const mockClient = { - getReadyToProcessEvents: vi.fn(), + findReadyToProcessEvents: vi.fn(), transaction: vi.fn(async (fn) => fn(mockTxClient)), }; @@ -37,12 +37,12 @@ describe("processEvents", () => { backoff: () => now, }; const handlerMap = {}; - mockClient.getReadyToProcessEvents.mockImplementation(() => []); + mockClient.findReadyToProcessEvents.mockImplementation(() => []); processEvents(mockClient, handlerMap, opts); - expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce(); - expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledWith(opts); + expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce(); + expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts); expect(mockClient.transaction).not.toHaveBeenCalled(); - expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); + expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); expect(mockTxClient.updateEvent).not.toHaveBeenCalled(); }); @@ -113,8 +113,8 @@ describe("processEvents", () => { processed_at: now, }; const events = [evt1, evt2, evt3, evt4]; - mockClient.getReadyToProcessEvents.mockImplementation(() => events); - mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => { + mockClient.findReadyToProcessEvents.mockImplementation(() => events); + mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => { if (id === evt3.id) return null; return events.find((e) => e.id === id); @@ -128,8 +128,8 @@ describe("processEvents", () => { await processEvents(mockClient, handlerMap, opts); - expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce(); - expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledWith(opts); + expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce(); + expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts); expect(mockClient.transaction).toHaveBeenCalledTimes(3); @@ -143,7 +143,7 @@ describe("processEvents", () => { }); expect(handlerMap.evtType1.handler3).not.toHaveBeenCalled(); - expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( + expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( 3, ); @@ -213,8 +213,8 @@ describe("processEvents", () => { errors: 1, }; const events = [evt1]; - mockClient.getReadyToProcessEvents.mockImplementation(() => events); - mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => { + mockClient.findReadyToProcessEvents.mockImplementation(() => events); + mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked.mockImplementation((id) => { return events.find((e) => e.id === id); }); mockTxClient.updateEvent.mockImplementation(() => { @@ -223,8 +223,8 @@ describe("processEvents", () => { await processEvents(mockClient, handlerMap, opts); - expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce(); - expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledWith(opts); + expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce(); + expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledWith(opts); expect(mockClient.transaction).toHaveBeenCalledTimes(1); @@ -232,7 +232,7 @@ describe("processEvents", () => { expect(handlerMap.evtType1.handler1).toHaveBeenCalledWith(evt1, { signal: undefined, }); - expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( + expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).toHaveBeenCalledTimes( 1, ); @@ -320,15 +320,15 @@ describe("EventProcessor", () => { backoff: () => now, }; const handlerMap = {}; - mockClient.getReadyToProcessEvents.mockImplementation(() => []); + mockClient.findReadyToProcessEvents.mockImplementation(() => []); const processor = EventProcessor(mockClient, handlerMap, opts); processor.start(); await processor.stop(); - expect(mockClient.getReadyToProcessEvents).toHaveBeenCalledOnce(); + expect(mockClient.findReadyToProcessEvents).toHaveBeenCalledOnce(); expect(mockClient.transaction).not.toHaveBeenCalled(); - expect(mockTxClient.getReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); + expect(mockTxClient.findReadyToProcessEventByIdForUpdateSkipLocked).not.toHaveBeenCalled(); expect(mockTxClient.updateEvent).not.toHaveBeenCalled(); }); }); diff --git a/src/processor.ts b/src/processor.ts index 8d33cb3..16b8f42 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -42,7 +42,7 @@ type TxOBProcessorClientOpts = { }; export interface TxOBProcessorClient { - getReadyToProcessEvents( + findReadyToProcessEvents( opts: TxOBProcessorClientOpts, ): Promise, "id" | "errors">[]>; transaction( @@ -53,7 +53,7 @@ export interface TxOBProcessorClient { } export interface TxOBTransactionProcessorClient { - getReadyToProcessEventByIdForUpdateSkipLocked( + findReadyToProcessEventByIdForUpdateSkipLocked( eventId: TxOBEvent["id"], opts: TxOBProcessorClientOpts, ): Promise | null>; @@ -89,7 +89,7 @@ export const processEvents = async ( ...opts, }; - const events = await client.getReadyToProcessEvents(_opts); + const events = await client.findReadyToProcessEvents(_opts); _opts.logger?.debug(`found ${events.length} events to process`); // TODO: consider concurrently processing events with max concurrency configuration @@ -99,9 +99,9 @@ export const processEvents = async ( } if (unlockedEvent.errors >= _opts.maxErrors) { // Potential issue with client configuration on finding unprocessed events - // Events with maximum allowed errors should not be returned from `getReadyToProcessEvents` + // Events with maximum allowed errors should not be returned from `findReadyToProcessEvents` _opts.logger?.warn( - "unexpected event with max errors returned from `getReadyToProcessEvents`", + "unexpected event with max errors returned from `findReadyToProcessEvents`", { eventId: unlockedEvent.id, errors: unlockedEvent.errors, @@ -113,7 +113,7 @@ export const processEvents = async ( try { await client.transaction(async (txClient) => { - const lockedEvent = await txClient.getReadyToProcessEventByIdForUpdateSkipLocked( + const lockedEvent = await txClient.findReadyToProcessEventByIdForUpdateSkipLocked( unlockedEvent.id, { signal: _opts.signal, maxErrors: _opts.maxErrors }, ); @@ -125,8 +125,8 @@ export const processEvents = async ( } // While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time - // that this processor found the event with `getReadyToProcessEvents` and called `getReadyToProcessEventByIdForUpdateSkipLocked` - // `getReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources + // that this processor found the event with `findReadyToProcessEvents` and called `findReadyToProcessEventByIdForUpdateSkipLocked` + // `findReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources if (lockedEvent.processed_at) { _opts.logger?.debug("skipping already processed event", { eventId: lockedEvent.id,