Skip to content

Commit

Permalink
chore: refactor to use sources.job_id to group events
Browse files Browse the repository at this point in the history
  • Loading branch information
koladilip committed Sep 11, 2024
1 parent 055077a commit 13a7d00
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 224 deletions.
7 changes: 1 addition & 6 deletions src/v0/util/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2303,12 +2303,7 @@ const applyJSONStringTemplate = (message, template) =>
*/
const groupRouterTransformEvents = (events) =>
Object.values(
lodash.groupBy(events, (ev) => [
ev.metadata?.destinationId || ev.destination?.ID,
ev.metadata?.sourceCategory === 'warehouse'
? ev.metadata?.sourceId || ev.connection?.sourceId
: 'default',
]),
lodash.groupBy(events, (ev) => [ev.destination?.ID, ev.context?.sources?.job_id || 'default']),
);

/*
Expand Down
300 changes: 82 additions & 218 deletions src/v0/util/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -695,245 +695,109 @@ describe('extractCustomFields', () => {
});

describe('groupRouterTransformEvents', () => {
// groups events correctly by destination ID
it('should group events correctly by destination ID and source ID', () => {
it('should group events by destination.ID and context.sources.job_id', () => {
const events = [
{
message: {
eventID: 'evt001',
eventName: 'OrderReceived',
},
metadata: {
sourceId: 'SRC001',
destinationId: 'DEST001',
sourceCategory: 'warehouse',
},
},
{
message: {
eventID: 'evt002',
eventName: 'InventoryUpdate',
},
metadata: {
sourceId: 'SRC002',
destinationId: 'DEST002',
sourceCategory: 'cloud',
},
destination: { ID: 'dest1' },
context: { sources: { job_id: 'job1' } },
},
{
message: {
eventID: 'evt003',
eventName: 'UserLogin',
},
metadata: {
sourceId: 'SRC003',
destinationId: 'DEST003',
sourceCategory: 'webhook',
},
destination: { ID: 'dest1' },
context: { sources: { job_id: 'job2' } },
},
{
message: {
eventID: 'evt004',
eventName: 'PaymentProcessed',
},
metadata: {
sourceId: 'SRC001',
destinationId: 'DEST002',
sourceCategory: 'warehouse',
},
destination: { ID: 'dest2' },
context: { sources: { job_id: 'job1' } },
},
];
const result = groupRouterTransformEvents(events);

expect(result.length).toBe(3); // 3 unique groups
expect(result).toEqual([
[{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job1' } } }],
[{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job2' } } }],
[{ destination: { ID: 'dest2' }, context: { sources: { job_id: 'job1' } } }],
]);
});

it('should group events by default job_id if context.sources.job_id is missing', () => {
const events = [
{
message: {
eventID: 'evt005',
eventName: 'ShipmentDispatched',
},
metadata: {
sourceId: 'SRC002',
destinationId: 'DEST003',
sourceCategory: 'cloud',
},
destination: { ID: 'dest1' },
context: { sources: {} },
},
{
message: {
eventID: 'evt006',
eventName: 'ProductReturn',
},
metadata: {
sourceId: 'SRC003',
destinationId: 'DEST001',
sourceCategory: 'webhook',
},
destination: { ID: 'dest1' },
context: { sources: { job_id: 'job1' } },
},
];
const result = groupRouterTransformEvents(events);

expect(result.length).toBe(2); // 2 unique groups
expect(result).toEqual([
[{ destination: { ID: 'dest1' }, context: { sources: {} } }],
[{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job1' } } }],
]);
});

it('should group events by default job_id if context or context.sources is missing', () => {
const events = [
{
message: {
eventID: 'evt007',
eventName: 'StockAlert',
},
metadata: {
sourceId: 'SRC001',
destinationId: 'DEST003',
sourceCategory: 'warehouse',
},
destination: { ID: 'dest1' },
},
{
message: {
eventID: 'evt008',
eventName: 'UserRegistration',
},
metadata: {
sourceId: 'SRC002',
destinationId: 'DEST001',
sourceCategory: 'cloud',
},
destination: { ID: 'dest1' },
context: { sources: { job_id: 'job1' } },
},
];
const result = groupRouterTransformEvents(events);

expect(result.length).toBe(2); // 2 unique groups
expect(result).toEqual([
[{ destination: { ID: 'dest1' } }],
[{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job1' } } }],
]);
});

it('should use "default" when destination.ID is missing', () => {
const events = [
{
message: {
eventID: 'evt009',
eventName: 'ReviewSubmitted',
},
metadata: {
sourceId: 'SRC003',
destinationId: 'DEST002',
sourceCategory: 'webhook',
},
context: { sources: { job_id: 'job1' } },
},
{
message: {
eventID: 'evt010',
eventName: 'DiscountApplied',
},
metadata: {
sourceId: 'SRC001',
destinationId: 'DEST001',
sourceCategory: 'warehouse',
},
destination: { ID: 'dest1' },
context: { sources: { job_id: 'job1' } },
},
];
const result = groupRouterTransformEvents(events);

const groupedEvents = groupRouterTransformEvents(events);
expect(groupedEvents).toEqual([
[
{
message: {
eventID: 'evt001',
eventName: 'OrderReceived',
},
metadata: {
sourceId: 'SRC001',
destinationId: 'DEST001',
sourceCategory: 'warehouse',
},
},
{
message: {
eventID: 'evt010',
eventName: 'DiscountApplied',
},
metadata: {
sourceId: 'SRC001',
destinationId: 'DEST001',
sourceCategory: 'warehouse',
},
},
],
[
{
message: {
eventID: 'evt002',
eventName: 'InventoryUpdate',
},
metadata: {
sourceId: 'SRC002',
destinationId: 'DEST002',
sourceCategory: 'cloud',
},
},
{
message: {
eventID: 'evt009',
eventName: 'ReviewSubmitted',
},
metadata: {
sourceId: 'SRC003',
destinationId: 'DEST002',
sourceCategory: 'webhook',
},
},
],
[
{
message: {
eventID: 'evt003',
eventName: 'UserLogin',
},
metadata: {
sourceId: 'SRC003',
destinationId: 'DEST003',
sourceCategory: 'webhook',
},
},
{
message: {
eventID: 'evt005',
eventName: 'ShipmentDispatched',
},
metadata: {
sourceId: 'SRC002',
destinationId: 'DEST003',
sourceCategory: 'cloud',
},
},
],
[
{
message: {
eventID: 'evt004',
eventName: 'PaymentProcessed',
},
metadata: {
sourceId: 'SRC001',
destinationId: 'DEST002',
sourceCategory: 'warehouse',
},
},
],
[
{
message: {
eventID: 'evt006',
eventName: 'ProductReturn',
},
metadata: {
sourceId: 'SRC003',
destinationId: 'DEST001',
sourceCategory: 'webhook',
},
},
{
message: {
eventID: 'evt008',
eventName: 'UserRegistration',
},
metadata: {
sourceId: 'SRC002',
destinationId: 'DEST001',
sourceCategory: 'cloud',
},
},
],
[
{
message: {
eventID: 'evt007',
eventName: 'StockAlert',
},
metadata: {
sourceId: 'SRC001',
destinationId: 'DEST003',
sourceCategory: 'warehouse',
},
},
],
expect(result.length).toBe(2); // 2 unique groups
expect(result).toEqual([
[{ context: { sources: { job_id: 'job1' } } }],
[{ destination: { ID: 'dest1' }, context: { sources: { job_id: 'job1' } } }],
]);
});

it('should return an empty array when there are no events', () => {
const events = [];
const result = groupRouterTransformEvents(events);

expect(result).toEqual([]);
});

it('should handle events with completely missing context and destination', () => {
const events = [
{},
{ destination: { ID: 'dest1' } },
{ context: { sources: { job_id: 'job1' } } },
];
const result = groupRouterTransformEvents(events);

expect(result.length).toBe(3); // 3 unique groups
expect(result).toEqual([
[{}],
[{ destination: { ID: 'dest1' } }],
[{ context: { sources: { job_id: 'job1' } } }],
]);
});
});
Expand Down

0 comments on commit 13a7d00

Please sign in to comment.