From e90f54b028c1b58ff187eedc391a6c1871a54c8e Mon Sep 17 00:00:00 2001 From: shrouti1507 Date: Mon, 11 Sep 2023 15:38:37 +0530 Subject: [PATCH] fix: addressing comments --- src/v0/destinations/bqstream/transform.js | 54 +++++++------- src/v0/destinations/bqstream/util.js | 86 ++++++++++++++++++++++- 2 files changed, 109 insertions(+), 31 deletions(-) diff --git a/src/v0/destinations/bqstream/transform.js b/src/v0/destinations/bqstream/transform.js index a3e6c252bd..631689cc75 100644 --- a/src/v0/destinations/bqstream/transform.js +++ b/src/v0/destinations/bqstream/transform.js @@ -9,7 +9,7 @@ const { } = require('../../util'); const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config'); const { InstrumentationError } = require('../../util/errorTypes'); -const { getGroupedEvents } = require('./util'); +const { generateUserJourneys, HandleEventOrdering } = require('./util'); const getInsertIdColValue = (properties, insertIdCol) => { if ( @@ -51,7 +51,7 @@ const process = (event) => { }; }; -const batchEvents = (eventsChunk) => { +const batchEachUserSuccessEvents = (eventsChunk) => { const batchedResponseList = []; // arrayChunks = [[e1,e2, ..batchSize], [e1,e2, ..batchSize], ...] @@ -69,7 +69,7 @@ const batchEvents = (eventsChunk) => { chunk.forEach((ev) => { // Pixel code must be added above "batch": [..] batchResponseList.push(ev.message.properties); - metadata.push(ev.metadata); + metadata.push(ev.metadata[0]); }); batchEventResponse.batchedRequest = { @@ -103,23 +103,23 @@ const processRouterDest = (inputs) => { if (errorRespEvents.length > 0) { return errorRespEvents; } - const groupedEvents = getGroupedEvents(inputs); + const userJourneyArrays = generateUserJourneys(inputs); const finalResp = []; - groupedEvents.forEach((eventList) => { - let eventsChunk = []; // temporary variable to divide payload into chunks - let errorRespList = []; - if (eventList.length > 0) { - eventList.forEach((event) => { + userJourneyArrays.forEach((eachUserJourney) => { + const eachUserSuccessEventslist = []; // temporary variable to divide payload into chunks + const eachUserErrorEventsList = []; + if (eachUserJourney.length > 0) { + eachUserJourney.forEach((event) => { try { if (event.message.statusCode) { // already transformed event - eventsChunk.push(event); + eachUserSuccessEventslist.push(event); } else { // if not transformed let response = process(event); response = Array.isArray(response) ? response : [response]; response.forEach((res) => { - eventsChunk.push({ + eachUserSuccessEventslist.push({ message: res, metadata: event.metadata, destination: event.destination, @@ -127,25 +127,23 @@ const processRouterDest = (inputs) => { }); } } catch (error) { - const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION); - // divide the successful payloads till now into batches - let batchedResponseList = []; - if (eventsChunk.length > 0) { - batchedResponseList = batchEvents(eventsChunk); - } - // clear up the temporary variable - eventsChunk = []; - errorRespList.push(errRespEvent); - finalResp.push([...batchedResponseList, ...errorRespList]); - // putting it back as an empty array - errorRespList = []; + const eachUserErrorEvent = handleRtTfSingleEventError(event, error, DESTINATION); + eachUserErrorEventsList.push(eachUserErrorEvent); + } + }); + const orderedEventsList = HandleEventOrdering(eachUserSuccessEventslist, eachUserErrorEventsList) + // divide the successful payloads into batches + let eachUserBatchedResponse = []; + orderedEventsList.forEach((eventList) => { + // no error event list will have more than one items in the list + if(eventList[0].error) { + finalResp.push([...eventList]); + } else { + // batch the successful events + eachUserBatchedResponse = batchEachUserSuccessEvents(eventList); + finalResp.push([...eachUserBatchedResponse]); } }); - let batchedResponseList = []; - if (eventsChunk.length > 0) { - batchedResponseList = batchEvents(eventsChunk); - finalResp.push([...batchedResponseList]); - } } }); return finalResp.flat(); diff --git a/src/v0/destinations/bqstream/util.js b/src/v0/destinations/bqstream/util.js index d12a874f97..df00b55779 100644 --- a/src/v0/destinations/bqstream/util.js +++ b/src/v0/destinations/bqstream/util.js @@ -6,7 +6,7 @@ const { processAxiosResponse, } = require('../../../adapters/utils/networkUtils'); const { DISABLE_DEST, REFRESH_TOKEN } = require('../../../adapters/networkhandler/authConstants'); -const { isHttpStatusSuccess } = require('../../util'); +const { isHttpStatusSuccess, isDefinedAndNotNull } = require('../../util'); const { proxyRequest } = require('../../../adapters/network'); const { UnhandledStatusCodeError, NetworkError, AbortedError } = require('../../util/errorTypes'); const tags = require('../../util/tags'); @@ -151,10 +151,90 @@ function networkHandler() { * @returns {Array} An array of arrays containing the grouped events. * Each inner array represents a user journey. */ -const getGroupedEvents = (inputs) => { +const generateUserJourneys = (inputs) => { const userIdEventMap = _.groupBy(inputs, 'metadata.userId'); const eventGroupedByUserId = Object.values(userIdEventMap); return eventGroupedByUserId; }; -module.exports = { networkHandler, getGroupedEvents }; +/** + * Filters and splits an array of events based on whether they have an error or not. + * Returns an array of arrays, where inner arrays represent either a chunk of successful events or + * an array of single error event. It maintains the order of events strictly. + * + * @param {Array} sortedEvents - An array of events to be filtered and split. + * @returns {Array} - An array of arrays where each inner array represents a chunk of successful events followed by an error event. + */ +const filterAndSplitEvents = (sortedEvents) => { + let successfulEventsChunk = []; + const resultArray = [] + for (const item of sortedEvents) { + // if error is present, then push the previous successfulEventsChunk + // and then push the error event + if (isDefinedAndNotNull(item.error)) { + if(successfulEventsChunk.length > 0) { + resultArray.push(successfulEventsChunk); + successfulEventsChunk = []; + } + resultArray.push([item]); + } else { + // if error is not present, then push the event to successfulEventsChunk + successfulEventsChunk.push(item); + } + } + // Push the last successfulEventsChunk to resultArray + if (successfulEventsChunk.length > 0) { + resultArray.push(successfulEventsChunk); + } + return resultArray; +}; + + +const convertMetadataToArray = (eventList ) => { + const processedEvents = eventList.map((event) => ({ + ...event, + metadata: Array.isArray(event.metadata) ? event.metadata : [event.metadata], + })); + return processedEvents; +} + + /** + * Takes in two arrays, eachUserSuccessEventslist and eachUserErrorEventsList, and returns an ordered array of events. + * If there are no error events, it returns the array of transformed events. + * If there are no successful responses, it returns the error events. + * If there are both successful and erroneous events, it orders them based on the jobId property of the events' metadata array. + * considering error responses are built with @handleRtTfSingleEventError + * + * @param {Array} eachUserSuccessEventslist - An array of events representing successful responses for a user. + * @param {Array} eachUserErrorEventsList - An array of events representing error responses for a user. + * @returns {Array} - An ordered array of events. + * + * @example + * const eachUserSuccessEventslist = [{track, jobId: 1}, {track, jobId: 2}, {track, jobId: 5}]; + * const eachUserErrorEventsList = [{identify, jobId: 3}, {identify, jobId: 4}]; + * Output: [[{track, jobId: 1}, {track, jobId: 2}],[{identify, jobId: 3}],[{identify, jobId: 4}], {track, jobId: 5}]] + */ + const HandleEventOrdering = (eachUserSuccessEventslist, eachUserErrorEventsList) => { + // Convert 'metadata' to an array if it's not already + const processedSuccessfulEvents = convertMetadataToArray(eachUserSuccessEventslist); + const processedErrorEvents = convertMetadataToArray(eachUserErrorEventsList); + + // if there are no error events, then return the batched response + if (eachUserErrorEventsList.length === 0) { + return [processedSuccessfulEvents]; + } + // if there are no batched response, then return the error events + if (eachUserSuccessEventslist.length === 0) { + return [processedErrorEvents]; + } + + // if there are both batched response and error events, then order them + const combinedTransformedEventList = [...processedSuccessfulEvents, ...processedErrorEvents].flat(); + + const sortedEvents = _.sortBy(combinedTransformedEventList, (event) => event.metadata[0].jobId); + const finalResp = filterAndSplitEvents(sortedEvents); + + return finalResp; + } + +module.exports = { networkHandler, generateUserJourneys, HandleEventOrdering };