Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bqstream): event ordering #2558

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 45 additions & 32 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const {
} = require('../../util');
const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');
const { generateUserJourneys, HandleEventOrdering } = require('./util');

const getInsertIdColValue = (properties, insertIdCol) => {
if (
Expand Down Expand Up @@ -50,7 +51,7 @@ const process = (event) => {
};
};

const batchEvents = (eventsChunk) => {
const batchEachUserSuccessEvents = (eventsChunk) => {
const batchedResponseList = [];

// arrayChunks = [[e1,e2, ..batchSize], [e1,e2, ..batchSize], ...]
Expand All @@ -68,7 +69,7 @@ const batchEvents = (eventsChunk) => {
chunk.forEach((ev) => {
// Pixel code must be added above "batch": [..]
batchResponseList.push(ev.message.properties);
metadata.push(ev.metadata);
metadata.push(ev.metadata[0]);
});

batchEventResponse.batchedRequest = {
Expand Down Expand Up @@ -102,38 +103,50 @@ const processRouterDest = (inputs) => {
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const eventsChunk = []; // temporary variable to divide payload into chunks
const errorRespList = [];

inputs.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
errorRespList.push(errRespEvent);
const userJourneyArrays = generateUserJourneys(inputs);
const finalResp = [];
userJourneyArrays.forEach((eachUserJourney) => {
const eachUserSuccessEventslist = []; // temporary variable to divide payload into chunks
const eachUserErrorEventsList = [];
if (eachUserJourney.length > 0) {
eachUserJourney.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eachUserSuccessEventslist.push(event);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eachUserSuccessEventslist.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
});
}
} catch (error) {
const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION);
eachUserErrorEventsList.push(eachUserErrorEvent);
}
});
const orderedEventsList = HandleEventOrdering(eachUserSuccessEventslist, eachUserErrorEventsList)
// divide the successful payloads into batches
let eachUserBatchedResponse = [];
orderedEventsList.forEach((eventList) => {
// no error event list will have more than one items in the list
if(eventList[0].error) {
finalResp.push([...eventList]);
} else {
// batch the successful events
eachUserBatchedResponse = batchEachUserSuccessEvents(eventList);
finalResp.push([...eachUserBatchedResponse]);
}
});
}
});

let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
return [...batchedResponseList, ...errorRespList];
return finalResp.flat();
};

module.exports = { process, processRouterDest };
98 changes: 96 additions & 2 deletions src/v0/destinations/bqstream/util.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
/* eslint-disable no-param-reassign */
const _ = require('lodash');
const getValue = require('get-value');
const {
getDynamicErrorType,
processAxiosResponse,
} = require('../../../adapters/utils/networkUtils');
const { DISABLE_DEST, REFRESH_TOKEN } = require('../../../adapters/networkhandler/authConstants');
const { isHttpStatusSuccess } = require('../../util');
const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util');
const { proxyRequest } = require('../../../adapters/network');
const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes');
const tags = require('../../util/tags');
Expand Down Expand Up @@ -143,4 +144,97 @@ function networkHandler() {
this.processAxiosResponse = processAxiosResponse;
}

module.exports = { networkHandler };
/**
* Groups the input events based on the `userId` property
*
* @param {Array} inputs - An array of objects representing events with `metadata.userId` property.
* @returns {Array} An array of arrays containing the grouped events.
* Each inner array represents a user journey.
*/
const generateUserJourneys = (inputs) => {
const userIdEventMap = _.groupBy(inputs, 'metadata.userId');
const eventGroupedByUserId = Object.values(userIdEventMap);
return eventGroupedByUserId;
};

/**
* Filters and splits an array of events based on whether they have an error or not.
* Returns an array of arrays, where inner arrays represent either a chunk of successful events or
* an array of single error event. It maintains the order of events strictly.
*
* @param {Array} sortedEvents - An array of events to be filtered and split.
* @returns {Array} - An array of arrays where each inner array represents a chunk of successful events followed by an error event.
*/
const filterAndSplitEvents = (sortedEvents) => {
let successfulEventsChunk = [];
const resultArray = []
for (const item of sortedEvents) {
// if error is present, then push the previous successfulEventsChunk
// and then push the error event
if (isDefinedAndNotNull(item.error)) {
if(successfulEventsChunk.length > 0) {
resultArray.push(successfulEventsChunk);
successfulEventsChunk = [];
}
resultArray.push([item]);
} else {
// if error is not present, then push the event to successfulEventsChunk
successfulEventsChunk.push(item);
}
}
// Push the last successfulEventsChunk to resultArray
if (successfulEventsChunk.length > 0) {
resultArray.push(successfulEventsChunk);
}
return resultArray;
};


const convertMetadataToArray = (eventList ) => {
const processedEvents = eventList.map((event) => ({
...event,
metadata: Array.isArray(event.metadata) ? event.metadata : [event.metadata],
}));
return processedEvents;
}

/**
* Takes in two arrays, eachUserSuccessEventslist and eachUserErrorEventsList, and returns an ordered array of events.
* If there are no error events, it returns the array of transformed events.
* If there are no successful responses, it returns the error events.
* If there are both successful and erroneous events, it orders them based on the jobId property of the events' metadata array.
* considering error responses are built with @handleRtTfSingleEventError
*
* @param {Array} eachUserSuccessEventslist - An array of events representing successful responses for a user.
* @param {Array} eachUserErrorEventsList - An array of events representing error responses for a user.
* @returns {Array} - An ordered array of events.
*
* @example
* const eachUserSuccessEventslist = [{track, jobId: 1}, {track, jobId: 2}, {track, jobId: 5}];
* const eachUserErrorEventsList = [{identify, jobId: 3}, {identify, jobId: 4}];
* Output: [[{track, jobId: 1}, {track, jobId: 2}],[{identify, jobId: 3}],[{identify, jobId: 4}], {track, jobId: 5}]]
*/
const HandleEventOrdering = (eachUserSuccessEventslist, eachUserErrorEventsList) => {
// Convert 'metadata' to an array if it's not already
const processedSuccessfulEvents = convertMetadataToArray(eachUserSuccessEventslist);
const processedErrorEvents = convertMetadataToArray(eachUserErrorEventsList);

// if there are no error events, then return the batched response
if (eachUserErrorEventsList.length === 0) {
return [processedSuccessfulEvents];
}
// if there are no batched response, then return the error events
if (eachUserSuccessEventslist.length === 0) {
return [processedErrorEvents];
}

// if there are both batched response and error events, then order them
const combinedTransformedEventList = [...processedSuccessfulEvents, ...processedErrorEvents].flat();

const sortedEvents = _.sortBy(combinedTransformedEventList, (event) => event.metadata[0].jobId);
const finalResp = filterAndSplitEvents(sortedEvents);

return finalResp;
}

module.exports = { networkHandler, generateUserJourneys, HandleEventOrdering };
Loading