diff --git a/src/v0/destinations/bqstream/transform.js b/src/v0/destinations/bqstream/transform.js index 4db1856535..49fd35503f 100644 --- a/src/v0/destinations/bqstream/transform.js +++ b/src/v0/destinations/bqstream/transform.js @@ -9,6 +9,7 @@ const { } = require('../../util'); const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config'); const { InstrumentationError } = require('../../util/errorTypes'); +const { getRearrangedEvents } = require('./util'); const getInsertIdColValue = (properties, insertIdCol) => { if ( @@ -50,7 +51,7 @@ const process = (event) => { }; }; -const batchEvents = (eventsChunk) => { +const batchEachUserSuccessEvents = (eventsChunk) => { const batchedResponseList = []; // arrayChunks = [[e1,e2, ..batchSize], [e1,e2, ..batchSize], ...] @@ -68,7 +69,7 @@ const batchEvents = (eventsChunk) => { chunk.forEach((ev) => { // Pixel code must be added above "batch": [..] batchResponseList.push(ev.message.properties); - metadata.push(ev.metadata); + metadata.push(ev.metadata[0]); }); batchEventResponse.batchedRequest = { @@ -102,21 +103,20 @@ const processRouterDest = (inputs) => { if (errorRespEvents.length > 0) { return errorRespEvents; } - - const eventsChunk = []; // temporary variable to divide payload into chunks - const errorRespList = []; - + const finalResp = []; + const eachUserSuccessEventslist = []; // temporary variable to divide payload into chunks + const eachUserErrorEventsList = []; inputs.forEach((event) => { try { if (event.message.statusCode) { // already transformed event - eventsChunk.push(event); + eachUserSuccessEventslist.push(event); } else { // if not transformed let response = process(event); response = Array.isArray(response) ? response : [response]; response.forEach((res) => { - eventsChunk.push({ + eachUserSuccessEventslist.push({ message: res, metadata: event.metadata, destination: event.destination, @@ -124,16 +124,26 @@ const processRouterDest = (inputs) => { }); } } catch (error) { - const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION); - errorRespList.push(errRespEvent); + const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION); + eachUserErrorEventsList.push(eachUserErrorEvent); } }); - let batchedResponseList = []; - if (eventsChunk.length > 0) { - batchedResponseList = batchEvents(eventsChunk); - } - return [...batchedResponseList, ...errorRespList]; + const orderedEventsList = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + // divide the successful payloads into batches + let eachUserBatchedResponse = []; + orderedEventsList.forEach((eventList) => { + // no error event list will have more than one items in the list + if (eventList[0].error) { + finalResp.push([...eventList]); + } else { + // batch the successful events + eachUserBatchedResponse = batchEachUserSuccessEvents(eventList); + finalResp.push([...eachUserBatchedResponse]); + } + }); + + return finalResp.flat(); }; module.exports = { process, processRouterDest }; diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index 2448d72d76..6670623bbc 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -1,14 +1,15 @@ /* eslint-disable no-param-reassign */ +const _ = require('lodash'); const getValue = require('get-value'); const { getDynamicErrorType, processAxiosResponse, } = require('../../../adapters/utils/networkUtils'); +const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util'); const { REFRESH_TOKEN, AUTH_STATUS_INACTIVE, } = require('../../../adapters/networkhandler/authConstants'); -const { isHttpStatusSuccess } = require('../../util'); const { proxyRequest } = require('../../../adapters/network'); const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes'); const tags = require('../../util/tags'); @@ -146,4 +147,114 @@ function networkHandler() { this.processAxiosResponse = processAxiosResponse; } -module.exports = { networkHandler }; +/** + * Optimizes the error response by merging the metadata of the same error type and adding it to the result array. + * + * @param {Object} item - An object representing an error event with properties like `error`, `jobId`, and `metadata`. + * @param {Map} errorMap - A Map object to store the error events and their metadata. + * @param {Array} resultArray - An array to store the optimized error response. + * @returns {void} + */ +const optimizeErrorResponse = (item, errorMap, resultArray) => { + const currentError = item.error; + if (errorMap.has(currentError)) { + // If the error already exists in the map, merge the metadata + const existingErrDetails = errorMap.get(currentError); + existingErrDetails.metadata.push(...item.metadata); + } else { + // Otherwise, add it to the map + errorMap.set(currentError, { ...item }); + resultArray.push([errorMap.get(currentError)]); + } +}; + +/** + * Filters and splits an array of events based on whether they have an error or not. + * Returns an array of arrays, where inner arrays represent either a chunk of successful events or + * an array of single error event. It maintains the order of events strictly. + * + * @param {Array} sortedEvents - An array of events to be filtered and split. + * @returns {Array} - An array of arrays where each inner array represents a chunk of successful events followed by an error event. + */ +const restoreEventOrder = (sortedEvents) => { + let successfulEventsChunk = []; + const resultArray = []; + const errorMap = new Map(); + for (const item of sortedEvents) { + // if error is present, then push the previous successfulEventsChunk + // and then push the error event + if (isDefinedAndNotNull(item.error)) { + if (successfulEventsChunk.length > 0) { + resultArray.push(successfulEventsChunk); + successfulEventsChunk = []; + } + optimizeErrorResponse(item, errorMap, resultArray); + } else { + // if error is not present, then push the event to successfulEventsChunk + successfulEventsChunk.push(item); + errorMap.clear(); + } + } + // Push the last successfulEventsChunk to resultArray + if (successfulEventsChunk.length > 0) { + resultArray.push(successfulEventsChunk); + } + return resultArray; +}; + +const convertMetadataToArray = (eventList) => { + const processedEvents = eventList.map((event) => ({ + ...event, + metadata: Array.isArray(event.metadata) ? event.metadata : [event.metadata], + })); + return processedEvents; +}; + +/** + * Takes in two arrays, eachUserSuccessEventslist and eachUserErrorEventsList, and returns an ordered array of events. + * If there are no error events, it returns the array of transformed events. + * If there are no successful responses, it returns the error events. + * If there are both successful and erroneous events, it orders them based on the jobId property of the events' metadata array. + * considering error responses are built with @handleRtTfSingleEventError + * + * @param {Array} eachUserSuccessEventslist - An array of events representing successful responses for a user. + * @param {Array} eachUserErrorEventsList - An array of events representing error responses for a user. + * @returns {Array} - An ordered array of events. + * + * @example + * const eachUserSuccessEventslist = [{track, jobId: 1}, {track, jobId: 2}, {track, jobId: 5}]; + * const eachUserErrorEventsList = [{identify, jobId: 3}, {identify, jobId: 4}]; + * Output: [[{track, jobId: 1}, {track, jobId: 2}],[{identify, jobId: 3}],[{identify, jobId: 4}], {track, jobId: 5}]] + */ +const getRearrangedEvents = (eachUserSuccessEventslist, eachUserErrorEventsList) => { + // Convert 'metadata' to an array if it's not already + const processedSuccessfulEvents = convertMetadataToArray(eachUserSuccessEventslist); + const processedErrorEvents = convertMetadataToArray(eachUserErrorEventsList); + + // if there are no error events, then return the batched response + if (eachUserErrorEventsList.length === 0) { + return [processedSuccessfulEvents]; + } + // if there are no batched response, then return the error events + if (eachUserSuccessEventslist.length === 0) { + const resultArray = []; + const errorMap = new Map(); + processedErrorEvents.forEach((item) => { + optimizeErrorResponse(item, errorMap, resultArray); + }); + return resultArray; + } + + // if there are both batched response and error events, then order them + const combinedTransformedEventList = [ + ...processedSuccessfulEvents, + ...processedErrorEvents, + ].flat(); + + const sortedEvents = _.sortBy(combinedTransformedEventList, (event) => event.metadata[0].jobId); + const finalResp = restoreEventOrder(sortedEvents); + + return finalResp; +}; + +module.exports = { networkHandler, getRearrangedEvents }; diff --git a/src/v0/destinations/bqstream/util.test.js b/src/v0/destinations/bqstream/util.test.js new file mode 100644 index 0000000000..3f9b6d9ff3 --- /dev/null +++ b/src/v0/destinations/bqstream/util.test.js @@ -0,0 +1,108 @@ +const { getRearrangedEvents } = require('./util'); + +describe('getRearrangedEvents', () => { + // Tests that the function returns an array of transformed events when there are no error events + it('should return an array of transformed events when there are no error events', () => { + const eachUserSuccessEventslist = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { message: { type: 'track' }, metadata: { jobId: 3 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const eachUserErrorEventsList = []; + const expected = [ + [ + { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 3 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 5 }] }, + ], + ]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); + + // Tests that the function returns an empty array when both input arrays are empty + it('should return an empty array when both input arrays are empty', () => { + const eachUserSuccessEventslist = []; + const eachUserErrorEventsList = []; + const expected = [[]]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); + + // Tests that the function returns an array with only error events when all events are erroneous + it('should return an array with only error events when all events are erroneous', () => { + const eachUserSuccessEventslist = []; + const eachUserErrorEventsList = [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 3, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + ]; + const expected = [ + [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [ + { jobId: 3, userId: 'user12345' }, + { jobId: 4, userId: 'user12345' }, + ], + }, + ], + ]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); + + // Tests that the function returns an ordered array of events with both successful and erroneous events, ordered based on the jobId property of the events' metadata array + it('should return an ordered array of events with both successful and erroneous events', () => { + const eachUserSuccessEventslist = [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 3, userId: 'user12345' }], + }, + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [{ jobId: 4, userId: 'user12345' }], + }, + ]; + const eachUserErrorEventsList = [ + { message: { type: 'track' }, metadata: { jobId: 1 } }, + { message: { type: 'track' }, metadata: { jobId: 2 } }, + { message: { type: 'track' }, metadata: { jobId: 5 } }, + ]; + const expected = [ + [ + { message: { type: 'track' }, metadata: [{ jobId: 1 }] }, + { message: { type: 'track' }, metadata: [{ jobId: 2 }] }, + ], + [ + { + batched: false, + destination: {}, + error: 'Message Type not supported: identify', + metadata: [ + { jobId: 3, userId: 'user12345' }, + { jobId: 4, userId: 'user12345' }, + ], + }, + ], + [{ message: { type: 'track' }, metadata: [{ jobId: 5 }] }], + ]; + const result = getRearrangedEvents(eachUserSuccessEventslist, eachUserErrorEventsList); + expect(result).toEqual(expected); + }); +}); diff --git a/test/integrations/destinations/bqstream/router/data.ts b/test/integrations/destinations/bqstream/router/data.ts index 0f6e663ad0..d27fb928b7 100644 --- a/test/integrations/destinations/bqstream/router/data.ts +++ b/test/integrations/destinations/bqstream/router/data.ts @@ -13,66 +13,141 @@ export const data = [ message: { type: 'track', event: 'insert product', - sentAt: '2021-09-08T11:10:45.466Z', userId: 'user12345', - channel: 'web', - context: { - os: { - name: '', - version: '', - }, - app: { - name: 'RudderLabs JavaScript SDK', - build: '1.0.0', - version: '1.1.18', - namespace: 'com.rudderlabs.javascript', - }, - page: { - url: 'http://127.0.0.1:5500/index.html', - path: '/index.html', - title: 'Document', - search: '', - tab_url: 'http://127.0.0.1:5500/index.html', - referrer: '$direct', - initial_referrer: '$direct', - referring_domain: '', - initial_referring_domain: '', - }, - locale: 'en-GB', - screen: { - width: 1536, - height: 960, - density: 2, - innerWidth: 1536, - innerHeight: 776, - }, - traits: {}, - library: { - name: 'RudderLabs JavaScript SDK', - version: '1.1.18', - }, - campaign: {}, - userAgent: - 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36', - }, - rudderId: 'fa2994a5-2a81-45fd-9919-fcf5596ad380', - messageId: 'e2d1a383-d9a2-4e03-a9dc-131d153c4d95', - timestamp: '2021-11-15T14:06:42.497+05:30', + properties: { count: 10, productId: 10, productName: 'Product-10', }, - receivedAt: '2021-11-15T14:06:42.497+05:30', - request_ip: '[::1]', anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', - integrations: { - All: true, - }, - originalTimestamp: '2021-09-08T11:10:45.466Z', }, metadata: { jobId: 1, + userId: 'user12345', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + properties: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + }, + metadata: { + jobId: 2, + userId: 'user12345', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'identify', + event: 'insert product', + userId: 'user12345', + traits: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', + }, + metadata: { + jobId: 3, + userId: 'user12345', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + properties: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', + }, + metadata: { + jobId: 5, + userId: 'user123', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + properties: { + count: 20, + productId: 20, + productName: 'Product-20', + }, + anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', + }, + metadata: { + jobId: 6, + userId: 'user124', }, destination: { Config: { @@ -84,7 +159,7 @@ export const data = [ eventDelivery: true, eventDeliveryTS: 1636965406397, }, - Enabled: true, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, @@ -93,19 +168,70 @@ export const data = [ message: { type: 'track', event: 'insert product', - sentAt: '2021-09-08T11:10:45.466Z', + userId: 'user12345', - channel: 'web', + }, + metadata: { + jobId: 7, + userId: 'user124', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'track', + event: 'insert product', + userId: 'user12345', + }, + metadata: { + jobId: 8, + userId: 'user125', + }, + destination: { + Config: { + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + projectId: 'gc-project-id', + datasetId: 'gc_dataset', + tableId: 'gc_table', + insertId: 'productId', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + message: { + type: 'identify', + event: 'insert product', + + userId: 'user12345', + context: { os: { - name: '', + Name: '', version: '', }, app: { - name: 'RudderLabs JavaScript SDK', + Name: 'RudderLabs JavaScript SDK', build: '1.0.0', version: '1.1.18', - namespace: 'com.rudderlabs.javascript', + Namespace: 'com.rudderlabs.javascript', }, page: { url: 'http://127.0.0.1:5500/index.html', @@ -128,31 +254,25 @@ export const data = [ }, traits: {}, library: { - name: 'RudderLabs JavaScript SDK', + Name: 'RudderLabs JavaScript SDK', version: '1.1.18', }, campaign: {}, userAgent: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36', }, - rudderId: 'fa2994a5-2a81-45fd-9919-fcf5596ad380', - messageId: 'e2d1a383-d9a2-4e03-a9dc-131d153c4d95', - timestamp: '2021-11-15T14:06:42.497+05:30', - properties: { + + traits: { count: 20, productId: 20, productName: 'Product-20', }, receivedAt: '2021-11-15T14:06:42.497+05:30', - request_ip: '[::1]', anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf', - integrations: { - All: true, - }, - originalTimestamp: '2021-09-08T11:10:45.466Z', }, metadata: { - jobId: 2, + jobId: 9, + userId: 'user125', }, destination: { Config: { @@ -164,7 +284,7 @@ export const data = [ eventDelivery: true, eventDeliveryTS: 1636965406397, }, - Enabled: true, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, @@ -203,9 +323,11 @@ export const data = [ metadata: [ { jobId: 1, + userId: 'user12345', }, { jobId: 2, + userId: 'user12345', }, ], batched: true, @@ -220,10 +342,160 @@ export const data = [ eventDelivery: true, eventDeliveryTS: 1636965406397, }, - Enabled: true, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + }, + { + batched: false, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + error: 'Message Type not supported: identify', + metadata: [ + { + jobId: 3, + userId: 'user12345', + }, + ], + statTags: { + destType: 'BQSTREAM', + errorCategory: 'dataValidation', + errorType: 'instrumentation', + feature: 'router', + implementation: 'native', + module: 'destination', + }, + statusCode: 400, + }, + { + batched: true, + batchedRequest: { + datasetId: 'gc_dataset', + projectId: 'gc-project-id', + properties: [ + { + count: 20, + insertId: '20', + productId: 20, + productName: 'Product-20', + }, + { + count: 20, + insertId: '20', + productId: 20, + productName: 'Product-20', + }, + ], + tableId: 'gc_table', + }, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + metadata: [ + { + jobId: 5, + userId: 'user123', + }, + { + jobId: 6, + userId: 'user124', + }, + ], + statusCode: 200, + }, + { + batched: false, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', + Name: 'bqstream test', + }, + error: 'Invalid payload for the destination', + metadata: [ + { + jobId: 7, + userId: 'user124', + }, + { + jobId: 8, + userId: 'user125', + }, + ], + statTags: { + destType: 'BQSTREAM', + errorCategory: 'dataValidation', + errorType: 'instrumentation', + feature: 'router', + implementation: 'native', + module: 'destination', + }, + statusCode: 400, + }, + { + batched: false, + destination: { + Config: { + datasetId: 'gc_dataset', + eventDelivery: true, + eventDeliveryTS: 1636965406397, + insertId: 'productId', + projectId: 'gc-project-id', + rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-', + tableId: 'gc_table', + }, + ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR', Name: 'bqstream test', }, + error: 'Message Type not supported: identify', + metadata: [ + { + jobId: 9, + userId: 'user125', + }, + ], + statTags: { + destType: 'BQSTREAM', + errorCategory: 'dataValidation', + errorType: 'instrumentation', + feature: 'router', + implementation: 'native', + module: 'destination', + }, + statusCode: 400, }, ], },