Skip to content

Commit

Permalink
fix: bqstream event ordering fix
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 committed Sep 18, 2023
1 parent fd81211 commit 5cf688e
Show file tree
Hide file tree
Showing 4 changed files with 587 additions and 86 deletions.
40 changes: 25 additions & 15 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const {
} = require('../../util');
const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');
const { getRearrangedEvents } = require('./util');

const getInsertIdColValue = (properties, insertIdCol) => {
if (
Expand Down Expand Up @@ -50,7 +51,7 @@ const process = (event) => {
};
};

const batchEvents = (eventsChunk) => {
const batchEachUserSuccessEvents = (eventsChunk) => {
const batchedResponseList = [];

// arrayChunks = [[e1,e2, ..batchSize], [e1,e2, ..batchSize], ...]
Expand All @@ -68,7 +69,7 @@ const batchEvents = (eventsChunk) => {
chunk.forEach((ev) => {
// Pixel code must be added above "batch": [..]
batchResponseList.push(ev.message.properties);
metadata.push(ev.metadata);
metadata.push(ev.metadata[0]);
});

batchEventResponse.batchedRequest = {
Expand Down Expand Up @@ -102,38 +103,47 @@ const processRouterDest = (inputs) => {
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const eventsChunk = []; // temporary variable to divide payload into chunks
const errorRespList = [];

const finalResp = [];
const eachUserSuccessEventslist = []; // temporary variable to divide payload into chunks
const eachUserErrorEventsList = [];
inputs.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
eachUserSuccessEventslist.push(event);

Check warning on line 113 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L113

Added line #L113 was not covered by tests
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
eachUserSuccessEventslist.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
errorRespList.push(errRespEvent);
const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION);
eachUserErrorEventsList.push(eachUserErrorEvent);
}
});

let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
return [...batchedResponseList, ...errorRespList];
const orderedEventsList = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList);
// divide the successful payloads into batches
let eachUserBatchedResponse = [];
orderedEventsList.forEach((eventList) => {
// no error event list will have more than one items in the list
if (eventList[0].error) {
finalResp.push([...eventList]);
} else {
// batch the successful events
eachUserBatchedResponse = batchEachUserSuccessEvents(eventList);
finalResp.push([...eachUserBatchedResponse]);
}
});

return finalResp.flat();
};

module.exports = { process, processRouterDest };
115 changes: 113 additions & 2 deletions src/v0/destinations/bqstream/util.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
/* eslint-disable no-param-reassign */
const _ = require('lodash');
const getValue = require('get-value');
const {
getDynamicErrorType,
processAxiosResponse,
} = require('../../../adapters/utils/networkUtils');
const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util');
const {
REFRESH_TOKEN,
AUTH_STATUS_INACTIVE,
} = require('../../../adapters/networkhandler/authConstants');
const { isHttpStatusSuccess } = require('../../util');
const { proxyRequest } = require('../../../adapters/network');
const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes');
const tags = require('../../util/tags');
Expand Down Expand Up @@ -146,4 +147,114 @@ function networkHandler() {
this.processAxiosResponse = processAxiosResponse;
}

module.exports = { networkHandler };
/**
* Optimizes the error response by merging the metadata of the same error type and adding it to the result array.
*
* @param {Object} item - An object representing an error event with properties like `error`, `jobId`, and `metadata`.
* @param {Map} errorMap - A Map object to store the error events and their metadata.
* @param {Array} resultArray - An array to store the optimized error response.
* @returns {void}
*/
const optimizeErrorResponse = (item, errorMap, resultArray) => {
const currentError = item.error;
if (errorMap.has(currentError)) {
// If the error already exists in the map, merge the metadata
const existingErrDetails = errorMap.get(currentError);
existingErrDetails.metadata.push(...item.metadata);
} else {
// Otherwise, add it to the map
errorMap.set(currentError, { ...item });
resultArray.push([errorMap.get(currentError)]);
}
};

/**
* Filters and splits an array of events based on whether they have an error or not.
* Returns an array of arrays, where inner arrays represent either a chunk of successful events or
* an array of single error event. It maintains the order of events strictly.
*
* @param {Array} sortedEvents - An array of events to be filtered and split.
* @returns {Array} - An array of arrays where each inner array represents a chunk of successful events followed by an error event.
*/
const restoreEventOrder = (sortedEvents) => {
let successfulEventsChunk = [];
const resultArray = [];
const errorMap = new Map();
for (const item of sortedEvents) {
// if error is present, then push the previous successfulEventsChunk
// and then push the error event
if (isDefinedAndNotNull(item.error)) {
if (successfulEventsChunk.length > 0) {
resultArray.push(successfulEventsChunk);
successfulEventsChunk = [];
}
optimizeErrorResponse(item, errorMap, resultArray);
} else {
// if error is not present, then push the event to successfulEventsChunk
successfulEventsChunk.push(item);
errorMap.clear();
}
}
// Push the last successfulEventsChunk to resultArray
if (successfulEventsChunk.length > 0) {
resultArray.push(successfulEventsChunk);
}
return resultArray;
};

const convertMetadataToArray = (eventList) => {
const processedEvents = eventList.map((event) => ({
...event,
metadata: Array.isArray(event.metadata) ? event.metadata : [event.metadata],
}));
return processedEvents;
};

