From 5cf688e5de81c0b3232c59d693f537f554c66cb3 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Mon, 18 Sep 2023 12:14:17 +0530 Subject: [PATCH 1/9] fix: bqstream event ordering fix --- src/v0/destinations/bqstream/transform.js | 40 +- src/v0/destinations/bqstream/util.js | 115 ++++- src/v0/destinations/bqstream/util.test.js | 108 +++++ .../destinations/bqstream/router/data.ts | 410 +++++++++++++++--- 4 files changed, 587 insertions(+), 86 deletions(-) create mode 100644 src/v0/destinations/bqstream/util.test.js diff --git a/src/v0/destinations/bqstream/transform.js b/src/v0/destinations/bqstream/transform.js index 4db1856535..49fd35503f 100644 --- a/src/v0/destinations/bqstream/transform.js +++ b/src/v0/destinations/bqstream/transform.js @@ -9,6 +9,7 @@ const { } = require('../../util'); const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config'); const { InstrumentationError } = require('../../util/errorTypes'); +const { getRearrangedEvents } = require('./util'); const getInsertIdColValue = (properties, insertIdCol) => { if ( @@ -50,7 +51,7 @@ const process = (event) => { }; }; -const batchEvents = (eventsChunk) => { +const batchEachUserSuccessEvents = (eventsChunk) => { const batchedResponseList = []; // arrayChunks = [[e1,e2, ..batchSize], [e1,e2, ..batchSize], ...] @@ -68,7 +69,7 @@ const batchEvents = (eventsChunk) => { chunk.forEach((ev) => { // Pixel code must be added above "batch": [..] batchResponseList.push(ev.message.properties); - metadata.push(ev.metadata); + metadata.push(ev.metadata[0]); }); batchEventResponse.batchedRequest = { @@ -102,21 +103,20 @@ const processRouterDest = (inputs) => { if (errorRespEvents.length > 0) { return errorRespEvents; } - - const eventsChunk = []; // temporary variable to divide payload into chunks - const errorRespList = []; - + const finalResp = []; + const eachUserSuccessEventslist = []; // temporary variable to divide payload into chunks + const eachUserErrorEventsList = []; inputs.forEach((event) => { try { if (event.message.statusCode) { // already transformed event - eventsChunk.push(event); + eachUserSuccessEventslist.push(event); } else { // if not transformed let response = process(event); response = Array.isArray(response) ? 
response : [response]; response.forEach((res) => { - eventsChunk.push({ + eachUserSuccessEventslist.push({ message: res, metadata: event.metadata, destination: event.destination, @@ -124,16 +124,26 @@ const processRouterDest = (inputs) => { }); } } catch (error) { - const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION); - errorRespList.push(errRespEvent); + const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION); + eachUserErrorEventsList.push(eachUserErrorEvent); } }); - let batchedResponseList = []; - if (eventsChunk.length > 0) { - batchedResponseList = batchEvents(eventsChunk); - } - return [...batchedResponseList, ...errorRespList]; + const orderedEventsList = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + // divide the successful payloads into batches + let eachUserBatchedResponse = []; + orderedEventsList.forEach((eventList) => { + // no error event list will have more than one items in the list + if (eventList[0].error) { + finalResp.push([...eventList]); + } else { + // batch the successful events + eachUserBatchedResponse = batchEachUserSuccessEvents(eventList); + finalResp.push([...eachUserBatchedResponse]); + } + }); + + return finalResp.flat(); }; module.exports = { process, processRouterDest }; diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index 2448d72d76..6670623bbc 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -1,14 +1,15 @@ /* eslint-disable no-param-reassign */ +const _ = require('lodash'); const getValue = require('get-value'); const { getDynamicErrorType, processAxiosResponse, } = require('../../../adapters/utils/networkUtils'); +const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util'); const { REFRESH_TOKEN, AUTH_STATUS_INACTIVE, } = require('../../../adapters/networkhandler/authConstants'); -const { isHttpStatusSuccess } = require('../../util'); const { proxyRequest } = require('../../../adapters/network'); const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes'); const tags = require('../../util/tags'); @@ -146,4 +147,114 @@ function networkHandler() { this.processAxiosResponse = processAxiosResponse; } -module.exports = { networkHandler }; +/** + * Optimizes the error response by merging the metadata of the same error type and adding it to the result array. + * + * @param {Object} item - An object representing an error event with properties like `error`, `jobId`, and `metadata`. + * @param {Map} errorMap - A Map object to store the error events and their metadata. + * @param {Array} resultArray - An array to store the optimized error response. + * @returns {void} + */ +const optimizeErrorResponse = (item, errorMap, resultArray) => { + const currentError = item.error; + if (errorMap.has(currentError)) { + // If the error already exists in the map, merge the metadata + const existingErrDetails = errorMap.get(currentError); + existingErrDetails.metadata.push(...item.metadata); + } else { + // Otherwise, add it to the map + errorMap.set(currentError, { ...item }); + resultArray.push([errorMap.get(currentError)]); + } +}; + +/** + * Filters and splits an array of events based on whether they have an error or not. + * Returns an array of arrays, where inner arrays represent either a chunk of successful events or + * an array of single error event. It maintains the order of events strictly. 
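+ * e.g. a sorted input of [success1, success2, error1, success3] (hypothetical events) is
+ * returned as [[success1, success2], [error1], [success3]].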
+ * + * @param {Array} sortedEvents - An array of events to be filtered and split. + * @returns {Array} - An array of arrays where each inner array represents a chunk of successful events followed by an error event. + */ +const restoreEventOrder = (sortedEvents) => { + let successfulEventsChunk = []; + const resultArray = []; + const errorMap = new Map(); + for (const item of sortedEvents) { + // if error is present, then push the previous successfulEventsChunk + // and then push the error event + if (isDefinedAndNotNull(item.error)) { + if (successfulEventsChunk.length > 0) { + resultArray.push(successfulEventsChunk); + successfulEventsChunk = []; + } + optimizeErrorResponse(item, errorMap, resultArray); + } else { + // if error is not present, then push the event to successfulEventsChunk + successfulEventsChunk.push(item); + errorMap.clear(); + } + } + // Push the last successfulEventsChunk to resultArray + if (successfulEventsChunk.length > 0) { + resultArray.push(successfulEventsChunk); + } + return resultArray; +}; + +const convertMetadataToArray = (eventList) => { + const processedEvents = eventList.map((event) => ({ + ...event, + metadata: Array.isArray(event.metadata) ? event.metadata : [event.metadata], + })); + return processedEvents; +}; + +/** + * Takes in two arrays, eachUserSuccessEventslist and eachUserErrorEventsList, and returns an ordered array of events. + * If there are no error events, it returns the array of transformed events. + * If there are no successful responses, it returns the error events. + * If there are both successful and erroneous events, it orders them based on the jobId property of the events' metadata array. + * considering error responses are built with @handleRtTfSingleEventError + * + * @param {Array} eachUserSuccessEventslist - An array of events representing successful responses for a user. + * @param {Array} eachUserErrorEventsList - An array of events representing error responses for a user. + * @returns {Array} - An ordered array of events. 
+ * + * @example + * const eachUserSuccessEventslist = [{track, jobId: 1}, {track, jobId: 2}, {track, jobId: 5}]; + * const eachUserErrorEventsList = [{identify, jobId: 3}, {identify, jobId: 4}]; + * Output: [[{track, jobId: 1}, {track, jobId: 2}],[{identify, jobId: 3}],[{identify, jobId: 4}], {track, jobId: 5}]] + */ +const getRearrangedEvents = (eachUserSuccessEventslist, eachUserErrorEventsList) => { + // Convert 'metadata' to an array if it's not already + const processedSuccessfulEvents = convertMetadataToArray(eachUserSuccessEventslist); + const processedErrorEvents = convertMetadataToArray(eachUserErrorEventsList); + + // if there are no error events, then return the batched response + if (eachUserErrorEventsList.length === 0) { + return [processedSuccessfulEvents]; + } + // if there are no batched response, then return the error events + if (eachUserSuccessEventslist.length === 0) { + const resultArray = []; + const errorMap = new Map(); + processedErrorEvents.forEach((item) => { + optimizeErrorResponse(item, errorMap, resultArray); + }); + return resultArray; + } + + // if there are both batched response and error events, then order them + const combinedTransformedEventList = [ + ...processedSuccessfulEvents, + ...processedErrorEvents, + ].flat(); + + const sortedEvents = _.sortBy(combinedTransformedEventList, (event) => event.metadata[0].jobId); + const finalResp = restoreEventOrder(sortedEvents); + + return finalResp; +}; + +module.exports = { networkHandler, getRearrangedEvents }; diff --git a/src/v0/destinations/bqstream/util.test.js b/src/v0/destinations/bqstream/util.test.js new file mode 100644 index 0000000000..3f9b6d9ff3 --- /dev/null +++ b/src/v0/destinations/bqstream/util.test.js @@ -0,0 +1,108 @@ +const { getRearrangedEvents } = require('./util'); + +describe('getRearrangedEvents', () => { + // Tests that the function returns an array of transformed events when there are no error events + it('should return an array of transformed events when there are no error events', () => { + const eachUserSuccessEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { message: { type: 'track' }, metadata: { jobId: 3 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const eachUserErrorEventsList = []; + const expected = [ + [ + { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 3 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 5 }] }, + ], + ]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); + + // Tests that the function returns an empty array when both input arrays are empty + it('should return an empty array when both input arrays are empty', () => { + const eachUserSuccessEventslist = []; + const eachUserErrorEventsList = []; + const expected = [[]]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); + + // Tests that the function returns an array with only error events when all events are erroneous + it('should return an array with only error events when all events are erroneous', () => { + const eachUserSuccessEventslist = []; + const eachUserErrorEventsList = [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 3, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ 
jobId: 4, userId: 'user12345' }], + }, + ]; + const expected = [ + [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [ + { jobId: 3, userId: 'user12345' }, + { jobId: 4, userId: 'user12345' }, + ], + }, + ], + ]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); + + // Tests that the function returns an ordered array of events with both successful and erroneous events, ordered based on the jobId property of the events' metadata array + it('should return an ordered array of events with both successful and erroneous events', () => { + const eachUserSuccessEventslist = [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 3, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + ]; + const eachUserErrorEventsList = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { message: { type: 'track' }, metadata: { jobId: 2 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const expected = [ + [ + { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 2 }] }, + ], + [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [ + { jobId: 3, userId: 'user12345' }, + { jobId: 4, userId: 'user12345' }, + ], + }, + ], + [{ message: { type: 'track' }, metadata: [{ jobId: 5 }] }], + ]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); +}); diff --git a/test/integrations/destinations/bqstream/router/data.ts b/test/integrations/destinations/bqstream/router/data.ts index 0f6e663ad0..d27fb928b7 100644 --- a/test/integrations/destinations/bqstream/router/data.ts +++ b/test/integrations/destinations/bqstream/router/data.ts @@ -13,66 +13,141 @@ export const data = [ message: { type: 'track', event: 'insert product', - sentAt: '2021-09-08T11:10:45.466Z', userId: 'user12345', - channel: 'web', - context: { - os: { - name: '', - version: '', - }, - app: { - name: 'RudderLabs JavaScript SDK', - build: '1.0.0', - version: '1.1.18', - namespace: 'com.rudderlabs.javascript', - }, - page: { - url: 'http://127.0.0.1:5500/index.html', - path: '/index.html', - title: 'Document', - search: '', - tab_url: 'http://127.0.0.1:5500/index.html', - referrer: '$direct', - initial_referrer: '$direct', - referring_domain: '', - initial_referring_domain: '', - }, - locale: 'en-GB', - screen: { - width: 1536, - height: 960, - density: 2, - innerWidth: 1536, - innerHeight: 776, - }, - traits: {}, - library: { - name: 'RudderLabs JavaScript SDK', - version: '1.1.18', - }, - campaign: {}, - userAgent: - 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36', - }, - rudderId: 'fa2994a5-2a81-45fd-9919-fcf5596ad380', - messageId: 'e2d1a383-d9a2-4e03-a9dc-131d153c4d95', - timestamp: '2021-11-15T14:06:42.497+05:30', + properties: { count: 10, productId: 10, productName: 'Product-10', }, - receivedAt: '2021-11-15T14:06:42.497+05:30', - request_ip: '[::1]', anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', - integrations: { - All: true, - }, - originalTimestamp: '2021-09-08T11:10:45.466Z', }, metadata: { jobId: 1, + userId: 'user12345', + }, + destination: { + 
Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + properties: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + }, + metadata: { + jobId: 2, + userId: 'user12345', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'identify', + event: 'insert product', + userId: 'user12345', + traits: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', + }, + metadata: { + jobId: 3, + userId: 'user12345', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + properties: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', + }, + metadata: { + jobId: 5, + userId: 'user123', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + properties: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', + }, + metadata: { + jobId: 6, + userId: 'user124', }, destination: { Config: { @@ -84,7 +159,7 @@ export const data = [ eventDelivery: true, eventDeliveryTS: 1636965406397, }, - Enabled: true, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, @@ -93,19 +168,70 @@ export const data = [ message: { type: 'track', event: 'insert product', - sentAt: '2021-09-08T11:10:45.466Z', + userId: 'user12345', - channel: 'web', + }, + metadata: { + jobId: 7, + userId: 'user124', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + }, + metadata: { + jobId: 8, + userId: 'user125', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 
'identify', + event: 'insert product', + + userId: 'user12345', + context: { os: { - name: '', + Name: '', version: '', }, app: { - name: 'RudderLabs JavaScript SDK', + Name: 'RudderLabs JavaScript SDK', build: '1.0.0', version: '1.1.18', - namespace: 'com.rudderlabs.javascript', + Namespace: 'com.rudderlabs.javascript', }, page: { url: 'http://127.0.0.1:5500/index.html', @@ -128,31 +254,25 @@ export const data = [ }, traits: {}, library: { - name: 'RudderLabs JavaScript SDK', + Name: 'RudderLabs JavaScript SDK', version: '1.1.18', }, campaign: {}, userAgent: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36', }, - rudderId: 'fa2994a5-2a81-45fd-9919-fcf5596ad380', - messageId: 'e2d1a383-d9a2-4e03-a9dc-131d153c4d95', - timestamp: '2021-11-15T14:06:42.497+05:30', - properties: { + + traits: { count: 20, productId: 20, productName: 'Product-20', }, receivedAt: '2021-11-15T14:06:42.497+05:30', - request_ip: '[::1]', anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', - integrations: { - All: true, - }, - originalTimestamp: '2021-09-08T11:10:45.466Z', }, metadata: { - jobId: 2, + jobId: 9, + userId: 'user125', }, destination: { Config: { @@ -164,7 +284,7 @@ export const data = [ eventDelivery: true, eventDeliveryTS: 1636965406397, }, - Enabled: true, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, @@ -203,9 +323,11 @@ export const data = [ metadata: [ { jobId: 1, + userId: 'user12345', }, { jobId: 2, + userId: 'user12345', }, ], batched: true, @@ -220,10 +342,160 @@ export const data = [ eventDelivery: true, eventDeliveryTS: 1636965406397, }, - Enabled: true, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + batched: false, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + error: 'Message Type not supported: identify', + metadata: [ + { + jobId: 3, + userId: 'user12345', + }, + ], + statTags: { + destType: 'BQSTREAM', + errorCategory: 'dataValidation', + errorType: 'instrumentation', + feature: 'router', + implementation: 'native', + module: 'destination', + }, + statusCode: 400, + }, + { + batched: true, + batchedRequest: { + datasetId: 'gc_dataset', + projectId: 'gc-project-id', + properties: [ + { + count: 20, + insertId: '20', + productId: 20, + productName: 'Product-20', + }, + { + count: 20, + insertId: '20', + productId: 20, + productName: 'Product-20', + }, + ], + tableId: 'gc_table', + }, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + metadata: [ + { + jobId: 5, + userId: 'user123', + }, + { + jobId: 6, + userId: 'user124', + }, + ], + statusCode: 200, + }, + { + batched: false, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + error: 'Invalid payload for the destination', + 
metadata: [ + { + jobId: 7, + userId: 'user124', + }, + { + jobId: 8, + userId: 'user125', + }, + ], + statTags: { + destType: 'BQSTREAM', + errorCategory: 'dataValidation', + errorType: 'instrumentation', + feature: 'router', + implementation: 'native', + module: 'destination', + }, + statusCode: 400, + }, + { + batched: false, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, + error: 'Message Type not supported: identify', + metadata: [ + { + jobId: 9, + userId: 'user125', + }, + ], + statTags: { + destType: 'BQSTREAM', + errorCategory: 'dataValidation', + errorType: 'instrumentation', + feature: 'router', + implementation: 'native', + module: 'destination', + }, + statusCode: 400, }, ], }, From 2438da6ec79aca96fd67bce27499994a9d5d666e Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Mon, 18 Sep 2023 21:13:45 +0530 Subject: [PATCH 2/9] fix: review comments addressed --- src/v0/destinations/bqstream/transform.js | 75 ++++++----- src/v0/destinations/bqstream/util.js | 100 ++++++-------- src/v0/destinations/bqstream/util.test.js | 45 +------ .../destinations/bqstream/router/data.ts | 123 ++---------------- 4 files changed, 91 insertions(+), 252 deletions(-) diff --git a/src/v0/destinations/bqstream/transform.js b/src/v0/destinations/bqstream/transform.js index 49fd35503f..220132ab06 100644 --- a/src/v0/destinations/bqstream/transform.js +++ b/src/v0/destinations/bqstream/transform.js @@ -9,7 +9,7 @@ const { } = require('../../util'); const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config'); const { InstrumentationError } = require('../../util/errorTypes'); -const { getRearrangedEvents } = require('./util'); +const { getRearrangedEvents, batchEvents } = require('./util'); const getInsertIdColValue = (properties, insertIdCol) => { if ( @@ -99,48 +99,51 @@ const batchEachUserSuccessEvents = (eventsChunk) => { }; const processRouterDest = (inputs) => { + let orderedEventsList; const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION); if (errorRespEvents.length > 0) { return errorRespEvents; } const finalResp = []; - const eachUserSuccessEventslist = []; // temporary variable to divide payload into chunks - const eachUserErrorEventsList = []; - inputs.forEach((event) => { - try { - if (event.message.statusCode) { - // already transformed event - eachUserSuccessEventslist.push(event); - } else { - // if not transformed - let response = process(event); - response = Array.isArray(response) ? response : [response]; - response.forEach((res) => { - eachUserSuccessEventslist.push({ - message: res, - metadata: event.metadata, - destination: event.destination, + let eachTypeBatchedResponse = []; + const batchedEvents = batchEvents(inputs); + + batchedEvents.forEach((listOfEvents) => { + const eachTypeSuccessEventList = []; // temporary variable to divide payload into chunks + const eachTypeErrorEventsList = []; + listOfEvents.forEach((event) => { + try { + if (event.message.statusCode) { + // already transformed event + eachTypeSuccessEventList.push(event); + } else { + // if not transformed + let response = process(event); + response = Array.isArray(response) ? 
response : [response]; + response.forEach((res) => { + eachTypeSuccessEventList.push({ + message: res, + metadata: event.metadata, + destination: event.destination, + }); }); - }); + } + } catch (error) { + const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION); + eachTypeErrorEventsList.push(eachUserErrorEvent); } - } catch (error) { - const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION); - eachUserErrorEventsList.push(eachUserErrorEvent); - } - }); - - const orderedEventsList = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); - // divide the successful payloads into batches - let eachUserBatchedResponse = []; - orderedEventsList.forEach((eventList) => { - // no error event list will have more than one items in the list - if (eventList[0].error) { - finalResp.push([...eventList]); - } else { - // batch the successful events - eachUserBatchedResponse = batchEachUserSuccessEvents(eventList); - finalResp.push([...eachUserBatchedResponse]); - } + }); + orderedEventsList = getRearrangedEvents(eachTypeSuccessEventList, eachTypeErrorEventsList); + orderedEventsList.forEach((eventList) => { + // no error event list will have more than one items in the list + if (eventList[0].error) { + finalResp.push([...eventList]); + } else { + // batch the successful events + eachTypeBatchedResponse = batchEachUserSuccessEvents(eventList); + finalResp.push([...eachTypeBatchedResponse]); + } + }); }); return finalResp.flat(); diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index 6670623bbc..832580fd91 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -147,6 +147,32 @@ function networkHandler() { this.processAxiosResponse = processAxiosResponse; } +const batchEvents = (inputs) => { + const batches = []; + let currentInputsArray = inputs; + while (currentInputsArray.length > 0) { + const remainingInputsArray = []; + const userOrderTracker = {}; + const event = currentInputsArray.shift(); + const messageType = event.message.type; + const batch = [event]; + currentInputsArray.forEach((currentInput) => { + const currentMessageType = currentInput.message.type; + const currentUser = currentInput.metadata.userId; + if (currentMessageType === messageType && !userOrderTracker[currentUser]) { + batch.push(currentInput); + } else { + remainingInputsArray.push(currentInput); + userOrderTracker[currentUser] = true; + } + }); + batches.push(batch); + currentInputsArray = remainingInputsArray; + } + + return batches; +}; + /** * Optimizes the error response by merging the metadata of the same error type and adding it to the result array. * @@ -168,40 +194,6 @@ const optimizeErrorResponse = (item, errorMap, resultArray) => { } }; -/** - * Filters and splits an array of events based on whether they have an error or not. - * Returns an array of arrays, where inner arrays represent either a chunk of successful events or - * an array of single error event. It maintains the order of events strictly. - * - * @param {Array} sortedEvents - An array of events to be filtered and split. - * @returns {Array} - An array of arrays where each inner array represents a chunk of successful events followed by an error event. 
- */ -const restoreEventOrder = (sortedEvents) => { - let successfulEventsChunk = []; - const resultArray = []; - const errorMap = new Map(); - for (const item of sortedEvents) { - // if error is present, then push the previous successfulEventsChunk - // and then push the error event - if (isDefinedAndNotNull(item.error)) { - if (successfulEventsChunk.length > 0) { - resultArray.push(successfulEventsChunk); - successfulEventsChunk = []; - } - optimizeErrorResponse(item, errorMap, resultArray); - } else { - // if error is not present, then push the event to successfulEventsChunk - successfulEventsChunk.push(item); - errorMap.clear(); - } - } - // Push the last successfulEventsChunk to resultArray - if (successfulEventsChunk.length > 0) { - resultArray.push(successfulEventsChunk); - } - return resultArray; -}; - const convertMetadataToArray = (eventList) => { const processedEvents = eventList.map((event) => ({ ...event, @@ -211,30 +203,21 @@ const convertMetadataToArray = (eventList) => { }; /** - * Takes in two arrays, eachUserSuccessEventslist and eachUserErrorEventsList, and returns an ordered array of events. - * If there are no error events, it returns the array of transformed events. - * If there are no successful responses, it returns the error events. - * If there are both successful and erroneous events, it orders them based on the jobId property of the events' metadata array. - * considering error responses are built with @handleRtTfSingleEventError - * - * @param {Array} eachUserSuccessEventslist - An array of events representing successful responses for a user. - * @param {Array} eachUserErrorEventsList - An array of events representing error responses for a user. - * @returns {Array} - An ordered array of events. + * Rearranges the events based on their success or error status. + * If there are no successful events, it groups error events with the same error and their metadata. + * If there are successful events, it returns the batched response of successful events. * - * @example - * const eachUserSuccessEventslist = [{track, jobId: 1}, {track, jobId: 2}, {track, jobId: 5}]; - * const eachUserErrorEventsList = [{identify, jobId: 3}, {identify, jobId: 4}]; - * Output: [[{track, jobId: 1}, {track, jobId: 2}],[{identify, jobId: 3}],[{identify, jobId: 4}], {track, jobId: 5}]] + * @param {Array} eachUserSuccessEventslist - An array of objects representing successful events. + * Each object should have an `id` and `metadata` property. + * @param {Array} eachUserErrorEventsList - An array of objects representing error events. + * Each object should have an `id`, `metadata`, and `error` property. + * @returns {Array} - An array of rearranged events. 
*/ const getRearrangedEvents = (eachUserSuccessEventslist, eachUserErrorEventsList) => { // Convert 'metadata' to an array if it's not already const processedSuccessfulEvents = convertMetadataToArray(eachUserSuccessEventslist); const processedErrorEvents = convertMetadataToArray(eachUserErrorEventsList); - // if there are no error events, then return the batched response - if (eachUserErrorEventsList.length === 0) { - return [processedSuccessfulEvents]; - } // if there are no batched response, then return the error events if (eachUserSuccessEventslist.length === 0) { const resultArray = []; @@ -244,17 +227,8 @@ const getRearrangedEvents = (eachUserSuccessEventslist, eachUserErrorEventsList) }); return resultArray; } - - // if there are both batched response and error events, then order them - const combinedTransformedEventList = [ - ...processedSuccessfulEvents, - ...processedErrorEvents, - ].flat(); - - const sortedEvents = _.sortBy(combinedTransformedEventList, (event) => event.metadata[0].jobId); - const finalResp = restoreEventOrder(sortedEvents); - - return finalResp; + // if there are no error events, then return the batched response + return [processedSuccessfulEvents]; }; -module.exports = { networkHandler, getRearrangedEvents }; +module.exports = { networkHandler, getRearrangedEvents, batchEvents }; diff --git a/src/v0/destinations/bqstream/util.test.js b/src/v0/destinations/bqstream/util.test.js index 3f9b6d9ff3..060b77ed7d 100644 --- a/src/v0/destinations/bqstream/util.test.js +++ b/src/v0/destinations/bqstream/util.test.js @@ -24,7 +24,7 @@ describe('getRearrangedEvents', () => { it('should return an empty array when both input arrays are empty', () => { const eachUserSuccessEventslist = []; const eachUserErrorEventsList = []; - const expected = [[]]; + const expected = []; const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); expect(result).toEqual(expected); }); @@ -62,47 +62,4 @@ describe('getRearrangedEvents', () => { const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); expect(result).toEqual(expected); }); - - // Tests that the function returns an ordered array of events with both successful and erroneous events, ordered based on the jobId property of the events' metadata array - it('should return an ordered array of events with both successful and erroneous events', () => { - const eachUserSuccessEventslist = [ - { - batched: false, - destination: {}, - error: 'Message Type not supported: identify', - metadata: [{ jobId: 3, userId: 'user12345' }], - }, - { - batched: false, - destination: {}, - error: 'Message Type not supported: identify', - metadata: [{ jobId: 4, userId: 'user12345' }], - }, - ]; - const eachUserErrorEventsList = [ - { message: { type: 'track' }, metadata: { jobId: 1 } }, - { message: { type: 'track' }, metadata: { jobId: 2 } }, - { message: { type: 'track' }, metadata: { jobId: 5 } }, - ]; - const expected = [ - [ - { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, - { message: { type: 'track' }, metadata: [{ jobId: 2 }] }, - ], - [ - { - batched: false, - destination: {}, - error: 'Message Type not supported: identify', - metadata: [ - { jobId: 3, userId: 'user12345' }, - { jobId: 4, userId: 'user12345' }, - ], - }, - ], - [{ message: { type: 'track' }, metadata: [{ jobId: 5 }] }], - ]; - const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); - expect(result).toEqual(expected); - }); }); diff --git 
a/test/integrations/destinations/bqstream/router/data.ts b/test/integrations/destinations/bqstream/router/data.ts index d27fb928b7..c04c12603a 100644 --- a/test/integrations/destinations/bqstream/router/data.ts +++ b/test/integrations/destinations/bqstream/router/data.ts @@ -301,91 +301,23 @@ export const data = [ body: { output: [ { + batched: true, batchedRequest: { datasetId: 'gc_dataset', projectId: 'gc-project-id', properties: [ { count: 10, + insertId: '10', productId: 10, productName: 'Product-10', - insertId: '10', }, { count: 20, + insertId: '20', productId: 20, productName: 'Product-20', - insertId: '20', }, - ], - tableId: 'gc_table', - }, - metadata: [ - { - jobId: 1, - userId: 'user12345', - }, - { - jobId: 2, - userId: 'user12345', - }, - ], - batched: true, - statusCode: 200, - destination: { - Config: { - rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', - projectId: 'gc-project-id', - datasetId: 'gc_dataset', - tableId: 'gc_table', - insertId: 'productId', - eventDelivery: true, - eventDeliveryTS: 1636965406397, - }, - - ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', - Name: 'bqstream test', - }, - }, - { - batched: false, - destination: { - Config: { - datasetId: 'gc_dataset', - eventDelivery: true, - eventDeliveryTS: 1636965406397, - insertId: 'productId', - projectId: 'gc-project-id', - rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', - tableId: 'gc_table', - }, - - ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', - Name: 'bqstream test', - }, - error: 'Message Type not supported: identify', - metadata: [ - { - jobId: 3, - userId: 'user12345', - }, - ], - statTags: { - destType: 'BQSTREAM', - errorCategory: 'dataValidation', - errorType: 'instrumentation', - feature: 'router', - implementation: 'native', - module: 'destination', - }, - statusCode: 400, - }, - { - batched: true, - batchedRequest: { - datasetId: 'gc_dataset', - projectId: 'gc-project-id', - properties: [ { count: 20, insertId: '20', @@ -411,11 +343,18 @@ export const data = [ rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', tableId: 'gc_table', }, - ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, metadata: [ + { + jobId: 1, + userId: 'user12345', + }, + { + jobId: 2, + userId: 'user12345', + }, { jobId: 5, userId: 'user123', @@ -439,49 +378,15 @@ export const data = [ rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', tableId: 'gc_table', }, - ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, - error: 'Invalid payload for the destination', + error: 'Message Type not supported: identify', metadata: [ { - jobId: 7, - userId: 'user124', - }, - { - jobId: 8, - userId: 'user125', - }, - ], - statTags: { - destType: 'BQSTREAM', - errorCategory: 'dataValidation', - errorType: 'instrumentation', - feature: 'router', - implementation: 'native', - module: 'destination', - }, - statusCode: 400, - }, - { - batched: false, - destination: { - Config: { - datasetId: 'gc_dataset', - eventDelivery: true, - eventDeliveryTS: 1636965406397, - insertId: 'productId', - projectId: 'gc-project-id', - rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', - tableId: 'gc_table', + jobId: 3, + userId: 'user12345', }, - - ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', - Name: 'bqstream test', - }, - error: 'Message Type not supported: identify', - metadata: [ { jobId: 9, userId: 'user125', From ebd0823c805d3561f178fa7647a1c7975d8b8364 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Mon, 18 Sep 2023 21:30:41 +0530 Subject: [PATCH 3/9] fix: importing the common util function --- src/v0/destinations/bqstream/transform.js | 5 ++-- 
src/v0/destinations/bqstream/util.js | 31 ++--------------------- 2 files changed, 5 insertions(+), 31 deletions(-) diff --git a/src/v0/destinations/bqstream/transform.js b/src/v0/destinations/bqstream/transform.js index 220132ab06..c98a882a20 100644 --- a/src/v0/destinations/bqstream/transform.js +++ b/src/v0/destinations/bqstream/transform.js @@ -6,10 +6,11 @@ const { getSuccessRespEvents, checkInvalidRtTfEvents, handleRtTfSingleEventError, + groupEventsByType, } = require('../../util'); const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config'); const { InstrumentationError } = require('../../util/errorTypes'); -const { getRearrangedEvents, batchEvents } = require('./util'); +const { getRearrangedEvents } = require('./util'); const getInsertIdColValue = (properties, insertIdCol) => { if ( @@ -106,7 +107,7 @@ const processRouterDest = (inputs) => { } const finalResp = []; let eachTypeBatchedResponse = []; - const batchedEvents = batchEvents(inputs); + const batchedEvents = groupEventsByType(inputs); batchedEvents.forEach((listOfEvents) => { const eachTypeSuccessEventList = []; // temporary variable to divide payload into chunks diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index 832580fd91..542f4421d7 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -1,11 +1,10 @@ /* eslint-disable no-param-reassign */ -const _ = require('lodash'); const getValue = require('get-value'); const { getDynamicErrorType, processAxiosResponse, } = require('../../../adapters/utils/networkUtils'); -const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util'); +const { isHttpStatusSuccess } = require('../../util'); const { REFRESH_TOKEN, AUTH_STATUS_INACTIVE, @@ -147,32 +146,6 @@ function networkHandler() { this.processAxiosResponse = processAxiosResponse; } -const batchEvents = (inputs) => { - const batches = []; - let currentInputsArray = inputs; - while (currentInputsArray.length > 0) { - const remainingInputsArray = []; - const userOrderTracker = {}; - const event = currentInputsArray.shift(); - const messageType = event.message.type; - const batch = [event]; - currentInputsArray.forEach((currentInput) => { - const currentMessageType = currentInput.message.type; - const currentUser = currentInput.metadata.userId; - if (currentMessageType === messageType && !userOrderTracker[currentUser]) { - batch.push(currentInput); - } else { - remainingInputsArray.push(currentInput); - userOrderTracker[currentUser] = true; - } - }); - batches.push(batch); - currentInputsArray = remainingInputsArray; - } - - return batches; -}; - /** * Optimizes the error response by merging the metadata of the same error type and adding it to the result array. 
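 * e.g. two failed events sharing the same `error` string end up as one entry whose `metadata` array lists both jobIds.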
* @@ -231,4 +204,4 @@ const getRearrangedEvents = (eachUserSuccessEventslist, eachUserErrorEventsList) return [processedSuccessfulEvents]; }; -module.exports = { networkHandler, getRearrangedEvents, batchEvents }; +module.exports = { networkHandler, getRearrangedEvents }; From ba3a7a6202173f84a8c14e90ce692d6b44abd2a5 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Mon, 18 Sep 2023 22:14:48 +0530 Subject: [PATCH 4/9] fix: putting both error and successful events logic back --- src/v0/destinations/bqstream/util.js | 65 ++++++++++++++++--- src/v0/destinations/bqstream/util.test.js | 45 ++++++++++++- .../destinations/bqstream/router/data.ts | 45 +++++++++++-- 3 files changed, 141 insertions(+), 14 deletions(-) diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index 542f4421d7..eaaf05dd58 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -1,10 +1,11 @@ /* eslint-disable no-param-reassign */ +const _ = require('lodash') const getValue = require('get-value'); const { getDynamicErrorType, processAxiosResponse, } = require('../../../adapters/utils/networkUtils'); -const { isHttpStatusSuccess } = require('../../util'); +const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util'); const { REFRESH_TOKEN, AUTH_STATUS_INACTIVE, @@ -175,24 +176,62 @@ const convertMetadataToArray = (eventList) => { return processedEvents; }; +/** + * Filters and splits an array of events based on whether they have an error or not. + * Returns an array of arrays, where inner arrays represent either a chunk of successful events or + * an array of single error event. It maintains the order of events strictly. + * + * @param {Array} sortedEvents - An array of events to be filtered and split. + * @returns {Array} - An array of arrays where each inner array represents a chunk of successful events followed by an error event. + */ +const restoreEventOrder = (sortedEvents) => { + let successfulEventsChunk = []; + const resultArray = []; + const errorMap = new Map(); + for (const item of sortedEvents) { + // if error is present, then push the previous successfulEventsChunk + // and then push the error event + if (isDefinedAndNotNull(item.error)) { + if (successfulEventsChunk.length > 0) { + resultArray.push(successfulEventsChunk); + successfulEventsChunk = []; + } + optimizeErrorResponse(item, errorMap, resultArray); + } else { + // if error is not present, then push the event to successfulEventsChunk + successfulEventsChunk.push(item); + errorMap.clear(); + } + } + // Push the last successfulEventsChunk to resultArray + if (successfulEventsChunk.length > 0) { + resultArray.push(successfulEventsChunk); + } + return resultArray; +}; + /** * Rearranges the events based on their success or error status. * If there are no successful events, it groups error events with the same error and their metadata. * If there are successful events, it returns the batched response of successful events. * - * @param {Array} eachUserSuccessEventslist - An array of objects representing successful events. + * @param {Array} successEventList - An array of objects representing successful events. * Each object should have an `id` and `metadata` property. - * @param {Array} eachUserErrorEventsList - An array of objects representing error events. + * @param {Array} errorEventList - An array of objects representing error events. * Each object should have an `id`, `metadata`, and `error` property. * @returns {Array} - An array of rearranged events. 
*/ -const getRearrangedEvents = (eachUserSuccessEventslist, eachUserErrorEventsList) => { +const getRearrangedEvents = (successEventList, errorEventList) => { // Convert 'metadata' to an array if it's not already - const processedSuccessfulEvents = convertMetadataToArray(eachUserSuccessEventslist); - const processedErrorEvents = convertMetadataToArray(eachUserErrorEventsList); + const processedSuccessfulEvents = convertMetadataToArray(successEventList); + const processedErrorEvents = convertMetadataToArray(errorEventList); + // if there are no error events, then return the batched response + if (errorEventList.length === 0) { + return [processedSuccessfulEvents]; + } // if there are no batched response, then return the error events - if (eachUserSuccessEventslist.length === 0) { + if (successEventList.length === 0) { const resultArray = []; const errorMap = new Map(); processedErrorEvents.forEach((item) => { @@ -200,8 +239,16 @@ const getRearrangedEvents = (eachUserSuccessEventslist, eachUserErrorEventsList) }); return resultArray; } - // if there are no error events, then return the batched response - return [processedSuccessfulEvents]; + + // if there are both batched response and error events, then order them + const combinedTransformedEventList = [ + ...processedSuccessfulEvents, + ...processedErrorEvents, + ].flat(); + + const finalResp = restoreEventOrder(combinedTransformedEventList); + + return finalResp; }; module.exports = { networkHandler, getRearrangedEvents }; diff --git a/src/v0/destinations/bqstream/util.test.js b/src/v0/destinations/bqstream/util.test.js index 060b77ed7d..586eb5d1ae 100644 --- a/src/v0/destinations/bqstream/util.test.js +++ b/src/v0/destinations/bqstream/util.test.js @@ -24,7 +24,7 @@ describe('getRearrangedEvents', () => { it('should return an empty array when both input arrays are empty', () => { const eachUserSuccessEventslist = []; const eachUserErrorEventsList = []; - const expected = []; + const expected = [[]]; const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); expect(result).toEqual(expected); }); @@ -63,3 +63,46 @@ describe('getRearrangedEvents', () => { expect(result).toEqual(expected); }); }); + +// Tests that the function returns an ordered array of events with both successful and erroneous events, ordered based on the jobId property of the events' metadata array +it('should return an ordered array of events with both successful and erroneous events', () => { + const errorEventsList = [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 3, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + ]; + const successEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { message: { type: 'track' }, metadata: { jobId: 2 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const expected = [ + [ + { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 2 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 5 }] } + ], + [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [ + { jobId: 3, userId: 'user12345' }, + { jobId: 4, userId: 'user12345' }, + ], + }, + ], + ]; + const result = getRearrangedEvents(successEventslist, errorEventsList); + expect(result).toEqual(expected); +}); diff --git 
a/test/integrations/destinations/bqstream/router/data.ts b/test/integrations/destinations/bqstream/router/data.ts index c04c12603a..4fac9047cc 100644 --- a/test/integrations/destinations/bqstream/router/data.ts +++ b/test/integrations/destinations/bqstream/router/data.ts @@ -381,14 +381,14 @@ export const data = [ ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, - error: 'Message Type not supported: identify', + error: 'Invalid payload for the destination', metadata: [ { - jobId: 3, - userId: 'user12345', + jobId: 7, + userId: 'user124', }, { - jobId: 9, + jobId: 8, userId: 'user125', }, ], @@ -402,6 +402,43 @@ export const data = [ }, statusCode: 400, }, + { + batched: false, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + + error: "Message Type not supported: identify", + metadata: [ + { + jobId: 3, + userId: "user12345" + }, + { + jobId: 9, + userId: "user125" + } + ], + statTags: { + destType: "BQSTREAM", + errorCategory: "dataValidation", + errorType: "instrumentation", + feature: "router", + implementation: "native", + module: "destination" + }, + statusCode: 400 + } ], }, }, From fcaf4803db53d305a6eb8ca91bb04ebac3ba7bdd Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Tue, 19 Sep 2023 11:26:42 +0530 Subject: [PATCH 5/9] fix: test cases addition and code clean up --- src/v0/destinations/bqstream/util.js | 44 ++-- src/v0/destinations/bqstream/util.test.js | 239 ++++++++++++++++++---- 2 files changed, 213 insertions(+), 70 deletions(-) diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index eaaf05dd58..58329ef8e9 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -176,36 +176,21 @@ const convertMetadataToArray = (eventList) => { return processedEvents; }; + /** - * Filters and splits an array of events based on whether they have an error or not. - * Returns an array of arrays, where inner arrays represent either a chunk of successful events or - * an array of single error event. It maintains the order of events strictly. - * - * @param {Array} sortedEvents - An array of events to be filtered and split. - * @returns {Array} - An array of arrays where each inner array represents a chunk of successful events followed by an error event. + * Formats a list of error events into a composite response. + * + * @param {Array} errorEvents - A list of error events, where each event can have an `error` property and a `metadata` array. + * @returns {Array} The formatted composite response, where each element is an array containing the error details. 
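+ * For example, two error events carrying the same `error` string are merged into a single
+ * entry whose `metadata` array holds both of their jobId entries.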
*/ -const restoreEventOrder = (sortedEvents) => { - let successfulEventsChunk = []; +const formatCompositeResponse = (errorEvents) => { const resultArray = []; const errorMap = new Map(); - for (const item of sortedEvents) { - // if error is present, then push the previous successfulEventsChunk - // and then push the error event + + for (const item of errorEvents) { if (isDefinedAndNotNull(item.error)) { - if (successfulEventsChunk.length > 0) { - resultArray.push(successfulEventsChunk); - successfulEventsChunk = []; - } optimizeErrorResponse(item, errorMap, resultArray); - } else { - // if error is not present, then push the event to successfulEventsChunk - successfulEventsChunk.push(item); - errorMap.clear(); - } - } - // Push the last successfulEventsChunk to resultArray - if (successfulEventsChunk.length > 0) { - resultArray.push(successfulEventsChunk); + } } return resultArray; }; @@ -242,13 +227,10 @@ const getRearrangedEvents = (successEventList, errorEventList) => { // if there are both batched response and error events, then order them const combinedTransformedEventList = [ - ...processedSuccessfulEvents, - ...processedErrorEvents, - ].flat(); - - const finalResp = restoreEventOrder(combinedTransformedEventList); - - return finalResp; + [...processedSuccessfulEvents], + ...formatCompositeResponse(processedErrorEvents) + ] + return combinedTransformedEventList; }; module.exports = { networkHandler, getRearrangedEvents }; diff --git a/src/v0/destinations/bqstream/util.test.js b/src/v0/destinations/bqstream/util.test.js index 586eb5d1ae..1e99a4f20e 100644 --- a/src/v0/destinations/bqstream/util.test.js +++ b/src/v0/destinations/bqstream/util.test.js @@ -2,7 +2,7 @@ const { getRearrangedEvents } = require('./util'); describe('getRearrangedEvents', () => { // Tests that the function returns an array of transformed events when there are no error events - it('should return an array of transformed events when there are no error events', () => { + it('should return an array of transformed events when all events are track and successfully transformed', () => { const eachUserSuccessEventslist = [ { message: { type: 'track' }, metadata: { jobId: 1 } }, { message: { type: 'track' }, metadata: { jobId: 3 } }, @@ -45,6 +45,12 @@ describe('getRearrangedEvents', () => { error: 'Message Type not supported: identify', metadata: [{ jobId: 4, userId: 'user12345' }], }, + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 5, userId: 'user12345' }], + }, ]; const expected = [ [ @@ -58,51 +64,206 @@ describe('getRearrangedEvents', () => { ], }, ], + [ + { + batched: false, + destination: {}, + error: "Invalid payload for the destination", + metadata: [ + { + jobId: 5, + userId: "user12345", + }, + ], + }, + ], ]; const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); expect(result).toEqual(expected); }); -}); + // Tests that the function does not return an ordered array of events with both successful and erroneous events + it('case 1 : 1--> success, 2 --> fail, 3 --> success, 4 --> fail, 5 --> success', () => { + const errorEventsList = [ + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 2, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + ]; + const successEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { 
message: { type: 'track' }, metadata: { jobId: 3 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const expected = [ + [ + { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 3 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 5 }] } + ], + [ + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [ + { jobId: 2, userId: 'user12345' }, + { jobId: 4, userId: 'user12345' }, + ], + }, + ], + ]; + const result = getRearrangedEvents(successEventslist, errorEventsList); + expect(result).toEqual(expected); + }); -// Tests that the function returns an ordered array of events with both successful and erroneous events, ordered based on the jobId property of the events' metadata array -it('should return an ordered array of events with both successful and erroneous events', () => { - const errorEventsList = [ - { - batched: false, - destination: {}, - error: 'Message Type not supported: identify', - metadata: [{ jobId: 3, userId: 'user12345' }], - }, - { - batched: false, - destination: {}, - error: 'Message Type not supported: identify', - metadata: [{ jobId: 4, userId: 'user12345' }], - }, - ]; - const successEventslist = [ - { message: { type: 'track' }, metadata: { jobId: 1 } }, - { message: { type: 'track' }, metadata: { jobId: 2 } }, - { message: { type: 'track' }, metadata: { jobId: 5 } }, - ]; - const expected = [ - [ - { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, - { message: { type: 'track' }, metadata: [{ jobId: 2 }] }, - { message: { type: 'track' }, metadata: [{ jobId: 5 }] } - ], - [ + it('case 2 : 1--> success, 2 --> success, 3 --> fail, 4 --> fail, 5 --> success', () => { + const errorEventsList = [ { batched: false, destination: {}, - error: 'Message Type not supported: identify', - metadata: [ - { jobId: 3, userId: 'user12345' }, - { jobId: 4, userId: 'user12345' }, - ], + error: 'Invalid payload for the destination', + metadata: [{ jobId: 3, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + ]; + const successEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { message: { type: 'track' }, metadata: { jobId: 2 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const expected = [ + [ + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 1 + } + ] + }, + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 2 + } + ] + }, + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 5 + } + ] + } + ], + [ + { + "batched": false, + "destination": {}, + "error": "Invalid payload for the destination", + "metadata": [ + { + "jobId": 3, + "userId": "user12345" + }, + { + "jobId": 4, + "userId": "user12345" + } + ] + } + ] + ] + const result = getRearrangedEvents(successEventslist, errorEventsList); + console.log(JSON.stringify(result)); + expect(result).toEqual(expected); + }); + + it('case 3 : 1--> fail, 2 --> success, 3 --> success, 4 --> fail', () => { + const errorEventsList = [ + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 1, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Invalid payload for the destination', + metadata: [{ jobId: 4, userId: 'user12345' }], }, - ], - ]; - const result = getRearrangedEvents(successEventslist, 
errorEventsList); - expect(result).toEqual(expected); + ]; + const successEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 2 } }, + { message: { type: 'track' }, metadata: { jobId: 3 } }, + ]; + const expected = [ + [ + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 2 + } + ] + }, + { + "message": { + "type": "track" + }, + "metadata": [ + { + "jobId": 3 + } + ] + } + ], + [ + { + "batched": false, + "destination": {}, + "error": "Invalid payload for the destination", + "metadata": [ + { + "jobId": 1, + "userId": "user12345" + }, + { + "jobId": 4, + "userId": "user12345" + } + ] + } + ] + ] + const result = getRearrangedEvents(successEventslist, errorEventsList); + console.log(JSON.stringify(result)); + expect(result).toEqual(expected); + }); + }); + From 1b41190ce718fb18063243bb57b93f2b78efe258 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Tue, 19 Sep 2023 11:31:06 +0530 Subject: [PATCH 6/9] fix: code clean up --- src/v0/destinations/bqstream/util.js | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index 58329ef8e9..f8a30b22d9 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -217,12 +217,7 @@ const getRearrangedEvents = (successEventList, errorEventList) => { } // if there are no batched response, then return the error events if (successEventList.length === 0) { - const resultArray = []; - const errorMap = new Map(); - processedErrorEvents.forEach((item) => { - optimizeErrorResponse(item, errorMap, resultArray); - }); - return resultArray; + return formatCompositeResponse(processedErrorEvents) } // if there are both batched response and error events, then order them From e6993e51ffa6dcddffc76439933e273eaabb8e57 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Tue, 19 Sep 2023 11:34:23 +0530 Subject: [PATCH 7/9] fix: resolving sonar error --- src/v0/destinations/bqstream/util.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index f8a30b22d9..df56237662 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -1,5 +1,4 @@ /* eslint-disable no-param-reassign */ -const _ = require('lodash') const getValue = require('get-value'); const { getDynamicErrorType, From 01b3fd543988aabb55b4740a356b8ab605ad6d7e Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Tue, 19 Sep 2023 21:02:32 +0530 Subject: [PATCH 8/9] fix: removing network handler for bqstream --- src/v0/destinations/bqstream/transform.js | 20 ++- src/v0/destinations/bqstream/util.js | 161 +--------------------- 2 files changed, 20 insertions(+), 161 deletions(-) diff --git a/src/v0/destinations/bqstream/transform.js b/src/v0/destinations/bqstream/transform.js index c98a882a20..83b883e30a 100644 --- a/src/v0/destinations/bqstream/transform.js +++ b/src/v0/destinations/bqstream/transform.js @@ -100,18 +100,18 @@ const batchEachUserSuccessEvents = (eventsChunk) => { }; const processRouterDest = (inputs) => { - let orderedEventsList; const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION); if (errorRespEvents.length > 0) { return errorRespEvents; } const finalResp = []; - let eachTypeBatchedResponse = []; + const batchedEvents = groupEventsByType(inputs); batchedEvents.forEach((listOfEvents) => { const eachTypeSuccessEventList = []; // temporary variable to divide payload into chunks const eachTypeErrorEventsList = []; + 
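+    // Illustrative sketch (not part of the original change): for a typed list like
+    // [ok1, bad2, ok3], the loop below fills eachTypeSuccessEventList with [ok1, ok3]
+    // and eachTypeErrorEventsList with [bad2]; getRearrangedEvents() then returns the
+    // successes as one chunk followed by one single-element array per distinct error.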
listOfEvents.forEach((event) => { try { if (event.message.statusCode) { @@ -119,9 +119,10 @@ const processRouterDest = (inputs) => { eachTypeSuccessEventList.push(event); } else { // if not transformed - let response = process(event); - response = Array.isArray(response) ? response : [response]; - response.forEach((res) => { + const response = process(event); + const transformedEvents = Array.isArray(response) ? response : [response]; + + transformedEvents.forEach((res) => { eachTypeSuccessEventList.push({ message: res, metadata: event.metadata, @@ -134,14 +135,19 @@ const processRouterDest = (inputs) => { eachTypeErrorEventsList.push(eachUserErrorEvent); } }); - orderedEventsList = getRearrangedEvents(eachTypeSuccessEventList, eachTypeErrorEventsList); + + const orderedEventsList = getRearrangedEvents( + eachTypeSuccessEventList, + eachTypeErrorEventsList, + ); + orderedEventsList.forEach((eventList) => { // no error event list will have more than one items in the list if (eventList[0].error) { finalResp.push([...eventList]); } else { // batch the successful events - eachTypeBatchedResponse = batchEachUserSuccessEvents(eventList); + const eachTypeBatchedResponse = batchEachUserSuccessEvents(eventList); finalResp.push([...eachTypeBatchedResponse]); } }); diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index df56237662..06b7403c87 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -1,151 +1,5 @@ /* eslint-disable no-param-reassign */ -const getValue = require('get-value'); -const { - getDynamicErrorType, - processAxiosResponse, -} = require('../../../adapters/utils/networkUtils'); -const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util'); -const { - REFRESH_TOKEN, - AUTH_STATUS_INACTIVE, -} = require('../../../adapters/networkhandler/authConstants'); -const { proxyRequest } = require('../../../adapters/network'); -const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes'); -const tags = require('../../util/tags'); - -const DESTINATION_NAME = 'bqstream'; - -const trimBqStreamResponse = (response) => ({ - code: getValue(response, 'response.response.data.error.code'), // data.error.status which contains PERMISSION_DENIED - status: getValue(response, 'response.response.status'), - statusText: getValue(response, 'response.response.statusText'), - headers: getValue(response, 'response.response.headers'), - data: getValue(response, 'response.response.data'), // Incase of errors, this contains error data - success: getValue(response, 'suceess'), -}); -/** - * Obtains the Destination OAuth Error Category based on the error code obtained from destination - * - * - If an error code is such that the user will not be allowed inside the destination, - * such error codes fall under AUTH_STATUS_INACTIVE - * - If an error code is such that upon refresh we can get a new token which can be used to send event, - * such error codes fall under REFRESH_TOKEN category - * - If an error code doesn't fall under both categories, we can return an empty string - * @param {string} errorCategory - The error code obtained from the destination - * @returns Destination OAuth Error Category - */ -const getDestAuthCategory = (errorCategory) => { - switch (errorCategory) { - case 'PERMISSION_DENIED': - return AUTH_STATUS_INACTIVE; - case 'UNAUTHENTICATED': - return REFRESH_TOKEN; - default: - return ''; - } -}; - -const destToRudderStatusMap = { - 403: { - rateLimitExceeded: 429, - 
default: 400, - }, - 400: { - tableUnavailable: 500, - default: 400, - }, - 500: { default: 500 }, - 503: { default: 500 }, - 401: { default: 500 }, - 404: { default: 400 }, - 501: { default: 400 }, -}; - -const getStatusAndCategory = (dresponse, status) => { - const authErrorCategory = getDestAuthCategory(dresponse.error.status); - const reason = - dresponse.error.errors && - Array.isArray(dresponse.error.errors) && - dresponse.error.errors.length > 0 && - dresponse.error.errors[0].reason; - - const trStatus = destToRudderStatusMap[status] - ? destToRudderStatusMap[status][reason] || destToRudderStatusMap[status].default - : 500; - return { status: trStatus, authErrorCategory }; -}; - -/** - * This class actually handles the response for BigQuery Stream API - * It can also be used for any Google related API but an API related handling has to be done separately - * - * Here we are only trying to handle OAuth related error(s) - * Any destination specific error handling has to be done in their own way - * - * Reference doc for OAuth Errors - * 1. https://cloud.google.com/apigee/docs/api-platform/reference/policies/oauth-http-status-code-reference - * 2. https://cloud.google.com/bigquery/docs/error-messages - * - * Summary: - * Abortable -> 403, 501, 400 - * Retryable -> 5[0-9][02-9], 401(UNAUTHENTICATED) - * "Special Cases": - * status=200, resp.insertErrors.length > 0 === Failure - * 403 => AccessDenied -> AUTH_STATUS_INACTIVE, other 403 => Just abort - * - */ -const processResponse = ({ dresponse, status } = {}) => { - const isSuccess = - !dresponse.error && - isHttpStatusSuccess(status) && - (!dresponse.insertErrors || (dresponse.insertErrors && dresponse.insertErrors.length === 0)); - - if (!isSuccess) { - if (dresponse.error) { - const { status: trStatus } = getStatusAndCategory(dresponse, status); - throw new NetworkError( - dresponse.error.message || `Request failed for ${DESTINATION_NAME} with status: ${status}`, - trStatus, - { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(trStatus), - }, - dresponse, - ); - } else if (dresponse.insertErrors && dresponse.insertErrors.length > 0) { - const temp = trimBqStreamResponse(dresponse); - throw new AbortedError( - 'Problem during insert operation', - 400, - { - [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(temp.status || 400), - }, - temp, - getDestAuthCategory(temp.code), - ); - } - throw new UnhandledStatusCodeError('Unhandled error type while sending to destination'); - } -}; - -const responseHandler = (respTransformPayload) => { - const { response, status } = respTransformPayload; - processResponse({ - dresponse: response, - status, - }); - return { - status, - destinationResponse: response, - message: 'Request Processed Successfully', - }; -}; - -function networkHandler() { - this.responseHandler = responseHandler; - this.proxy = proxyRequest; - this.processAxiosResponse = processAxiosResponse; -} - +const { isDefinedAndNotNull } = require('../../util'); /** * Optimizes the error response by merging the metadata of the same error type and adding it to the result array. * @@ -175,10 +29,9 @@ const convertMetadataToArray = (eventList) => { return processedEvents; }; - /** * Formats a list of error events into a composite response. - * + * * @param {Array} errorEvents - A list of error events, where each event can have an `error` property and a `metadata` array. * @returns {Array} The formatted composite response, where each element is an array containing the error details. 
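 *
 * @example
 * // Illustrative sketch (sample values assumed, not taken from the patch):
 * // events sharing the same error string are merged into a single entry whose
 * // metadata array carries every affected jobId.
 * formatCompositeResponse([
 *   { error: 'Invalid payload', metadata: [{ jobId: 2 }] },
 *   { error: 'Invalid payload', metadata: [{ jobId: 4 }] },
 * ]);
 * // => [[{ error: 'Invalid payload', metadata: [{ jobId: 2 }, { jobId: 4 }] }]]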
*/ @@ -189,7 +42,7 @@ const formatCompositeResponse = (errorEvents) => { for (const item of errorEvents) { if (isDefinedAndNotNull(item.error)) { optimizeErrorResponse(item, errorMap, resultArray); - } + } } return resultArray; }; @@ -216,15 +69,15 @@ const getRearrangedEvents = (successEventList, errorEventList) => { } // if there are no batched response, then return the error events if (successEventList.length === 0) { - return formatCompositeResponse(processedErrorEvents) + return formatCompositeResponse(processedErrorEvents); } // if there are both batched response and error events, then order them const combinedTransformedEventList = [ [...processedSuccessfulEvents], - ...formatCompositeResponse(processedErrorEvents) - ] + ...formatCompositeResponse(processedErrorEvents), + ]; return combinedTransformedEventList; }; -module.exports = { networkHandler, getRearrangedEvents }; +module.exports = { getRearrangedEvents }; From 46cfd5402d606b979a291ac16d05e0a3250a4594 Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Tue, 19 Sep 2023 21:22:18 +0530 Subject: [PATCH 9/9] fix: refactor process router dest --- src/v0/destinations/bqstream/transform.js | 61 +++++++++++++---------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/src/v0/destinations/bqstream/transform.js b/src/v0/destinations/bqstream/transform.js index 83b883e30a..0674f5e679 100644 --- a/src/v0/destinations/bqstream/transform.js +++ b/src/v0/destinations/bqstream/transform.js @@ -99,6 +99,36 @@ const batchEachUserSuccessEvents = (eventsChunk) => { return batchedResponseList; }; +const processEachTypedEventList = ( + typedEventList, + eachTypeSuccessEventList, + eachTypeErrorEventsList, +) => { + typedEventList.forEach((event) => { + try { + if (event.message.statusCode) { + // already transformed event + eachTypeSuccessEventList.push(event); + } else { + // if not transformed + const response = process(event); + const transformedEvents = Array.isArray(response) ? response : [response]; + + transformedEvents.forEach((res) => { + eachTypeSuccessEventList.push({ + message: res, + metadata: event.metadata, + destination: event.destination, + }); + }); + } + } catch (error) { + const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION); + eachTypeErrorEventsList.push(eachUserErrorEvent); + } + }); +}; + const processRouterDest = (inputs) => { const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION); if (errorRespEvents.length > 0) { @@ -108,33 +138,10 @@ const processRouterDest = (inputs) => { const batchedEvents = groupEventsByType(inputs); - batchedEvents.forEach((listOfEvents) => { - const eachTypeSuccessEventList = []; // temporary variable to divide payload into chunks - const eachTypeErrorEventsList = []; - - listOfEvents.forEach((event) => { - try { - if (event.message.statusCode) { - // already transformed event - eachTypeSuccessEventList.push(event); - } else { - // if not transformed - const response = process(event); - const transformedEvents = Array.isArray(response) ? 
response : [response]; - - transformedEvents.forEach((res) => { - eachTypeSuccessEventList.push({ - message: res, - metadata: event.metadata, - destination: event.destination, - }); - }); - } - } catch (error) { - const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION); - eachTypeErrorEventsList.push(eachUserErrorEvent); - } - }); + batchedEvents.forEach((typedEventList) => { + const eachTypeSuccessEventList = []; // list of events that are transformed successfully + const eachTypeErrorEventsList = []; // list of events that are errored out + processEachTypedEventList(typedEventList, eachTypeSuccessEventList, eachTypeErrorEventsList); const orderedEventsList = getRearrangedEvents( eachTypeSuccessEventList,
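+      // Illustrative result shape (assumed, not from the patch):
+      //   [[success1, success2, ...], [mergedErrorA], [mergedErrorB]]
+      // i.e. one leading chunk of batchable successes, then one single-element
+      // array per distinct error with its metadata already merged.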