Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bqstream event ordering fix #2624

Merged
merged 10 commits into from
Sep 20, 2023
73 changes: 50 additions & 23 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
groupEventsByType,
} = require('../../util');
const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');
const { getRearrangedEvents } = require('./util');

const getInsertIdColValue = (properties, insertIdCol) => {
if (
Expand Down Expand Up @@ -50,7 +52,7 @@
};
};

const batchEvents = (eventsChunk) => {
const batchEachUserSuccessEvents = (eventsChunk) => {
const batchedResponseList = [];

// arrayChunks = [[e1,e2, ..batchSize], [e1,e2, ..batchSize], ...]
Expand All @@ -68,7 +70,7 @@
chunk.forEach((ev) => {
// Pixel code must be added above "batch": [..]
batchResponseList.push(ev.message.properties);
metadata.push(ev.metadata);
metadata.push(ev.metadata[0]);
});

batchEventResponse.batchedRequest = {
Expand Down Expand Up @@ -97,43 +99,68 @@
return batchedResponseList;
};

const processRouterDest = (inputs) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const eventsChunk = []; // temporary variable to divide payload into chunks
const errorRespList = [];

inputs.forEach((event) => {
const processEachTypedEventList = (
typedEventList,
eachTypeSuccessEventList,
eachTypeErrorEventsList,
) => {
typedEventList.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
eachTypeSuccessEventList.push(event);

Check warning on line 111 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L111

Added line #L111 was not covered by tests
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
const response = process(event);
const transformedEvents = Array.isArray(response) ? response : [response];

transformedEvents.forEach((res) => {
eachTypeSuccessEventList.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
errorRespList.push(errRespEvent);
const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION);
eachTypeErrorEventsList.push(eachUserErrorEvent);
}
});
};

let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
const processRouterDest = (inputs) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION);
if (errorRespEvents.length > 0) {
return errorRespEvents;

Check warning on line 135 in src/v0/destinations/bqstream/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/bqstream/transform.js#L135

Added line #L135 was not covered by tests
}
return [...batchedResponseList, ...errorRespList];
const finalResp = [];

const batchedEvents = groupEventsByType(inputs);

batchedEvents.forEach((typedEventList) => {
const eachTypeSuccessEventList = []; // list of events that are transformed successfully
const eachTypeErrorEventsList = []; // list of events that are errored out
processEachTypedEventList(typedEventList, eachTypeSuccessEventList, eachTypeErrorEventsList);

const orderedEventsList = getRearrangedEvents(
eachTypeSuccessEventList,
eachTypeErrorEventsList,
);

orderedEventsList.forEach((eventList) => {
// no error event list will have more than one items in the list
if (eventList[0].error) {
finalResp.push([...eventList]);
} else {
// batch the successful events
const eachTypeBatchedResponse = batchEachUserSuccessEvents(eventList);
finalResp.push([...eachTypeBatchedResponse]);
}
});
});

return finalResp.flat();
};

module.exports = { process, processRouterDest };
192 changes: 63 additions & 129 deletions src/v0/destinations/bqstream/util.js
Original file line number Diff line number Diff line change
@@ -1,149 +1,83 @@
/* eslint-disable no-param-reassign */
const getValue = require('get-value');
const {
getDynamicErrorType,
processAxiosResponse,
} = require('../../../adapters/utils/networkUtils');
const {
REFRESH_TOKEN,
AUTH_STATUS_INACTIVE,
} = require('../../../adapters/networkhandler/authConstants');
const { isHttpStatusSuccess } = require('../../util');
const { proxyRequest } = require('../../../adapters/network');
const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes');
const tags = require('../../util/tags');

const DESTINATION_NAME = 'bqstream';