/**
* Takes in two arrays, eachUserSuccessEventslist and eachUserErrorEventsList, and returns an ordered array of events.
* If there are no error events, it returns the array of transformed events.
* If there are no successful responses, it returns the error events.
* If there are both successful and erroneous events, it orders them based on the jobId property of the events' metadata array.
* considering error responses are built with @handleRtTfSingleEventError
*
* @param {Array} eachUserSuccessEventslist - An array of events representing successful responses for a user.
* @param {Array} eachUserErrorEventsList - An array of events representing error responses for a user.
* @returns {Array} - An ordered array of events.
*
* @example
* const eachUserSuccessEventslist = [{track, jobId: 1}, {track, jobId: 2}, {track, jobId: 5}];
* const eachUserErrorEventsList = [{identify, jobId: 3}, {identify, jobId: 4}];
* Output: [[{track, jobId: 1}, {track, jobId: 2}],[{identify, jobId: 3}],[{identify, jobId: 4}], {track, jobId: 5}]]
*/
const getRearrangedEvents = (eachUserSuccessEventslist, eachUserErrorEventsList) => {
// Convert 'metadata' to an array if it's not already
const processedSuccessfulEvents = convertMetadataToArray(eachUserSuccessEventslist);
const processedErrorEvents = convertMetadataToArray(eachUserErrorEventsList);

// if there are no error events, then return the batched response
if (eachUserErrorEventsList.length === 0) {
return [processedSuccessfulEvents];
}
// if there are no batched response, then return the error events
if (eachUserSuccessEventslist.length === 0) {
const resultArray = [];
const errorMap = new Map();
processedErrorEvents.forEach((item) => {
optimizeErrorResponse(item, errorMap, resultArray);
});
return resultArray;
}

// if there are both batched response and error events, then order them
const combinedTransformedEventList = [
...processedSuccessfulEvents,
...processedErrorEvents,
].flat();

const sortedEvents = _.sortBy(combinedTransformedEventList, (event) => event.metadata[0].jobId);
const finalResp = restoreEventOrder(sortedEvents);

return finalResp;
};

module.exports = { networkHandler, getRearrangedEvents };
108 changes: 108 additions & 0 deletions src/v0/destinations/bqstream/util.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
const { getRearrangedEvents } = require('./util');

describe('getRearrangedEvents', () => {
// Tests that the function returns an array of transformed events when there are no error events
it('should return an array of transformed events when there are no error events', () => {
const eachUserSuccessEventslist = [
{ message: { type: 'track' }, metadata: { jobId: 1 } },
{ message: { type: 'track' }, metadata: { jobId: 3 } },
{ message: { type: 'track' }, metadata: { jobId: 5 } },
];
const eachUserErrorEventsList = [];
const expected = [
[
{ message: { type: 'track' }, metadata: [{ jobId: 1 }] },
{ message: { type: 'track' }, metadata: [{ jobId: 3 }] },
{ message: { type: 'track' }, metadata: [{ jobId: 5 }] },
],
];
const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList);
expect(result).toEqual(expected);
});

// Tests that the function returns an empty array when both input arrays are empty
it('should return an empty array when both input arrays are empty', () => {
const eachUserSuccessEventslist = [];
const eachUserErrorEventsList = [];
const expected = [[]];
const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList);
expect(result).toEqual(expected);
});

// Tests that the function returns an array with only error events when all events are erroneous
it('should return an array with only error events when all events are erroneous', () => {
const eachUserSuccessEventslist = [];
const eachUserErrorEventsList = [
{
batched: false,
destination: {},
error: 'Message Type not supported: identify',
metadata: [{ jobId: 3, userId: 'user12345' }],
},
{
batched: false,
destination: {},
error: 'Message Type not supported: identify',
metadata: [{ jobId: 4, userId: 'user12345' }],
},
];
const expected = [
[
{
batched: false,
destination: {},
error: 'Message Type not supported: identify',
metadata: [
{ jobId: 3, userId: 'user12345' },
{ jobId: 4, userId: 'user12345' },
],
},
],
];
const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList);
expect(result).toEqual(expected);
});

// Tests that the function returns an ordered array of events with both successful and erroneous events, ordered based on the jobId property of the events' metadata array
it('should return an ordered array of events with both successful and erroneous events', () => {
const eachUserSuccessEventslist = [
{
batched: false,
destination: {},
error: 'Message Type not supported: identify',
metadata: [{ jobId: 3, userId: 'user12345' }],
},
{
batched: false,
destination: {},
error: 'Message Type not supported: identify',
metadata: [{ jobId: 4, userId: 'user12345' }],
},
];
const eachUserErrorEventsList = [
{ message: { type: 'track' }, metadata: { jobId: 1 } },
{ message: { type: 'track' }, metadata: { jobId: 2 } },
{ message: { type: 'track' }, metadata: { jobId: 5 } },
];
const expected = [
[
{ message: { type: 'track' }, metadata: [{ jobId: 1 }] },
{ message: { type: 'track' }, metadata: [{ jobId: 2 }] },
],
[
{
batched: false,
destination: {},
error: 'Message Type not supported: identify',
metadata: [
{ jobId: 3, userId: 'user12345' },
{ jobId: 4, userId: 'user12345' },
],
},
],
[{ message: { type: 'track' }, metadata: [{ jobId: 5 }] }],
];
const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList);
expect(result).toEqual(expected);
});
});
Loading

0 comments on commit 5cf688e

Please sign in to comment.