Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bqstream event ordering fix #2624

Merged
merged 10 commits into from
Sep 20, 2023
74 changes: 44 additions & 30 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
groupEventsByType,
} = require('../../util');
const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');
const { getRearrangedEvents } = require('./util');

const getInsertIdColValue = (properties, insertIdCol) => {
if (
Expand Down Expand Up @@ -50,7 +52,7 @@
};
};

const batchEvents = (eventsChunk) => {
const batchEachUserSuccessEvents = (eventsChunk) => {
const batchedResponseList = [];

// arrayChunks = [[e1,e2, ..batchSize], [e1,e2, ..batchSize], ...]
Expand All @@ -68,7 +70,7 @@
chunk.forEach((ev) => {
// Pixel code must be added above "batch": [..]
batchResponseList.push(ev.message.properties);
metadata.push(ev.metadata);
metadata.push(ev.metadata[0]);
});

batchEventResponse.batchedRequest = {
Expand Down Expand Up @@ -98,42 +100,54 @@
};

const processRouterDest = (inputs) => {
let orderedEventsList;
const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const eventsChunk = []; // temporary variable to divide payload into chunks
const errorRespList = [];

inputs.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
message: res,
metadata: event.metadata,
destination: event.destination,
const finalResp = [];
let eachTypeBatchedResponse = [];
const batchedEvents = groupEventsByType(inputs);

batchedEvents.forEach((listOfEvents) => {
const eachTypeSuccessEventList = []; // temporary variable to divide payload into chunks
const eachTypeErrorEventsList = [];
listOfEvents.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eachTypeSuccessEventList.push(event);

Check warning on line 119 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L119

Added line #L119 was not covered by tests
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eachTypeSuccessEventList.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
});
});
}
} catch (error) {
const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION);
eachTypeErrorEventsList.push(eachUserErrorEvent);
}
});
orderedEventsList = getRearrangedEvents(eachTypeSuccessEventList, eachTypeErrorEventsList);
orderedEventsList.forEach((eventList) => {
// no error event list will have more than one items in the list
if (eventList[0].error) {
finalResp.push([...eventList]);
} else {
// batch the successful events
eachTypeBatchedResponse = batchEachUserSuccessEvents(eventList);
finalResp.push([...eachTypeBatchedResponse]);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
errorRespList.push(errRespEvent);
}
});
});

let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
return [...batchedResponseList, ...errorRespList];
return finalResp.flat();
};

module.exports = { process, processRouterDest };
85 changes: 83 additions & 2 deletions src/v0/destinations/bqstream/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ const {
getDynamicErrorType,
processAxiosResponse,
} = require('../../../adapters/utils/networkUtils');
const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util');
const {
REFRESH_TOKEN,
AUTH_STATUS_INACTIVE,
} = require('../../../adapters/networkhandler/authConstants');
const { isHttpStatusSuccess } = require('../../util');
const { proxyRequest } = require('../../../adapters/network');
const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes');
const tags = require('../../util/tags');
Expand Down Expand Up @@ -146,4 +146,85 @@ function networkHandler() {
this.processAxiosResponse = processAxiosResponse;
}

module.exports = { networkHandler };
/**
* Optimizes the error response by merging the metadata of the same error type and adding it to the result array.
*
* @param {Object} item - An object representing an error event with properties like `error`, `jobId`, and `metadata`.
* @param {Map} errorMap - A Map object to store the error events and their metadata.
* @param {Array} resultArray - An array to store the optimized error response.
* @returns {void}
*/
const optimizeErrorResponse = (item, errorMap, resultArray) => {
const currentError = item.error;
if (errorMap.has(currentError)) {
// If the error already exists in the map, merge the metadata
const existingErrDetails = errorMap.get(currentError);
existingErrDetails.metadata.push(...item.metadata);
} else {
// Otherwise, add it to the map
errorMap.set(currentError, { ...item });
resultArray.push([errorMap.get(currentError)]);
}
};

const convertMetadataToArray = (eventList) => {
const processedEvents = eventList.map((event) => ({
...event,
metadata: Array.isArray(event.metadata) ? event.metadata : [event.metadata],
}));
return processedEvents;
};


/**
* Formats a list of error events into a composite response.
*
* @param {Array} errorEvents - A list of error events, where each event can have an `error` property and a `metadata` array.
* @returns {Array} The formatted composite response, where each element is an array containing the error details.
*/
const formatCompositeResponse = (errorEvents) => {
const resultArray = [];
const errorMap = new Map();

for (const item of errorEvents) {
if (isDefinedAndNotNull(item.error)) {
optimizeErrorResponse(item, errorMap, resultArray);
}
}
return resultArray;
};

/**
* Rearranges the events based on their success or error status.
* If there are no successful events, it groups error events with the same error and their metadata.
* If there are successful events, it returns the batched response of successful events.
*
* @param {Array} successEventList - An array of objects representing successful events.
* Each object should have an `id` and `metadata` property.
* @param {Array} errorEventList - An array of objects representing error events.
* Each object should have an `id`, `metadata`, and `error` property.
* @returns {Array} - An array of rearranged events.
*/
const getRearrangedEvents = (successEventList, errorEventList) => {
// Convert 'metadata' to an array if it's not already
const processedSuccessfulEvents = convertMetadataToArray(successEventList);
const processedErrorEvents = convertMetadataToArray(errorEventList);

// if there are no error events, then return the batched response
if (errorEventList.length === 0) {
return [processedSuccessfulEvents];
}
// if there are no batched response, then return the error events
if (successEventList.length === 0) {
return formatCompositeResponse(processedErrorEvents)
}

// if there are both batched response and error events, then order them
const combinedTransformedEventList = [
[...processedSuccessfulEvents],
...formatCompositeResponse(processedErrorEvents)
]
return combinedTransformedEventList;
};

module.exports = { networkHandler, getRearrangedEvents };
Loading
Loading