Skip to content

Commit

Permalink
fix: addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shrouti1507 committed Sep 11, 2023
1 parent 09572e3 commit e90f54b
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 31 deletions.
54 changes: 26 additions & 28 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const {
} = require('../../util');
const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');
const { getGroupedEvents } = require('./util');
const { generateUserJourneys, HandleEventOrdering } = require('./util');

const getInsertIdColValue = (properties, insertIdCol) => {
if (
Expand Down Expand Up @@ -51,7 +51,7 @@ const process = (event) => {
};
};

const batchEvents = (eventsChunk) => {
const batchEachUserSuccessEvents = (eventsChunk) => {
const batchedResponseList = [];

// arrayChunks = [[e1,e2, ..batchSize], [e1,e2, ..batchSize], ...]
Expand All @@ -69,7 +69,7 @@ const batchEvents = (eventsChunk) => {
chunk.forEach((ev) => {
// Pixel code must be added above "batch": [..]
batchResponseList.push(ev.message.properties);
metadata.push(ev.metadata);
metadata.push(ev.metadata[0]);
});

batchEventResponse.batchedRequest = {
Expand Down Expand Up @@ -103,49 +103,47 @@ const processRouterDest = (inputs) => {
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
const groupedEvents = getGroupedEvents(inputs);
const userJourneyArrays = generateUserJourneys(inputs);
const finalResp = [];
groupedEvents.forEach((eventList) => {
let eventsChunk = []; // temporary variable to divide payload into chunks
let errorRespList = [];
if (eventList.length > 0) {
eventList.forEach((event) => {
userJourneyArrays.forEach((eachUserJourney) => {
const eachUserSuccessEventslist = []; // temporary variable to divide payload into chunks
const eachUserErrorEventsList = [];
if (eachUserJourney.length > 0) {
eachUserJourney.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
eachUserSuccessEventslist.push(event);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
eachUserSuccessEventslist.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
// divide the successful payloads till now into batches
let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
// clear up the temporary variable
eventsChunk = [];
errorRespList.push(errRespEvent);
finalResp.push([...batchedResponseList, ...errorRespList]);
// putting it back as an empty array
errorRespList = [];
const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION);
eachUserErrorEventsList.push(eachUserErrorEvent);
}
});
const orderedEventsList = HandleEventOrdering(eachUserSuccessEventslist, eachUserErrorEventsList)
// divide the successful payloads into batches
let eachUserBatchedResponse = [];
orderedEventsList.forEach((eventList) => {
// no error event list will have more than one items in the list
if(eventList[0].error) {
finalResp.push([...eventList]);
} else {
// batch the successful events
eachUserBatchedResponse = batchEachUserSuccessEvents(eventList);
finalResp.push([...eachUserBatchedResponse]);
}
});
let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
finalResp.push([...batchedResponseList]);
}
}
});
return finalResp.flat();
Expand Down
86 changes: 83 additions & 3 deletions src/v0/destinations/bqstream/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const {
processAxiosResponse,
} = require('../../../adapters/utils/networkUtils');
const { DISABLE_DEST, REFRESH_TOKEN } = require('../../../adapters/networkhandler/authConstants');
const { isHttpStatusSuccess } = require('../../util');
const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util');
const { proxyRequest } = require('../../../adapters/network');
const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes');
const tags = require('../../util/tags');
Expand Down Expand Up @@ -151,10 +151,90 @@ function networkHandler() {
* @returns {Array} An array of arrays containing the grouped events.
* Each inner array represents a user journey.
*/
const getGroupedEvents = (inputs) => {
const generateUserJourneys = (inputs) => {
const userIdEventMap = _.groupBy(inputs, 'metadata.userId');
const eventGroupedByUserId = Object.values(userIdEventMap);
return eventGroupedByUserId;
};

module.exports = { networkHandler, getGroupedEvents };
/**
* Filters and splits an array of events based on whether they have an error or not.
* Returns an array of arrays, where inner arrays represent either a chunk of successful events or
* an array of single error event. It maintains the order of events strictly.
*
* @param {Array} sortedEvents - An array of events to be filtered and split.
* @returns {Array} - An array of arrays where each inner array represents a chunk of successful events followed by an error event.
*/
const filterAndSplitEvents = (sortedEvents) => {
let successfulEventsChunk = [];
const resultArray = []
for (const item of sortedEvents) {
// if error is present, then push the previous successfulEventsChunk
// and then push the error event
if (isDefinedAndNotNull(item.error)) {
if(successfulEventsChunk.length > 0) {
resultArray.push(successfulEventsChunk);
successfulEventsChunk = [];
}
resultArray.push([item]);
} else {
// if error is not present, then push the event to successfulEventsChunk
successfulEventsChunk.push(item);
}
}
// Push the last successfulEventsChunk to resultArray
if (successfulEventsChunk.length > 0) {
resultArray.push(successfulEventsChunk);
}
return resultArray;
};


const convertMetadataToArray = (eventList ) => {
const processedEvents = eventList.map((event) => ({
...event,
metadata: Array.isArray(event.metadata) ? event.metadata : [event.metadata],
}));
return processedEvents;
}

/**
* Takes in two arrays, eachUserSuccessEventslist and eachUserErrorEventsList, and returns an ordered array of events.
* If there are no error events, it returns the array of transformed events.
* If there are no successful responses, it returns the error events.
* If there are both successful and erroneous events, it orders them based on the jobId property of the events' metadata array.
* considering error responses are built with @handleRtTfSingleEventError
*
* @param {Array} eachUserSuccessEventslist - An array of events representing successful responses for a user.
* @param {Array} eachUserErrorEventsList - An array of events representing error responses for a user.
* @returns {Array} - An ordered array of events.
*
* @example
* const eachUserSuccessEventslist = [{track, jobId: 1}, {track, jobId: 2}, {track, jobId: 5}];
* const eachUserErrorEventsList = [{identify, jobId: 3}, {identify, jobId: 4}];
* Output: [[{track, jobId: 1}, {track, jobId: 2}],[{identify, jobId: 3}],[{identify, jobId: 4}], {track, jobId: 5}]]
*/
const HandleEventOrdering = (eachUserSuccessEventslist, eachUserErrorEventsList) => {
// Convert 'metadata' to an array if it's not already
const processedSuccessfulEvents = convertMetadataToArray(eachUserSuccessEventslist);
const processedErrorEvents = convertMetadataToArray(eachUserErrorEventsList);

// if there are no error events, then return the batched response
if (eachUserErrorEventsList.length === 0) {
return [processedSuccessfulEvents];
}
// if there are no batched response, then return the error events
if (eachUserSuccessEventslist.length === 0) {
return [processedErrorEvents];
}

// if there are both batched response and error events, then order them
const combinedTransformedEventList = [...processedSuccessfulEvents, ...processedErrorEvents].flat();

const sortedEvents = _.sortBy(combinedTransformedEventList, (event) => event.metadata[0].jobId);
const finalResp = filterAndSplitEvents(sortedEvents);

return finalResp;
}

module.exports = { networkHandler, generateUserJourneys, HandleEventOrdering };

0 comments on commit e90f54b

Please sign in to comment.