const trimBqStreamResponse = (response) => ({
code: getValue(response, 'response.response.data.error.code'), // data.error.status which contains PERMISSION_DENIED
status: getValue(response, 'response.response.status'),
statusText: getValue(response, 'response.response.statusText'),
headers: getValue(response, 'response.response.headers'),
data: getValue(response, 'response.response.data'), // Incase of errors, this contains error data
success: getValue(response, 'suceess'),
});
const { isDefinedAndNotNull } = require('../../util');
/**
* Obtains the Destination OAuth Error Category based on the error code obtained from destination
* Optimizes the error response by merging the metadata of the same error type and adding it to the result array.
*
* - If an error code is such that the user will not be allowed inside the destination,
* such error codes fall under AUTH_STATUS_INACTIVE
* - If an error code is such that upon refresh we can get a new token which can be used to send event,
* such error codes fall under REFRESH_TOKEN category
* - If an error code doesn't fall under both categories, we can return an empty string
* @param {string} errorCategory - The error code obtained from the destination
* @returns Destination OAuth Error Category
* @param {Object} item - An object representing an error event with properties like `error`, `jobId`, and `metadata`.
* @param {Map} errorMap - A Map object to store the error events and their metadata.
* @param {Array} resultArray - An array to store the optimized error response.
* @returns {void}
*/
const getDestAuthCategory = (errorCategory) => {
switch (errorCategory) {
case 'PERMISSION_DENIED':
return AUTH_STATUS_INACTIVE;
case 'UNAUTHENTICATED':
return REFRESH_TOKEN;
default:
return '';
const optimizeErrorResponse = (item, errorMap, resultArray) => {
const currentError = item.error;
if (errorMap.has(currentError)) {
// If the error already exists in the map, merge the metadata
const existingErrDetails = errorMap.get(currentError);
existingErrDetails.metadata.push(...item.metadata);
} else {
// Otherwise, add it to the map
errorMap.set(currentError, { ...item });
resultArray.push([errorMap.get(currentError)]);
}
};

const destToRudderStatusMap = {
403: {
rateLimitExceeded: 429,
default: 400,
},
400: {
tableUnavailable: 500,
default: 400,
},
500: { default: 500 },
503: { default: 500 },
401: { default: 500 },
404: { default: 400 },
501: { default: 400 },
const convertMetadataToArray = (eventList) => {
const processedEvents = eventList.map((event) => ({
...event,
metadata: Array.isArray(event.metadata) ? event.metadata : [event.metadata],
}));
return processedEvents;
};

const getStatusAndCategory = (dresponse, status) => {
const authErrorCategory = getDestAuthCategory(dresponse.error.status);
const reason =
dresponse.error.errors &&
Array.isArray(dresponse.error.errors) &&
dresponse.error.errors.length > 0 &&
dresponse.error.errors[0].reason;
/**
* Formats a list of error events into a composite response.
*
* @param {Array} errorEvents - A list of error events, where each event can have an `error` property and a `metadata` array.
* @returns {Array} The formatted composite response, where each element is an array containing the error details.
*/
const formatCompositeResponse = (errorEvents) => {
const resultArray = [];
const errorMap = new Map();

const trStatus = destToRudderStatusMap[status]
? destToRudderStatusMap[status][reason] || destToRudderStatusMap[status].default
: 500;
return { status: trStatus, authErrorCategory };
for (const item of errorEvents) {
if (isDefinedAndNotNull(item.error)) {
optimizeErrorResponse(item, errorMap, resultArray);
}
}
return resultArray;
};

/**
* This class actually handles the response for BigQuery Stream API
* It can also be used for any Google related API but an API related handling has to be done separately
*
* Here we are only trying to handle OAuth related error(s)
* Any destination specific error handling has to be done in their own way
*
* Reference doc for OAuth Errors
* 1. https://cloud.google.com/apigee/docs/api-platform/reference/policies/oauth-http-status-code-reference
* 2. https://cloud.google.com/bigquery/docs/error-messages
*
* Summary:
* Abortable -> 403, 501, 400
* Retryable -> 5[0-9][02-9], 401(UNAUTHENTICATED)
* "Special Cases":
* status=200, resp.insertErrors.length > 0 === Failure
* 403 => AccessDenied -> AUTH_STATUS_INACTIVE, other 403 => Just abort
* Rearranges the events based on their success or error status.
* If there are no successful events, it groups error events with the same error and their metadata.
* If there are successful events, it returns the batched response of successful events.
*
* @param {Array} successEventList - An array of objects representing successful events.
* Each object should have an `id` and `metadata` property.
* @param {Array} errorEventList - An array of objects representing error events.
* Each object should have an `id`, `metadata`, and `error` property.
* @returns {Array} - An array of rearranged events.
*/
const processResponse = ({ dresponse, status } = {}) => {
const isSuccess =
!dresponse.error &&
isHttpStatusSuccess(status) &&
(!dresponse.insertErrors || (dresponse.insertErrors && dresponse.insertErrors.length === 0));
const getRearrangedEvents = (successEventList, errorEventList) => {
// Convert 'metadata' to an array if it's not already
const processedSuccessfulEvents = convertMetadataToArray(successEventList);
const processedErrorEvents = convertMetadataToArray(errorEventList);

if (!isSuccess) {
if (dresponse.error) {
const { status: trStatus } = getStatusAndCategory(dresponse, status);
throw new NetworkError(
dresponse.error.message || `Request failed for ${DESTINATION_NAME} with status: ${status}`,
trStatus,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(trStatus),
},
dresponse,
);
} else if (dresponse.insertErrors && dresponse.insertErrors.length > 0) {
const temp = trimBqStreamResponse(dresponse);
throw new AbortedError(
'Problem during insert operation',
400,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(temp.status || 400),
},
temp,
getDestAuthCategory(temp.code),
);
}
throw new UnhandledStatusCodeError('Unhandled error type while sending to destination');
// if there are no error events, then return the batched response
if (errorEventList.length === 0) {
return [processedSuccessfulEvents];
}
// if there are no batched response, then return the error events
if (successEventList.length === 0) {
return formatCompositeResponse(processedErrorEvents);
}
};

const responseHandler = (respTransformPayload) => {
const { response, status } = respTransformPayload;
processResponse({
dresponse: response,
status,
});
return {
status,
destinationResponse: response,
message: 'Request Processed Successfully',
};
// if there are both batched response and error events, then order them
const combinedTransformedEventList = [
[...processedSuccessfulEvents],
...formatCompositeResponse(processedErrorEvents),
];
return combinedTransformedEventList;
};

function networkHandler() {
this.responseHandler = responseHandler;
this.proxy = proxyRequest;
this.processAxiosResponse = processAxiosResponse;
}

module.exports = { networkHandler };
module.exports = { getRearrangedEvents };
Loading
Loading