From 40a97776c3d6410ab5de056d7c16ec35d22d7db0 Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Mon, 24 Jun 2024 11:00:24 +0530 Subject: [PATCH 01/18] feat: add job run id level isolation --- src/services/destination/cdkV2Integration.ts | 8 ++++---- src/services/destination/nativeIntegration.ts | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/services/destination/cdkV2Integration.ts b/src/services/destination/cdkV2Integration.ts index a91bc5674b..4873ace667 100644 --- a/src/services/destination/cdkV2Integration.ts +++ b/src/services/destination/cdkV2Integration.ts @@ -112,10 +112,10 @@ export class CDKV2DestinationService implements DestinationService { _version: string, requestMetadata: NonNullable, ): Promise { - const allDestEvents: object = groupBy( - events, - (ev: RouterTransformationRequestData) => ev.destination?.ID, - ); + const allDestEvents: object = groupBy(events, (ev: RouterTransformationRequestData) => [ + ev.destination?.ID, + ev.metadata?.sourceJobRunId, + ]); const response: RouterTransformationResponse[][] = await Promise.all( Object.values(allDestEvents).map( async (destInputArray: RouterTransformationRequestData[]) => { diff --git a/src/services/destination/nativeIntegration.ts b/src/services/destination/nativeIntegration.ts index 38a27ea71d..bb308d0b41 100644 --- a/src/services/destination/nativeIntegration.ts +++ b/src/services/destination/nativeIntegration.ts @@ -101,7 +101,7 @@ export class NativeIntegrationDestinationService implements DestinationService { const destHandler = FetchHandler.getDestHandler(destinationType, version); const allDestEvents: NonNullable = groupBy( events, - (ev: RouterTransformationRequestData) => ev.destination?.ID, + (ev: RouterTransformationRequestData) => [ev.destination?.ID, ev.metadata?.sourceJobRunId], ); const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents); const response: RouterTransformationResponse[][] = await Promise.all( From d708309ef506ae4a26cdc102500bdcb22c7d0c59 Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Wed, 3 Jul 2024 15:05:04 +0530 Subject: [PATCH 02/18] feat: add group by sourceID for rETL --- src/services/destination/cdkV2Integration.ts | 70 +++-- src/services/destination/nativeIntegration.ts | 7 +- src/v0/util/index.js | 25 +- src/v0/util/index.test.js | 247 +++++++++++++++++- 4 files changed, 302 insertions(+), 47 deletions(-) diff --git a/src/services/destination/cdkV2Integration.ts b/src/services/destination/cdkV2Integration.ts index 4873ace667..0789a98c1e 100644 --- a/src/services/destination/cdkV2Integration.ts +++ b/src/services/destination/cdkV2Integration.ts @@ -1,7 +1,6 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable class-methods-use-this */ import { TransformationError } from '@rudderstack/integrations-lib'; -import groupBy from 'lodash/groupBy'; import { processCdkV2Workflow } from '../../cdk/v2/handler'; import { DestinationService } from '../../interfaces/DestinationService'; import { @@ -20,6 +19,7 @@ import { } from '../../types/index'; import stats from '../../util/stats'; import { CatchErr } from '../../util/types'; +import { groupRouterTransformEvents } from '../../v0/util'; import tags from '../../v0/util/tags'; import { DestinationPostTransformationService } from './postTransformation'; @@ -112,46 +112,38 @@ export class CDKV2DestinationService implements DestinationService { _version: string, requestMetadata: NonNullable, ): Promise { - const allDestEvents: object = groupBy(events, (ev: RouterTransformationRequestData) => [ - ev.destination?.ID, - ev.metadata?.sourceJobRunId, - ]); + const groupedEvents: RouterTransformationRequestData[][] = groupRouterTransformEvents(events); const response: RouterTransformationResponse[][] = await Promise.all( - Object.values(allDestEvents).map( - async (destInputArray: RouterTransformationRequestData[]) => { - const metaTo = this.getTags( - destinationType, - destInputArray[0].metadata.destinationId, - destInputArray[0].metadata.workspaceId, - tags.FEATURES.ROUTER, - ); - metaTo.metadata = destInputArray[0].metadata; - try { - const doRouterTransformationResponse: RouterTransformationResponse[] = - await processCdkV2Workflow( - destinationType, - destInputArray, - tags.FEATURES.ROUTER, - requestMetadata, - ); - return DestinationPostTransformationService.handleRouterTransformSuccessEvents( - doRouterTransformationResponse, - undefined, - metaTo, - tags.IMPLEMENTATIONS.CDK_V2, - destinationType.toUpperCase(), + groupedEvents.map(async (destInputArray: RouterTransformationRequestData[]) => { + const metaTo = this.getTags( + destinationType, + destInputArray[0].metadata.destinationId, + destInputArray[0].metadata.workspaceId, + tags.FEATURES.ROUTER, + ); + metaTo.metadata = destInputArray[0].metadata; + try { + const doRouterTransformationResponse: RouterTransformationResponse[] = + await processCdkV2Workflow( + destinationType, + destInputArray, + tags.FEATURES.ROUTER, + requestMetadata, ); - } catch (error: CatchErr) { - metaTo.metadatas = destInputArray.map((input) => input.metadata); - const erroredResp = - DestinationPostTransformationService.handleRouterTransformFailureEvents( - error, - metaTo, - ); - return [erroredResp]; - } - }, - ), + return DestinationPostTransformationService.handleRouterTransformSuccessEvents( + doRouterTransformationResponse, + undefined, + metaTo, + tags.IMPLEMENTATIONS.CDK_V2, + destinationType.toUpperCase(), + ); + } catch (error: CatchErr) { + metaTo.metadatas = destInputArray.map((input) => input.metadata); + const erroredResp = + DestinationPostTransformationService.handleRouterTransformFailureEvents(error, metaTo); + return [erroredResp]; + } + }), ); return response.flat(); } diff --git a/src/services/destination/nativeIntegration.ts b/src/services/destination/nativeIntegration.ts index bb308d0b41..38ec934ff7 100644 --- a/src/services/destination/nativeIntegration.ts +++ b/src/services/destination/nativeIntegration.ts @@ -26,6 +26,7 @@ import { import stats from '../../util/stats'; import tags from '../../v0/util/tags'; import { DestinationPostTransformationService } from './postTransformation'; +import { groupRouterTransformEvents } from '../../v0/util'; export class NativeIntegrationDestinationService implements DestinationService { public init() {} @@ -99,11 +100,7 @@ export class NativeIntegrationDestinationService implements DestinationService { requestMetadata: NonNullable, ): Promise { const destHandler = FetchHandler.getDestHandler(destinationType, version); - const allDestEvents: NonNullable = groupBy( - events, - (ev: RouterTransformationRequestData) => [ev.destination?.ID, ev.metadata?.sourceJobRunId], - ); - const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents); + const groupedEvents: RouterTransformationRequestData[][] = groupRouterTransformEvents(events); const response: RouterTransformationResponse[][] = await Promise.all( groupedEvents.map(async (destInputArray: RouterTransformationRequestData[]) => { const metaTO = this.getTags( diff --git a/src/v0/util/index.js b/src/v0/util/index.js index 12b8d4dd7e..da24ebfcb8 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -2248,9 +2248,29 @@ const validateEventAndLowerCaseConversion = (event, isMandatory, convertToLowerC return convertToLowerCase ? event.toString().toLowerCase() : event.toString(); }; -const applyCustomMappings = (message, mappings) => - JsonTemplateEngine.createAsSync(mappings, { defaultPathType: PathType.JSON }).evaluate(message); +/** + * This function applies custom mappings to the event. + * @param {*} event The event to be transformed. + * @param {*} mappings The custom mappings to be applied. + * @returns {object} The transformed event. + */ +const applyCustomMappings = (event, mappings) => + JsonTemplateEngine.createAsSync(mappings, { defaultPathType: PathType.JSON }).evaluate(event); +/** + * This groups the events by destination ID and source ID. + * Note: sourceID is only used for rETL events. + * @param {*} events The events to be grouped. + * @returns {array} The array of grouped events. + */ +const groupRouterTransformEvents = (events) => { + return Object.values( + lodash.groupBy(events, (ev) => [ + ev.metadata?.destinationId, + ev.metadata?.sourceCategory === 'warehouse' ? ev.metadata?.sourceId : 'default', + ]), + ); +}; // ======================================================================== // EXPORTS // ======================================================================== @@ -2309,6 +2329,7 @@ module.exports = { getValueFromMessage, getValueFromPropertiesOrTraits, getValuesAsArrayFromConfig, + groupRouterTransformEvents, handleSourceKeysOperation, hashToSha256, isAppleFamily, diff --git a/src/v0/util/index.test.js b/src/v0/util/index.test.js index c34d513325..369cb9f858 100644 --- a/src/v0/util/index.test.js +++ b/src/v0/util/index.test.js @@ -1,4 +1,4 @@ -const { TAG_NAMES, InstrumentationError } = require('@rudderstack/integrations-lib'); +const { InstrumentationError } = require('@rudderstack/integrations-lib'); const utilities = require('.'); const { getFuncTestData } = require('../../../test/testHelper'); const { FilteredEventsError } = require('./errorTypes'); @@ -8,6 +8,7 @@ const { generateExclusionList, combineBatchRequestsWithSameJobIds, validateEventAndLowerCaseConversion, + groupRouterTransformEvents, } = require('./index'); // Names of the utility functions to test @@ -690,3 +691,247 @@ describe('extractCustomFields', () => { }); }); }); + +describe('groupRouterTransformEvents', () => { + // groups events correctly by destination ID + it('should group events correctly by destination ID and source ID', () => { + const events = [ + { + message: { + eventID: 'evt001', + eventName: 'OrderReceived', + }, + metadata: { + sourceId: 'SRC001', + destinationId: 'DEST001', + sourceCategory: 'warehouse', + }, + }, + { + message: { + eventID: 'evt002', + eventName: 'InventoryUpdate', + }, + metadata: { + sourceId: 'SRC002', + destinationId: 'DEST002', + sourceCategory: 'cloud', + }, + }, + { + message: { + eventID: 'evt003', + eventName: 'UserLogin', + }, + metadata: { + sourceId: 'SRC003', + destinationId: 'DEST003', + sourceCategory: 'webhook', + }, + }, + { + message: { + eventID: 'evt004', + eventName: 'PaymentProcessed', + }, + metadata: { + sourceId: 'SRC001', + destinationId: 'DEST002', + sourceCategory: 'warehouse', + }, + }, + { + message: { + eventID: 'evt005', + eventName: 'ShipmentDispatched', + }, + metadata: { + sourceId: 'SRC002', + destinationId: 'DEST003', + sourceCategory: 'cloud', + }, + }, + { + message: { + eventID: 'evt006', + eventName: 'ProductReturn', + }, + metadata: { + sourceId: 'SRC003', + destinationId: 'DEST001', + sourceCategory: 'webhook', + }, + }, + { + message: { + eventID: 'evt007', + eventName: 'StockAlert', + }, + metadata: { + sourceId: 'SRC001', + destinationId: 'DEST003', + sourceCategory: 'warehouse', + }, + }, + { + message: { + eventID: 'evt008', + eventName: 'UserRegistration', + }, + metadata: { + sourceId: 'SRC002', + destinationId: 'DEST001', + sourceCategory: 'cloud', + }, + }, + { + message: { + eventID: 'evt009', + eventName: 'ReviewSubmitted', + }, + metadata: { + sourceId: 'SRC003', + destinationId: 'DEST002', + sourceCategory: 'webhook', + }, + }, + { + message: { + eventID: 'evt010', + eventName: 'DiscountApplied', + }, + metadata: { + sourceId: 'SRC001', + destinationId: 'DEST001', + sourceCategory: 'warehouse', + }, + }, + ]; + + const groupedEvents = groupRouterTransformEvents(events); + expect(groupedEvents).toEqual([ + [ + { + message: { + eventID: 'evt001', + eventName: 'OrderReceived', + }, + metadata: { + sourceId: 'SRC001', + destinationId: 'DEST001', + sourceCategory: 'warehouse', + }, + }, + { + message: { + eventID: 'evt010', + eventName: 'DiscountApplied', + }, + metadata: { + sourceId: 'SRC001', + destinationId: 'DEST001', + sourceCategory: 'warehouse', + }, + }, + ], + [ + { + message: { + eventID: 'evt002', + eventName: 'InventoryUpdate', + }, + metadata: { + sourceId: 'SRC002', + destinationId: 'DEST002', + sourceCategory: 'cloud', + }, + }, + { + message: { + eventID: 'evt009', + eventName: 'ReviewSubmitted', + }, + metadata: { + sourceId: 'SRC003', + destinationId: 'DEST002', + sourceCategory: 'webhook', + }, + }, + ], + [ + { + message: { + eventID: 'evt003', + eventName: 'UserLogin', + }, + metadata: { + sourceId: 'SRC003', + destinationId: 'DEST003', + sourceCategory: 'webhook', + }, + }, + { + message: { + eventID: 'evt005', + eventName: 'ShipmentDispatched', + }, + metadata: { + sourceId: 'SRC002', + destinationId: 'DEST003', + sourceCategory: 'cloud', + }, + }, + ], + [ + { + message: { + eventID: 'evt004', + eventName: 'PaymentProcessed', + }, + metadata: { + sourceId: 'SRC001', + destinationId: 'DEST002', + sourceCategory: 'warehouse', + }, + }, + ], + [ + { + message: { + eventID: 'evt006', + eventName: 'ProductReturn', + }, + metadata: { + sourceId: 'SRC003', + destinationId: 'DEST001', + sourceCategory: 'webhook', + }, + }, + { + message: { + eventID: 'evt008', + eventName: 'UserRegistration', + }, + metadata: { + sourceId: 'SRC002', + destinationId: 'DEST001', + sourceCategory: 'cloud', + }, + }, + ], + [ + { + message: { + eventID: 'evt007', + eventName: 'StockAlert', + }, + metadata: { + sourceId: 'SRC001', + destinationId: 'DEST003', + sourceCategory: 'warehouse', + }, + }, + ], + ]); + }); +}); From a1bc2c1fbbaa2ad9363d9a5eb03de724ddee3962 Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Wed, 3 Jul 2024 15:10:15 +0530 Subject: [PATCH 03/18] fix: lint isues --- src/v0/util/index.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/v0/util/index.js b/src/v0/util/index.js index da24ebfcb8..6445ed55a1 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -2263,14 +2263,12 @@ const applyCustomMappings = (event, mappings) => * @param {*} events The events to be grouped. * @returns {array} The array of grouped events. */ -const groupRouterTransformEvents = (events) => { - return Object.values( +const groupRouterTransformEvents = (events) => Object.values( lodash.groupBy(events, (ev) => [ ev.metadata?.destinationId, ev.metadata?.sourceCategory === 'warehouse' ? ev.metadata?.sourceId : 'default', ]), ); -}; // ======================================================================== // EXPORTS // ======================================================================== From 38de49aaadf90221a6ceb4ff29211c7c516131e4 Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Wed, 10 Jul 2024 15:55:14 +0530 Subject: [PATCH 04/18] fix: lint issues --- src/v0/util/index.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/v0/util/index.js b/src/v0/util/index.js index 6445ed55a1..f683dd80bc 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -2263,7 +2263,8 @@ const applyCustomMappings = (event, mappings) => * @param {*} events The events to be grouped. * @returns {array} The array of grouped events. */ -const groupRouterTransformEvents = (events) => Object.values( +const groupRouterTransformEvents = (events) => + Object.values( lodash.groupBy(events, (ev) => [ ev.metadata?.destinationId, ev.metadata?.sourceCategory === 'warehouse' ? ev.metadata?.sourceId : 'default', From 7e258d535f06191dac9a34154bff1f8f2ba0e99c Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Wed, 10 Jul 2024 16:23:33 +0530 Subject: [PATCH 05/18] fix: component test cases --- test/integrations/destinations/algolia/router/data.ts | 8 ++++---- test/integrations/destinations/iterable/router/data.ts | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/test/integrations/destinations/algolia/router/data.ts b/test/integrations/destinations/algolia/router/data.ts index dca899693e..4aadd922a2 100644 --- a/test/integrations/destinations/algolia/router/data.ts +++ b/test/integrations/destinations/algolia/router/data.ts @@ -75,7 +75,7 @@ export const data = [ integrations: { All: true }, originalTimestamp: '2021-10-25T09:40:08.879Z', }, - metadata: { jobId: 1, userId: 'u1' }, + metadata: { jobId: 1, userId: 'u1', destinationId: '1zzHtStW2ZPlullmz6L7DGnmk9V' }, destination: { ID: '1zzHtStW2ZPlullmz6L7DGnmk9V', Name: 'algolia-dev', @@ -178,7 +178,7 @@ export const data = [ integrations: { All: true }, originalTimestamp: '2021-10-25T09:40:08.886Z', }, - metadata: { jobId: 2, userId: 'u1' }, + metadata: { jobId: 2, userId: 'u1', destinationId: '1zzHtStW2ZPlullmz6L7DGnmk9V' }, destination: { ID: '1zzHtStW2ZPlullmz6L7DGnmk9V', Name: 'algolia-dev', @@ -317,8 +317,8 @@ export const data = [ files: {}, }, metadata: [ - { jobId: 1, userId: 'u1' }, - { jobId: 2, userId: 'u1' }, + { jobId: 1, userId: 'u1', destinationId: '1zzHtStW2ZPlullmz6L7DGnmk9V' }, + { jobId: 2, userId: 'u1', destinationId: '1zzHtStW2ZPlullmz6L7DGnmk9V' }, ], batched: true, statusCode: 200, diff --git a/test/integrations/destinations/iterable/router/data.ts b/test/integrations/destinations/iterable/router/data.ts index 09eedc8eb8..ba186c048e 100644 --- a/test/integrations/destinations/iterable/router/data.ts +++ b/test/integrations/destinations/iterable/router/data.ts @@ -296,7 +296,7 @@ export const data = [ type: 'identify', userId: 'lynnanderson@smith.net', }, - metadata: { jobId: 5, userId: 'u1' }, + metadata: { jobId: 5, userId: 'u1', destinationId: '1zia9wKshXt80YksLmUdJnr7IHI' }, destination: { ID: '1zia9wKshXt80YksLmUdJnr7IHI', Name: 'test_iterable', @@ -807,7 +807,7 @@ export const data = [ }, files: {}, }, - metadata: [{ jobId: 5, userId: 'u1' }], + metadata: [{ jobId: 5, userId: 'u1', destinationId: '1zia9wKshXt80YksLmUdJnr7IHI' }], batched: true, statusCode: 200, destination: { From d09ca5cfd234d59fd28d4c279ae12ef844d0b61e Mon Sep 17 00:00:00 2001 From: sandeepdigumarty Date: Wed, 21 Aug 2024 15:27:50 +0530 Subject: [PATCH 06/18] feat: update FBCA spec to support VDM v2 events --- .../fb_custom_audience/recordTransform.js | 15 ++++----------- .../destinations/fb_custom_audience/transform.js | 7 ++++--- src/v0/destinations/fb_custom_audience/util.js | 7 +++---- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/src/v0/destinations/fb_custom_audience/recordTransform.js b/src/v0/destinations/fb_custom_audience/recordTransform.js index 62d4bd568b..554b10c46b 100644 --- a/src/v0/destinations/fb_custom_audience/recordTransform.js +++ b/src/v0/destinations/fb_custom_audience/recordTransform.js @@ -100,18 +100,11 @@ const processRecordEventArray = ( }; async function processRecordInputs(groupedRecordInputs) { - const { destination } = groupedRecordInputs[0]; + const { destination, connection } = groupedRecordInputs[0]; const { message } = groupedRecordInputs[0]; - const { - isHashRequired, - accessToken, - disableFormat, - type, - subType, - isRaw, - maxUserCount, - audienceId, - } = destination.Config; + const { isHashRequired, accessToken, disableFormat, type, subType, isRaw, maxUserCount } = + destination.Config; + const { audienceId } = connection.Config.destination; const prepareParams = { access_token: accessToken, }; diff --git a/src/v0/destinations/fb_custom_audience/transform.js b/src/v0/destinations/fb_custom_audience/transform.js index c5c340c043..f15524fc7e 100644 --- a/src/v0/destinations/fb_custom_audience/transform.js +++ b/src/v0/destinations/fb_custom_audience/transform.js @@ -154,11 +154,12 @@ const prepareToSendEvents = ( }); return toSendEvents; }; -const processEvent = (message, destination) => { +const processEvent = (message, destination, connection) => { const respList = []; let toSendEvents = []; let { userSchema } = destination.Config; - const { isHashRequired, audienceId, maxUserCount } = destination.Config; + const { isHashRequired, maxUserCount } = destination.Config; + const { audienceId } = connection.Config.destination; if (!message.type) { throw new InstrumentationError('Message Type is not present. Aborting message.'); } @@ -236,7 +237,7 @@ const processEvent = (message, destination) => { return respList; }; -const process = (event) => processEvent(event.message, event.destination); +const process = (event) => processEvent(event.message, event.destination, event.connection); const processRouterDest = async (inputs, reqMetadata) => { const respList = []; diff --git a/src/v0/destinations/fb_custom_audience/util.js b/src/v0/destinations/fb_custom_audience/util.js index 401b601869..19f93d557a 100644 --- a/src/v0/destinations/fb_custom_audience/util.js +++ b/src/v0/destinations/fb_custom_audience/util.js @@ -49,14 +49,13 @@ const batchingWithPayloadSize = (payload) => { }; const getSchemaForEventMappedToDest = (message) => { - const mappedSchema = get(message, 'context.destinationFields'); + const mappedSchema = get(message, 'identifiers'); if (!mappedSchema) { throw new InstrumentationError( - 'context.destinationFields is required property for events mapped to destination ', + 'identifiers is required property for events mapped to destination ', ); } - // context.destinationFields has 2 possible values. An Array of fields or Comma seperated string with field names - let userSchema = Array.isArray(mappedSchema) ? mappedSchema : mappedSchema.split(','); + let userSchema = Object.keys(mappedSchema); userSchema = userSchema.map((field) => field.trim()); return userSchema; }; From cf6e879de4ba8654ae2f20a344d7d96f44b8adee Mon Sep 17 00:00:00 2001 From: sandeepdigumarty Date: Wed, 21 Aug 2024 18:32:02 +0530 Subject: [PATCH 07/18] feat: update FBCA spec to support VDM v2 events --- src/v0/destinations/fb_custom_audience/util.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v0/destinations/fb_custom_audience/util.js b/src/v0/destinations/fb_custom_audience/util.js index 19f93d557a..24650f03f4 100644 --- a/src/v0/destinations/fb_custom_audience/util.js +++ b/src/v0/destinations/fb_custom_audience/util.js @@ -49,7 +49,7 @@ const batchingWithPayloadSize = (payload) => { }; const getSchemaForEventMappedToDest = (message) => { - const mappedSchema = get(message, 'identifiers'); + const mappedSchema = get(message, 'fields'); if (!mappedSchema) { throw new InstrumentationError( 'identifiers is required property for events mapped to destination ', From 75dab29d06bc3c31d8e4df9a7392987d7a88599a Mon Sep 17 00:00:00 2001 From: sandeepdigumarty Date: Wed, 21 Aug 2024 20:45:11 +0530 Subject: [PATCH 08/18] chore: updated tests --- src/types/index.ts | 1 + .../fb_custom_audience/recordTransform.js | 2 +- .../fb_custom_audience/transform.js | 2 +- .../destinations/fb_custom_audience/util.js | 2 +- .../fb_custom_audience/processor/data.ts | 112 +++++++++++ .../fb_custom_audience/router/audienceList.ts | 5 +- .../fb_custom_audience/router/data.ts | 189 ------------------ .../fb_custom_audience/router/eventStream.ts | 14 ++ 8 files changed, 134 insertions(+), 193 deletions(-) diff --git a/src/types/index.ts b/src/types/index.ts index 150758363e..c2dcfe4bdd 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -160,6 +160,7 @@ type RouterTransformationRequestData = { message: object; metadata: Metadata; destination: Destination; + connection?: Record; }; type RouterTransformationRequest = { diff --git a/src/v0/destinations/fb_custom_audience/recordTransform.js b/src/v0/destinations/fb_custom_audience/recordTransform.js index 554b10c46b..46a9134175 100644 --- a/src/v0/destinations/fb_custom_audience/recordTransform.js +++ b/src/v0/destinations/fb_custom_audience/recordTransform.js @@ -104,7 +104,7 @@ async function processRecordInputs(groupedRecordInputs) { const { message } = groupedRecordInputs[0]; const { isHashRequired, accessToken, disableFormat, type, subType, isRaw, maxUserCount } = destination.Config; - const { audienceId } = connection.Config.destination; + const audienceId = get(connection, 'Config.destination.audienceId'); const prepareParams = { access_token: accessToken, }; diff --git a/src/v0/destinations/fb_custom_audience/transform.js b/src/v0/destinations/fb_custom_audience/transform.js index f15524fc7e..f8bf8b4fbb 100644 --- a/src/v0/destinations/fb_custom_audience/transform.js +++ b/src/v0/destinations/fb_custom_audience/transform.js @@ -159,7 +159,7 @@ const processEvent = (message, destination, connection) => { let toSendEvents = []; let { userSchema } = destination.Config; const { isHashRequired, maxUserCount } = destination.Config; - const { audienceId } = connection.Config.destination; + const audienceId = get(connection, 'Config.destination.audienceId'); if (!message.type) { throw new InstrumentationError('Message Type is not present. Aborting message.'); } diff --git a/src/v0/destinations/fb_custom_audience/util.js b/src/v0/destinations/fb_custom_audience/util.js index 24650f03f4..569e3d888b 100644 --- a/src/v0/destinations/fb_custom_audience/util.js +++ b/src/v0/destinations/fb_custom_audience/util.js @@ -52,7 +52,7 @@ const getSchemaForEventMappedToDest = (message) => { const mappedSchema = get(message, 'fields'); if (!mappedSchema) { throw new InstrumentationError( - 'identifiers is required property for events mapped to destination ', + 'event.fields is required property for events mapped to destination ', ); } let userSchema = Object.keys(mappedSchema); diff --git a/test/integrations/destinations/fb_custom_audience/processor/data.ts b/test/integrations/destinations/fb_custom_audience/processor/data.ts index 75fa321aca..2956086646 100644 --- a/test/integrations/destinations/fb_custom_audience/processor/data.ts +++ b/test/integrations/destinations/fb_custom_audience/processor/data.ts @@ -16,6 +16,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -128,6 +135,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -241,6 +255,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -354,6 +375,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: '', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -467,6 +495,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -678,6 +713,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -889,6 +931,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -1038,6 +1087,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -1251,6 +1307,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -1506,6 +1569,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -1717,6 +1787,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -1928,6 +2005,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -2071,6 +2155,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -2199,6 +2290,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -2341,6 +2439,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -54479,6 +54584,13 @@ export const data = [ request: { body: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', diff --git a/test/integrations/destinations/fb_custom_audience/router/audienceList.ts b/test/integrations/destinations/fb_custom_audience/router/audienceList.ts index c386fbf782..b636a59618 100644 --- a/test/integrations/destinations/fb_custom_audience/router/audienceList.ts +++ b/test/integrations/destinations/fb_custom_audience/router/audienceList.ts @@ -39,7 +39,10 @@ export const rETLAudienceRouterRequest: RouterTransformationRequest = { messageId: '4d906837-031d-4d34-b97a-62fdf51b4d3a', event: 'Add_Audience', context: { - destinationFields: 'EMAIL, FN', + fields: { + EMAIL: 'subscribed@eewrfrd.com', + FN: 'dfgdfg', + }, externalId: [{ type: 'FB_CUSTOM_AUDIENCE-23848494844100489', identifierType: 'EMAIL' }], mappedToDestination: 'true', sources: { diff --git a/test/integrations/destinations/fb_custom_audience/router/data.ts b/test/integrations/destinations/fb_custom_audience/router/data.ts index ab9bfee973..d0ba3f7b60 100644 --- a/test/integrations/destinations/fb_custom_audience/router/data.ts +++ b/test/integrations/destinations/fb_custom_audience/router/data.ts @@ -345,195 +345,6 @@ export const data = [ }, }, }, - { - name: 'fb_custom_audience', - description: 'rETL tests', - scenario: 'business', - successCriteria: 'it should transform audience event correctly', - feature: 'router', - module: 'destination', - version: 'v0', - input: { - request: { - body: rETLAudienceRouterRequest, - }, - }, - output: { - response: { - status: 200, - body: { - output: [ - { - batchedRequest: [ - { - version: '1', - type: 'REST', - method: 'POST', - endpoint: 'https://graph.facebook.com/v20.0/23848494844100489/users', - headers: {}, - params: { - access_token: 'ABC', - payload: { - schema: ['EMAIL', 'FN'], - data: [ - [ - '7625cab24612c37df6d2f724721bb38a25095d0295e29b807238ee188b8aca43', - 'e328a0d90d4b5132b2655cf7079b160040d2c1a83d70d4cad9cf1f69310635b3', - ], - [ - 'b2b4abadd72190af54305c0d3abf1977fec4935016bb13ff28040d5712318dfd', - 'f8147eb72c9bb356c362fdb0796b54971ebc983cb60b3cc3ff29582ce2052bad', - ], - [ - 'c4b007d1c3c9a5d31bd4082237a913e8e0db1767225c2a5ef33be2716df005fa', - 'd8bb13b95eaed7f9b6a8af276aa6122e8015e0c466c1a84e49ff7c69ad6ac911', - ], - [ - '94639be1bd9f17c05820164e9d71ef78558f117a9e8bfab43cf8015e08aa0b27', - 'b1661f97721dede0f876dcbf603289ee339f641b9c310deba53c76940f472698', - ], - [ - '39b456cfb4bb07f9e6bb18698aa173171ca49c731fccc4790e9ecea808d24ae6', - '6c882abd6d0aff713cdd6a4a31ee28c9140612fb2627a611f6f9f539bac44f81', - ], - [ - '769f73387add781a481ca08300008a08fb2f1816aaed196137efc2e05976d711', - '2222cb73346f7a01a1d4d3db28b58fd41045782bb66152b92aade379192544c5', - ], - [ - '48ddb93f0b30c475423fe177832912c5bcdce3cc72872f8051627967ef278e08', - 'abc12f8d666517c35280bf220f5390b1f0ef4bdbbc794ac59c95bba0381bf91b', - ], - [ - 'da2d431121cd10578fd81f8f80344b06db59ea2d05a7b5d27536c8789ddae8f0', - 'abc12f8d666517c35280bf220f5390b1f0ef4bdbbc794ac59c95bba0381bf91b', - ], - [ - 'b100c2ec0718fe6b4805b623aeec6710719d042ceea55f5c8135b010ec1c7b36', - '62a2fed3d6e08c44835fce71f02210b1ddabfb066e39edf1e6c261988f824dd3', - ], - [ - '0c1d1b0ba547a742013366d6fbc8f71dd77f566d94e41ed9f828a74b96928161', - '62a2fed3d6e08c44835fce71f02210b1ddabfb066e39edf1e6c261988f824dd3', - ], - ], - }, - }, - body: { - JSON: {}, - JSON_ARRAY: {}, - XML: {}, - FORM: {}, - }, - files: {}, - }, - ], - metadata: [ - { - attemptNum: 1, - destinationId: 'default-destinationId', - dontBatch: false, - jobId: 3, - secret: { - accessToken: 'default-accessToken', - }, - sourceId: 'default-sourceId', - userId: 'default-userId', - workspaceId: 'default-workspaceId', - }, - ], - batched: false, - statusCode: 200, - destination: { - Config: { - accessToken: 'ABC', - disableFormat: false, - isHashRequired: true, - isRaw: false, - maxUserCount: '50', - oneTrustCookieCategories: [], - skipVerify: false, - subType: 'NA', - type: 'NA', - userSchema: ['EMAIL'], - }, - ID: '1mMy5cqbtfuaKZv1IhVQKnBdVwe', - Name: 'FB_CUSTOM_AUDIENCE', - Enabled: true, - WorkspaceID: '1TSN08muJTZwH8iCDmnnRt1pmLd', - DestinationDefinition: { - ID: '1aIXqM806xAVm92nx07YwKbRrO9', - Name: 'FB_CUSTOM_AUDIENCE', - DisplayName: 'FB_CUSTOM_AUDIENCE', - Config: {}, - }, - Transformations: [], - IsConnectionEnabled: true, - IsProcessorEnabled: true, - }, - }, - { - metadata: [ - { - attemptNum: 1, - destinationId: 'default-destinationId', - dontBatch: false, - jobId: 4, - secret: { - accessToken: 'default-accessToken', - }, - sourceId: 'default-sourceId', - userId: 'default-userId', - workspaceId: 'default-workspaceId', - }, - ], - batched: false, - statusCode: 400, - error: - 'context.destinationFields is required property for events mapped to destination ', - statTags: { - errorCategory: 'dataValidation', - errorType: 'instrumentation', - destType: 'FB_CUSTOM_AUDIENCE', - destinationId: 'default-destinationId', - module: 'destination', - implementation: 'native', - feature: 'router', - workspaceId: 'default-workspaceId', - }, - destination: { - Config: { - accessToken: 'ABC', - disableFormat: false, - isHashRequired: true, - isRaw: false, - maxUserCount: '50', - oneTrustCookieCategories: [], - skipVerify: false, - subType: 'NA', - type: 'NA', - userSchema: ['EMAIL'], - }, - ID: '1mMy5cqbtfuaKZv1IhVQKnBdVwe', - Name: 'FB_CUSTOM_AUDIENCE', - Enabled: true, - WorkspaceID: '1TSN08muJTZwH8iCDmnnRt1pmLd', - DestinationDefinition: { - ID: '1aIXqM806xAVm92nx07YwKbRrO9', - Name: 'FB_CUSTOM_AUDIENCE', - DisplayName: 'FB_CUSTOM_AUDIENCE', - Config: {}, - }, - Transformations: [], - IsConnectionEnabled: true, - IsProcessorEnabled: true, - }, - }, - ], - }, - }, - }, - }, { name: 'fb_custom_audience', description: 'rETL record tests', diff --git a/test/integrations/destinations/fb_custom_audience/router/eventStream.ts b/test/integrations/destinations/fb_custom_audience/router/eventStream.ts index b4dcebf48b..ddb32cf9ae 100644 --- a/test/integrations/destinations/fb_custom_audience/router/eventStream.ts +++ b/test/integrations/destinations/fb_custom_audience/router/eventStream.ts @@ -42,6 +42,13 @@ const destination: Destination = { export const eventStreamRouterRequest: RouterTransformationRequest = { input: [ { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', @@ -88,6 +95,13 @@ export const eventStreamRouterRequest: RouterTransformationRequest = { destination: destination, }, { + connection: { + Config: { + destination: { + audienceId: 'aud1', + }, + }, + }, message: { userId: 'user 1', anonymousId: 'anon-id-new', From 018bf232b7b7b20b7c56c6ab512d8a904a0027df Mon Sep 17 00:00:00 2001 From: sandeepdigumarty Date: Thu, 22 Aug 2024 11:41:28 +0530 Subject: [PATCH 09/18] chore: added logs --- src/v0/destinations/fb_custom_audience/recordTransform.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/v0/destinations/fb_custom_audience/recordTransform.js b/src/v0/destinations/fb_custom_audience/recordTransform.js index 46a9134175..f115c49346 100644 --- a/src/v0/destinations/fb_custom_audience/recordTransform.js +++ b/src/v0/destinations/fb_custom_audience/recordTransform.js @@ -21,6 +21,7 @@ const { responseBuilderSimple, getDataSource, } = require('./util'); +const logger = require('../../../logger'); const processRecordEventArray = ( recordChunksArray, @@ -116,6 +117,8 @@ async function processRecordInputs(groupedRecordInputs) { } // audience id validation + logger.debug('Connection_object', JSON.stringify(connection)); + logger.debug('Event_object', JSON.stringify(message)); let operationAudienceId = audienceId; const mappedToDestination = get(message, MappedToDestinationKey); if (mappedToDestination) { From 13c54693c4ce5d5379cdea5e83cda20d79e136d0 Mon Sep 17 00:00:00 2001 From: yeshwanth vuppu <16932584+vyeshwanth@users.noreply.github.com> Date: Thu, 22 Aug 2024 14:40:39 +0530 Subject: [PATCH 10/18] fix: audienceId not found error --- src/v0/destinations/fb_custom_audience/recordTransform.js | 7 ++----- src/v0/destinations/fb_custom_audience/transform.js | 3 +-- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/v0/destinations/fb_custom_audience/recordTransform.js b/src/v0/destinations/fb_custom_audience/recordTransform.js index f115c49346..b771f2d7a2 100644 --- a/src/v0/destinations/fb_custom_audience/recordTransform.js +++ b/src/v0/destinations/fb_custom_audience/recordTransform.js @@ -21,7 +21,6 @@ const { responseBuilderSimple, getDataSource, } = require('./util'); -const logger = require('../../../logger'); const processRecordEventArray = ( recordChunksArray, @@ -105,7 +104,7 @@ async function processRecordInputs(groupedRecordInputs) { const { message } = groupedRecordInputs[0]; const { isHashRequired, accessToken, disableFormat, type, subType, isRaw, maxUserCount } = destination.Config; - const audienceId = get(connection, 'Config.destination.audienceId'); + const audienceId = get(connection, 'config.destination.audienceId'); const prepareParams = { access_token: accessToken, }; @@ -117,11 +116,9 @@ async function processRecordInputs(groupedRecordInputs) { } // audience id validation - logger.debug('Connection_object', JSON.stringify(connection)); - logger.debug('Event_object', JSON.stringify(message)); let operationAudienceId = audienceId; const mappedToDestination = get(message, MappedToDestinationKey); - if (mappedToDestination) { + if (mappedToDestination && !operationAudienceId) { const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE'); operationAudienceId = objectType; } diff --git a/src/v0/destinations/fb_custom_audience/transform.js b/src/v0/destinations/fb_custom_audience/transform.js index f8bf8b4fbb..26180cb2f9 100644 --- a/src/v0/destinations/fb_custom_audience/transform.js +++ b/src/v0/destinations/fb_custom_audience/transform.js @@ -159,7 +159,7 @@ const processEvent = (message, destination, connection) => { let toSendEvents = []; let { userSchema } = destination.Config; const { isHashRequired, maxUserCount } = destination.Config; - const audienceId = get(connection, 'Config.destination.audienceId'); + const audienceId = get(connection, 'config.destination.audienceId'); if (!message.type) { throw new InstrumentationError('Message Type is not present. Aborting message.'); } @@ -248,7 +248,6 @@ const processRouterDest = async (inputs, reqMetadata) => { const eventTypes = ['record', 'audiencelist']; const unsupportedEventList = checkForUnsupportedEventTypes(groupedInputs, eventTypes); if (unsupportedEventList.length > 0) { - logger.info(`unsupported events found ${unsupportedEventList}`); throw new ConfigurationError('unsupported events present in the event'); } From b6fd84ca14bc9c9e996791ceb6b2c9c2093838e7 Mon Sep 17 00:00:00 2001 From: yeshwanth vuppu <16932584+vyeshwanth@users.noreply.github.com> Date: Thu, 22 Aug 2024 15:54:04 +0530 Subject: [PATCH 11/18] fix: tests --- .../fb_custom_audience/processor/data.ts | 26 +++++++++---------- .../fb_custom_audience/router/eventStream.ts | 4 +-- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/test/integrations/destinations/fb_custom_audience/processor/data.ts b/test/integrations/destinations/fb_custom_audience/processor/data.ts index 2956086646..7fff17fe06 100644 --- a/test/integrations/destinations/fb_custom_audience/processor/data.ts +++ b/test/integrations/destinations/fb_custom_audience/processor/data.ts @@ -17,7 +17,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -496,7 +496,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -714,7 +714,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -932,7 +932,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -1088,7 +1088,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -1308,7 +1308,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -1570,7 +1570,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -1788,7 +1788,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -2006,7 +2006,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -2156,7 +2156,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -2291,7 +2291,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -2440,7 +2440,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -54585,7 +54585,7 @@ export const data = [ body: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, diff --git a/test/integrations/destinations/fb_custom_audience/router/eventStream.ts b/test/integrations/destinations/fb_custom_audience/router/eventStream.ts index ddb32cf9ae..269283ef61 100644 --- a/test/integrations/destinations/fb_custom_audience/router/eventStream.ts +++ b/test/integrations/destinations/fb_custom_audience/router/eventStream.ts @@ -43,7 +43,7 @@ export const eventStreamRouterRequest: RouterTransformationRequest = { input: [ { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, @@ -96,7 +96,7 @@ export const eventStreamRouterRequest: RouterTransformationRequest = { }, { connection: { - Config: { + config: { destination: { audienceId: 'aud1', }, From c1cde0f6d1073af73d443ed814cad658eaba4fc3 Mon Sep 17 00:00:00 2001 From: sandeepdigumarty Date: Mon, 26 Aug 2024 20:27:52 +0530 Subject: [PATCH 12/18] feat: fetch isHashRequired from connection configuration --- .../fb_custom_audience/recordTransform.js | 4 +- .../router/batchingRecord.ts | 21 ++++++++ .../fb_custom_audience/router/record.ts | 49 +++++++++++++++++++ 3 files changed, 72 insertions(+), 2 deletions(-) diff --git a/src/v0/destinations/fb_custom_audience/recordTransform.js b/src/v0/destinations/fb_custom_audience/recordTransform.js index b771f2d7a2..48d5e665f7 100644 --- a/src/v0/destinations/fb_custom_audience/recordTransform.js +++ b/src/v0/destinations/fb_custom_audience/recordTransform.js @@ -102,9 +102,9 @@ const processRecordEventArray = ( async function processRecordInputs(groupedRecordInputs) { const { destination, connection } = groupedRecordInputs[0]; const { message } = groupedRecordInputs[0]; - const { isHashRequired, accessToken, disableFormat, type, subType, isRaw, maxUserCount } = - destination.Config; + const { accessToken, disableFormat, type, subType, isRaw, maxUserCount } = destination.Config; const audienceId = get(connection, 'config.destination.audienceId'); + const isHashRequired = get(connection, 'config.destination.isHashRequired'); const prepareParams = { access_token: accessToken, }; diff --git a/test/integrations/destinations/fb_custom_audience/router/batchingRecord.ts b/test/integrations/destinations/fb_custom_audience/router/batchingRecord.ts index 0ceff5260e..e716e36d12 100644 --- a/test/integrations/destinations/fb_custom_audience/router/batchingRecord.ts +++ b/test/integrations/destinations/fb_custom_audience/router/batchingRecord.ts @@ -32,6 +32,13 @@ const destination: Destination = { export const rETLBatchingRouterRequest: RouterTransformationRequest = { input: [ { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'insert', @@ -62,6 +69,13 @@ export const rETLBatchingRouterRequest: RouterTransformationRequest = { metadata: generateMetadata(1), }, { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'insert', @@ -92,6 +106,13 @@ export const rETLBatchingRouterRequest: RouterTransformationRequest = { metadata: generateMetadata(2), }, { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'insert', diff --git a/test/integrations/destinations/fb_custom_audience/router/record.ts b/test/integrations/destinations/fb_custom_audience/router/record.ts index 534c1c40c2..f6dcf37a74 100644 --- a/test/integrations/destinations/fb_custom_audience/router/record.ts +++ b/test/integrations/destinations/fb_custom_audience/router/record.ts @@ -32,6 +32,13 @@ const destination: Destination = { export const rETLRecordRouterRequest: RouterTransformationRequest = { input: [ { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'insert', @@ -62,6 +69,13 @@ export const rETLRecordRouterRequest: RouterTransformationRequest = { metadata: generateMetadata(3), }, { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'update', @@ -92,6 +106,13 @@ export const rETLRecordRouterRequest: RouterTransformationRequest = { metadata: generateMetadata(4), }, { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'delete', @@ -122,6 +143,13 @@ export const rETLRecordRouterRequest: RouterTransformationRequest = { metadata: generateMetadata(1), }, { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'delete', @@ -152,6 +180,13 @@ export const rETLRecordRouterRequest: RouterTransformationRequest = { metadata: generateMetadata(2), }, { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'update', @@ -182,6 +217,13 @@ export const rETLRecordRouterRequest: RouterTransformationRequest = { metadata: generateMetadata(5), }, { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'update', @@ -212,6 +254,13 @@ export const rETLRecordRouterRequest: RouterTransformationRequest = { metadata: generateMetadata(6), }, { + connection: { + config: { + destination: { + isHashRequired: true, + }, + }, + }, destination: destination, message: { action: 'lol', From 42fbce5ca1a030324e803867d1b6665a09856cbd Mon Sep 17 00:00:00 2001 From: sandeepdigumarty Date: Thu, 29 Aug 2024 16:11:22 +0530 Subject: [PATCH 13/18] chore: addressed review comments --- src/v0/destinations/fb_custom_audience/recordTransform.js | 4 ++-- src/v0/destinations/fb_custom_audience/transform.js | 1 - src/v0/destinations/fb_custom_audience/util.js | 3 +-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/v0/destinations/fb_custom_audience/recordTransform.js b/src/v0/destinations/fb_custom_audience/recordTransform.js index 48d5e665f7..343e437b32 100644 --- a/src/v0/destinations/fb_custom_audience/recordTransform.js +++ b/src/v0/destinations/fb_custom_audience/recordTransform.js @@ -103,8 +103,8 @@ async function processRecordInputs(groupedRecordInputs) { const { destination, connection } = groupedRecordInputs[0]; const { message } = groupedRecordInputs[0]; const { accessToken, disableFormat, type, subType, isRaw, maxUserCount } = destination.Config; - const audienceId = get(connection, 'config.destination.audienceId'); - const isHashRequired = get(connection, 'config.destination.isHashRequired'); + const audienceId = connection?.config?.destination?.audienceId; + const isHashRequired = connection?.config?.destination?.isHashRequired; const prepareParams = { access_token: accessToken, }; diff --git a/src/v0/destinations/fb_custom_audience/transform.js b/src/v0/destinations/fb_custom_audience/transform.js index 26180cb2f9..dd371062b3 100644 --- a/src/v0/destinations/fb_custom_audience/transform.js +++ b/src/v0/destinations/fb_custom_audience/transform.js @@ -21,7 +21,6 @@ const { schemaFields, USER_ADD, USER_DELETE } = require('./config'); const { MappedToDestinationKey } = require('../../../constants'); const { processRecordInputs } = require('./recordTransform'); -const logger = require('../../../logger'); function checkForUnsupportedEventTypes(dictionary, keyList) { const unsupportedEventTypes = []; diff --git a/src/v0/destinations/fb_custom_audience/util.js b/src/v0/destinations/fb_custom_audience/util.js index 569e3d888b..53a0344e0b 100644 --- a/src/v0/destinations/fb_custom_audience/util.js +++ b/src/v0/destinations/fb_custom_audience/util.js @@ -1,7 +1,6 @@ const lodash = require('lodash'); const sha256 = require('sha256'); const crypto = require('crypto'); -const get = require('get-value'); const jsonSize = require('json-size'); const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib'); const { TransformationError } = require('@rudderstack/integrations-lib'); @@ -49,7 +48,7 @@ const batchingWithPayloadSize = (payload) => { }; const getSchemaForEventMappedToDest = (message) => { - const mappedSchema = get(message, 'fields'); + const mappedSchema = message.fields; if (!mappedSchema) { throw new InstrumentationError( 'event.fields is required property for events mapped to destination ', From 9589ab7c6f2539c6cbec62dde0d002d7c99f2c4a Mon Sep 17 00:00:00 2001 From: sandeepdigumarty Date: Thu, 5 Sep 2024 18:51:14 +0530 Subject: [PATCH 14/18] feat: added backward compatability to support both old and new VDM --- .../destinations/fb_custom_audience/config.js | 3 +++ .../fb_custom_audience/recordTransform.js | 20 ++++++++++++++---- .../fb_custom_audience/transform.js | 21 +++++++++++++++---- .../destinations/fb_custom_audience/util.js | 15 +++++++++++++ 4 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/v0/destinations/fb_custom_audience/config.js b/src/v0/destinations/fb_custom_audience/config.js index 39ca2d04c4..809ba0b7d6 100644 --- a/src/v0/destinations/fb_custom_audience/config.js +++ b/src/v0/destinations/fb_custom_audience/config.js @@ -4,6 +4,8 @@ function getEndPoint(audienceId) { return `${BASE_URL}/${audienceId}/users`; } +const VDM_V2_SCHEMA_VERSION = '1.1'; + const schemaFields = [ 'EXTERN_ID', 'EMAIL', @@ -105,4 +107,5 @@ module.exports = { typeFields, subTypeFields, maxPayloadSize, + VDM_V2_SCHEMA_VERSION, }; diff --git a/src/v0/destinations/fb_custom_audience/recordTransform.js b/src/v0/destinations/fb_custom_audience/recordTransform.js index 343e437b32..9b4e72f961 100644 --- a/src/v0/destinations/fb_custom_audience/recordTransform.js +++ b/src/v0/destinations/fb_custom_audience/recordTransform.js @@ -2,7 +2,7 @@ const lodash = require('lodash'); const get = require('get-value'); const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib'); -const { schemaFields } = require('./config'); +const { schemaFields, VDM_V2_SCHEMA_VERSION } = require('./config'); const { MappedToDestinationKey } = require('../../../constants'); const stats = require('../../../util/stats'); const { @@ -17,6 +17,7 @@ const { ensureApplicableFormat, getUpdatedDataElement, getSchemaForEventMappedToDest, + getSchemaForEventMappedToDestForVDMv2, batchingWithPayloadSize, responseBuilderSimple, getDataSource, @@ -103,8 +104,15 @@ async function processRecordInputs(groupedRecordInputs) { const { destination, connection } = groupedRecordInputs[0]; const { message } = groupedRecordInputs[0]; const { accessToken, disableFormat, type, subType, isRaw, maxUserCount } = destination.Config; - const audienceId = connection?.config?.destination?.audienceId; - const isHashRequired = connection?.config?.destination?.isHashRequired; + let audienceId; + let isHashRequired; + if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) { + audienceId = connection?.config?.destination?.audienceId; + isHashRequired = connection?.config?.destination?.isHashRequired; + } else { + audienceId = destination.Config?.audienceId; + isHashRequired = destination.Config?.isHashRequired; + } const prepareParams = { access_token: accessToken, }; @@ -129,7 +137,11 @@ async function processRecordInputs(groupedRecordInputs) { // user schema validation let { userSchema } = destination.Config; if (mappedToDestination) { - userSchema = getSchemaForEventMappedToDest(message); + if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) { + userSchema = getSchemaForEventMappedToDestForVDMv2(message); + } else { + userSchema = getSchemaForEventMappedToDest(message); + } } if (!Array.isArray(userSchema)) { userSchema = [userSchema]; diff --git a/src/v0/destinations/fb_custom_audience/transform.js b/src/v0/destinations/fb_custom_audience/transform.js index dd371062b3..3a1293c43e 100644 --- a/src/v0/destinations/fb_custom_audience/transform.js +++ b/src/v0/destinations/fb_custom_audience/transform.js @@ -12,12 +12,13 @@ const { const { prepareDataField, getSchemaForEventMappedToDest, + getSchemaForEventMappedToDestForVDMv2, batchingWithPayloadSize, generateAppSecretProof, responseBuilderSimple, getDataSource, } = require('./util'); -const { schemaFields, USER_ADD, USER_DELETE } = require('./config'); +const { schemaFields, USER_ADD, USER_DELETE, VDM_V2_SCHEMA_VERSION } = require('./config'); const { MappedToDestinationKey } = require('../../../constants'); const { processRecordInputs } = require('./recordTransform'); @@ -157,8 +158,16 @@ const processEvent = (message, destination, connection) => { const respList = []; let toSendEvents = []; let { userSchema } = destination.Config; - const { isHashRequired, maxUserCount } = destination.Config; - const audienceId = get(connection, 'config.destination.audienceId'); + const { maxUserCount } = destination.Config; + let audienceId; + let isHashRequired; + if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) { + audienceId = connection?.config?.destination?.audienceId; + isHashRequired = connection?.config?.destination?.isHashRequired; + } else { + audienceId = destination.Config?.audienceId; + isHashRequired = destination.Config?.isHashRequired; + } if (!message.type) { throw new InstrumentationError('Message Type is not present. Aborting message.'); } @@ -182,7 +191,11 @@ const processEvent = (message, destination, connection) => { // If mapped to destination, use the mapped fields instead of destination userschema if (mappedToDestination) { - userSchema = getSchemaForEventMappedToDest(message); + if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) { + userSchema = getSchemaForEventMappedToDestForVDMv2(message); + } else { + userSchema = getSchemaForEventMappedToDest(message); + } } // When one single schema field is added in the webapp, it does not appear to be an array diff --git a/src/v0/destinations/fb_custom_audience/util.js b/src/v0/destinations/fb_custom_audience/util.js index 53a0344e0b..fbe2f39004 100644 --- a/src/v0/destinations/fb_custom_audience/util.js +++ b/src/v0/destinations/fb_custom_audience/util.js @@ -1,6 +1,7 @@ const lodash = require('lodash'); const sha256 = require('sha256'); const crypto = require('crypto'); +const get = require('get-value'); const jsonSize = require('json-size'); const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib'); const { TransformationError } = require('@rudderstack/integrations-lib'); @@ -48,6 +49,19 @@ const batchingWithPayloadSize = (payload) => { }; const getSchemaForEventMappedToDest = (message) => { + const mappedSchema = get(message, 'context.destinationFields'); + if (!mappedSchema) { + throw new InstrumentationError( + 'context.destinationFields is required property for events mapped to destination ', + ); + } + // context.destinationFields has 2 possible values. An Array of fields or Comma seperated string with field names + let userSchema = Array.isArray(mappedSchema) ? mappedSchema : mappedSchema.split(','); + userSchema = userSchema.map((field) => field.trim()); + return userSchema; +}; + +const getSchemaForEventMappedToDestForVDMv2 = (message) => { const mappedSchema = message.fields; if (!mappedSchema) { throw new InstrumentationError( @@ -259,6 +273,7 @@ const responseBuilderSimple = (payload, audienceId) => { module.exports = { prepareDataField, getSchemaForEventMappedToDest, + getSchemaForEventMappedToDestForVDMv2, batchingWithPayloadSize, ensureApplicableFormat, getUpdatedDataElement, From a5dfc8f35bbe067223449255949ba8063605ada3 Mon Sep 17 00:00:00 2001 From: sandeepdigumarty Date: Fri, 6 Sep 2024 13:49:56 +0530 Subject: [PATCH 15/18] chore: updated mapped schema parameter --- src/v0/destinations/fb_custom_audience/util.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/v0/destinations/fb_custom_audience/util.js b/src/v0/destinations/fb_custom_audience/util.js index fbe2f39004..84dcdf52dc 100644 --- a/src/v0/destinations/fb_custom_audience/util.js +++ b/src/v0/destinations/fb_custom_audience/util.js @@ -62,10 +62,10 @@ const getSchemaForEventMappedToDest = (message) => { }; const getSchemaForEventMappedToDestForVDMv2 = (message) => { - const mappedSchema = message.fields; + const mappedSchema = message?.identifierMappings; if (!mappedSchema) { throw new InstrumentationError( - 'event.fields is required property for events mapped to destination ', + 'event.identifierMappings is required property for events mapped to destination ', ); } let userSchema = Object.keys(mappedSchema); From 9782db9b99d80558522ecab3d0bc3484d957e182 Mon Sep 17 00:00:00 2001 From: sandeepdigumarty Date: Tue, 10 Sep 2024 16:41:19 +0530 Subject: [PATCH 16/18] chore: addressed review comments --- src/v0/destinations/fb_custom_audience/transform.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/v0/destinations/fb_custom_audience/transform.js b/src/v0/destinations/fb_custom_audience/transform.js index 3a1293c43e..f920ebe398 100644 --- a/src/v0/destinations/fb_custom_audience/transform.js +++ b/src/v0/destinations/fb_custom_audience/transform.js @@ -154,7 +154,8 @@ const prepareToSendEvents = ( }); return toSendEvents; }; -const processEvent = (message, destination, connection) => { +const processEvent = (event) => { + const { message, destination, connection } = event; const respList = []; let toSendEvents = []; let { userSchema } = destination.Config; @@ -249,7 +250,7 @@ const processEvent = (message, destination, connection) => { return respList; }; -const process = (event) => processEvent(event.message, event.destination, event.connection); +const process = (event) => processEvent(event); const processRouterDest = async (inputs, reqMetadata) => { const respList = []; From 344b336ad36c05eaae4e109daaab4056239a487c Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Wed, 11 Sep 2024 10:36:22 +0530 Subject: [PATCH 17/18] chore: add fallback to destination and source ids --- src/v0/util/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v0/util/index.js b/src/v0/util/index.js index 8f8aa0d551..2778a37d26 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -2304,7 +2304,7 @@ const applyJSONStringTemplate = (message, template) => const groupRouterTransformEvents = (events) => Object.values( lodash.groupBy(events, (ev) => [ - ev.metadata?.destinationId, + ev.metadata?.destinationId || ev.destination?.ID, ev.metadata?.sourceCategory === 'warehouse' ? ev.metadata?.sourceId : 'default', ]), ); From 055077a09e6878cf52ed3b049ca5cb58ce678f05 Mon Sep 17 00:00:00 2001 From: Dilip Kola Date: Wed, 11 Sep 2024 11:11:11 +0530 Subject: [PATCH 18/18] chore: revert test cases --- src/v0/util/index.js | 4 +++- test/integrations/destinations/algolia/router/data.ts | 8 ++++---- test/integrations/destinations/iterable/router/data.ts | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/v0/util/index.js b/src/v0/util/index.js index 2778a37d26..cf1ff60e99 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -2305,7 +2305,9 @@ const groupRouterTransformEvents = (events) => Object.values( lodash.groupBy(events, (ev) => [ ev.metadata?.destinationId || ev.destination?.ID, - ev.metadata?.sourceCategory === 'warehouse' ? ev.metadata?.sourceId : 'default', + ev.metadata?.sourceCategory === 'warehouse' + ? ev.metadata?.sourceId || ev.connection?.sourceId + : 'default', ]), ); diff --git a/test/integrations/destinations/algolia/router/data.ts b/test/integrations/destinations/algolia/router/data.ts index 4aadd922a2..dca899693e 100644 --- a/test/integrations/destinations/algolia/router/data.ts +++ b/test/integrations/destinations/algolia/router/data.ts @@ -75,7 +75,7 @@ export const data = [ integrations: { All: true }, originalTimestamp: '2021-10-25T09:40:08.879Z', }, - metadata: { jobId: 1, userId: 'u1', destinationId: '1zzHtStW2ZPlullmz6L7DGnmk9V' }, + metadata: { jobId: 1, userId: 'u1' }, destination: { ID: '1zzHtStW2ZPlullmz6L7DGnmk9V', Name: 'algolia-dev', @@ -178,7 +178,7 @@ export const data = [ integrations: { All: true }, originalTimestamp: '2021-10-25T09:40:08.886Z', }, - metadata: { jobId: 2, userId: 'u1', destinationId: '1zzHtStW2ZPlullmz6L7DGnmk9V' }, + metadata: { jobId: 2, userId: 'u1' }, destination: { ID: '1zzHtStW2ZPlullmz6L7DGnmk9V', Name: 'algolia-dev', @@ -317,8 +317,8 @@ export const data = [ files: {}, }, metadata: [ - { jobId: 1, userId: 'u1', destinationId: '1zzHtStW2ZPlullmz6L7DGnmk9V' }, - { jobId: 2, userId: 'u1', destinationId: '1zzHtStW2ZPlullmz6L7DGnmk9V' }, + { jobId: 1, userId: 'u1' }, + { jobId: 2, userId: 'u1' }, ], batched: true, statusCode: 200, diff --git a/test/integrations/destinations/iterable/router/data.ts b/test/integrations/destinations/iterable/router/data.ts index ba186c048e..09eedc8eb8 100644 --- a/test/integrations/destinations/iterable/router/data.ts +++ b/test/integrations/destinations/iterable/router/data.ts @@ -296,7 +296,7 @@ export const data = [ type: 'identify', userId: 'lynnanderson@smith.net', }, - metadata: { jobId: 5, userId: 'u1', destinationId: '1zia9wKshXt80YksLmUdJnr7IHI' }, + metadata: { jobId: 5, userId: 'u1' }, destination: { ID: '1zia9wKshXt80YksLmUdJnr7IHI', Name: 'test_iterable', @@ -807,7 +807,7 @@ export const data = [ }, files: {}, }, - metadata: [{ jobId: 5, userId: 'u1', destinationId: '1zia9wKshXt80YksLmUdJnr7IHI' }], + metadata: [{ jobId: 5, userId: 'u1' }], batched: true, statusCode: 200, destination: {