diff --git a/README.md b/README.md index 3018c92..6196f40 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ - + diff --git a/package.json b/package.json index fb1bc55..9e61f37 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "type": "git", "url": "git://github.com/dillonstreator/txob.git" }, - "version": "0.0.17", + "version": "0.0.18", "license": "MIT", "files": [ "dist", diff --git a/src/processor.ts b/src/processor.ts index e23349a..8d33cb3 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -123,12 +123,21 @@ export const processEvents = async ( }); return; } + + // While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time + // that this processor found the event with `getReadyToProcessEvents` and called `getReadyToProcessEventByIdForUpdateSkipLocked` + // `getReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources if (lockedEvent.processed_at) { - // While unlikely, this is possible if a concurrent processor finished processing this event between the time - // that this processor found the event with `getReadyToProcessEvents` and called `getReadyToProcessEventByIdForUpdateSkipLocked` - // `getReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources _opts.logger?.debug("skipping already processed event", { eventId: lockedEvent.id, + correlationId: lockedEvent.correlation_id, + }); + return; + } + if (lockedEvent.errors >= _opts.maxErrors) { + _opts.logger?.debug("skipping event with maximum errors", { + eventId: lockedEvent.id, + correlationId: lockedEvent.correlation_id, }); return; } @@ -138,14 +147,20 @@ export const processEvents = async ( let eventHandlerMap = handlerMap[lockedEvent.type]; if (!eventHandlerMap) { _opts.logger?.warn("missing event handler map", { + eventId: lockedEvent.id, type: lockedEvent.type, + correlationId: lockedEvent.correlation_id, }); errored = true; lockedEvent.errors = _opts.maxErrors; eventHandlerMap = {}; } - _opts.logger?.debug(`processing event`, { eventId: lockedEvent.id }); + _opts.logger?.debug(`processing event`, { + eventId: lockedEvent.id, + type: lockedEvent.type, + correlationId: lockedEvent.correlation_id, + }); // TODO: consider concurrently processing events handler with max concurrency configuration // @@ -159,7 +174,9 @@ export const processEvents = async ( if (handlerResults.processed_at) { _opts.logger?.debug("handler already processed", { eventId: lockedEvent.id, + type: lockedEvent.type, handlerName, + correlationId: lockedEvent.correlation_id, }); return; } @@ -171,13 +188,17 @@ export const processEvents = async ( handlerResults.processed_at = getDate(); _opts.logger?.debug("handler succeeded", { eventId: lockedEvent.id, + type: lockedEvent.type, handlerName, + correlationId: lockedEvent.correlation_id, }); } catch (error) { _opts.logger?.error("handler errored", { eventId: lockedEvent.id, + type: lockedEvent.type, handlerName, error, + correlationId: lockedEvent.correlation_id, }); errored = true; handlerResults.errors?.push({ @@ -205,7 +226,13 @@ export const processEvents = async ( lockedEvent.processed_at = getDate(); } - _opts.logger?.debug("updating event", { errored, lockedEvent }); + _opts.logger?.debug("updating event", { + eventId: lockedEvent.id, + type: lockedEvent.type, + lockedEvent, + correlationId: lockedEvent.correlation_id, + errored, + }); // The success of this update is crucial for the processor flow. // In the event of a failure, any handlers that have successfully executed