Skip to content

Commit

Permalink
add correlation id to logs and add early return condition
Browse files Browse the repository at this point in the history
  • Loading branch information
dillonstreator committed Dec 23, 2023
1 parent ccd1441 commit 27e8d6e
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<img src="https://codecov.io/gh/dillonstreator/txob/graph/badge.svg?token=E9M7G67VLL"/>
</a>
<a aria-label="NPM version" href="https://www.npmjs.com/package/txob">
<img alt="" src="https://badgen.net/npm/v/txob?v=0.0.17">
<img alt="" src="https://badgen.net/npm/v/txob?v=0.0.18">
</a>
<a aria-label="License" href="https://github.com/dillonstreator/txob/blob/main/LICENSE">
<img alt="" src="https://badgen.net/npm/license/txob">
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"type": "git",
"url": "git://github.com/dillonstreator/txob.git"
},
"version": "0.0.17",
"version": "0.0.18",
"license": "MIT",
"files": [
"dist",
Expand Down
37 changes: 32 additions & 5 deletions src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,21 @@ export const processEvents = async <TxOBEventType extends string>(
});
return;
}

// While unlikely, the following two conditions are possible if a concurrent processor finished processing this event or reaching maximum errors between the time
// that this processor found the event with `getReadyToProcessEvents` and called `getReadyToProcessEventByIdForUpdateSkipLocked`
// `getReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources
if (lockedEvent.processed_at) {
// While unlikely, this is possible if a concurrent processor finished processing this event between the time
// that this processor found the event with `getReadyToProcessEvents` and called `getReadyToProcessEventByIdForUpdateSkipLocked`
// `getReadyToProcessEventByIdForUpdateSkipLocked` should handle this in its query implementation and return null to save resources
_opts.logger?.debug("skipping already processed event", {
eventId: lockedEvent.id,
correlationId: lockedEvent.correlation_id,
});
return;
}
if (lockedEvent.errors >= _opts.maxErrors) {
_opts.logger?.debug("skipping event with maximum errors", {
eventId: lockedEvent.id,
correlationId: lockedEvent.correlation_id,
});
return;
}
Expand All @@ -138,14 +147,20 @@ export const processEvents = async <TxOBEventType extends string>(
let eventHandlerMap = handlerMap[lockedEvent.type];
if (!eventHandlerMap) {
_opts.logger?.warn("missing event handler map", {
eventId: lockedEvent.id,
type: lockedEvent.type,
correlationId: lockedEvent.correlation_id,
});
errored = true;
lockedEvent.errors = _opts.maxErrors;
eventHandlerMap = {};
}

_opts.logger?.debug(`processing event`, { eventId: lockedEvent.id });
_opts.logger?.debug(`processing event`, {
eventId: lockedEvent.id,
type: lockedEvent.type,
correlationId: lockedEvent.correlation_id,
});

// TODO: consider concurrently processing events handler with max concurrency configuration
//
Expand All @@ -159,7 +174,9 @@ export const processEvents = async <TxOBEventType extends string>(
if (handlerResults.processed_at) {
_opts.logger?.debug("handler already processed", {
eventId: lockedEvent.id,
type: lockedEvent.type,
handlerName,
correlationId: lockedEvent.correlation_id,
});
return;
}
Expand All @@ -171,13 +188,17 @@ export const processEvents = async <TxOBEventType extends string>(
handlerResults.processed_at = getDate();
_opts.logger?.debug("handler succeeded", {
eventId: lockedEvent.id,
type: lockedEvent.type,
handlerName,
correlationId: lockedEvent.correlation_id,
});
} catch (error) {
_opts.logger?.error("handler errored", {
eventId: lockedEvent.id,
type: lockedEvent.type,
handlerName,
error,
correlationId: lockedEvent.correlation_id,
});
errored = true;
handlerResults.errors?.push({
Expand Down Expand Up @@ -205,7 +226,13 @@ export const processEvents = async <TxOBEventType extends string>(
lockedEvent.processed_at = getDate();
}

_opts.logger?.debug("updating event", { errored, lockedEvent });
_opts.logger?.debug("updating event", {
eventId: lockedEvent.id,
type: lockedEvent.type,
lockedEvent,
correlationId: lockedEvent.correlation_id,
errored,
});

// The success of this update is crucial for the processor flow.
// In the event of a failure, any handlers that have successfully executed
Expand Down

0 comments on commit 27e8d6e

Please sign in to comment.