Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update FBCA spec to support VDM v2 events #3675

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
40a9777
feat: add job run id level isolation
koladilip Jun 24, 2024
9d1487d
Merge branch 'develop' into feat.many-to-one
koladilip Jul 3, 2024
d708309
feat: add group by sourceID for rETL
koladilip Jul 3, 2024
a1bc2c1
fix: lint isues
koladilip Jul 3, 2024
68c7822
Merge branch 'develop' into feat.many-to-one
koladilip Jul 10, 2024
38de49a
fix: lint issues
koladilip Jul 10, 2024
7e258d5
fix: component test cases
koladilip Jul 10, 2024
73f1d2a
Merge branch 'develop' into feat.many-to-one
koladilip Jul 14, 2024
bc19a45
Merge branch 'develop' into feat.many-to-one
koladilip Jul 24, 2024
7ed91a7
Merge branch 'develop' into feat.many-to-one
koladilip Aug 21, 2024
d09ca5c
feat: update FBCA spec to support VDM v2 events
sandeepdsvs Aug 21, 2024
0017cf7
Merge remote-tracking branch 'origin/feat.many-to-one' into develop
sandeepdsvs Aug 21, 2024
cf6e879
feat: update FBCA spec to support VDM v2 events
sandeepdsvs Aug 21, 2024
75dab29
chore: updated tests
sandeepdsvs Aug 21, 2024
018bf23
chore: added logs
sandeepdsvs Aug 22, 2024
13c5469
fix: audienceId not found error
vyeshwanth Aug 22, 2024
b6fd84c
fix: tests
vyeshwanth Aug 22, 2024
c1cde0f
feat: fetch isHashRequired from connection configuration
sandeepdsvs Aug 26, 2024
42fbce5
chore: addressed review comments
sandeepdsvs Aug 29, 2024
9a9340f
Merge branch 'develop' into feat.vdm-next-poc
sandeepdsvs Aug 29, 2024
9589ab7
feat: added backward compatability to support both old and new VDM
sandeepdsvs Sep 5, 2024
a5dfc8f
chore: updated mapped schema parameter
sandeepdsvs Sep 6, 2024
9782db9
chore: addressed review comments
sandeepdsvs Sep 10, 2024
6914e00
Merge branch 'develop' into feat.many-to-one
koladilip Sep 11, 2024
344b336
chore: add fallback to destination and source ids
koladilip Sep 11, 2024
055077a
chore: revert test cases
koladilip Sep 11, 2024
92c30e5
Merge branch 'feat.many-to-one' into feat.vdm-next-poc
koladilip Sep 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 31 additions & 39 deletions src/services/destination/cdkV2Integration.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable class-methods-use-this */
import { TransformationError } from '@rudderstack/integrations-lib';
import groupBy from 'lodash/groupBy';
import { processCdkV2Workflow } from '../../cdk/v2/handler';
import { DestinationService } from '../../interfaces/DestinationService';
import {
Expand All @@ -20,6 +19,7 @@ import {
} from '../../types/index';
import stats from '../../util/stats';
import { CatchErr } from '../../util/types';
import { groupRouterTransformEvents } from '../../v0/util';
import tags from '../../v0/util/tags';
import { DestinationPostTransformationService } from './postTransformation';

Expand Down Expand Up @@ -112,46 +112,38 @@ export class CDKV2DestinationService implements DestinationService {
_version: string,
requestMetadata: NonNullable<unknown>,
): Promise<RouterTransformationResponse[]> {
const allDestEvents: object = groupBy(
events,
(ev: RouterTransformationRequestData) => ev.destination?.ID,
);
const groupedEvents: RouterTransformationRequestData[][] = groupRouterTransformEvents(events);
const response: RouterTransformationResponse[][] = await Promise.all(
Object.values(allDestEvents).map(
async (destInputArray: RouterTransformationRequestData[]) => {
const metaTo = this.getTags(
destinationType,
destInputArray[0].metadata.destinationId,
destInputArray[0].metadata.workspaceId,
tags.FEATURES.ROUTER,
);
metaTo.metadata = destInputArray[0].metadata;
try {
const doRouterTransformationResponse: RouterTransformationResponse[] =
await processCdkV2Workflow(
destinationType,
destInputArray,
tags.FEATURES.ROUTER,
requestMetadata,
);
return DestinationPostTransformationService.handleRouterTransformSuccessEvents(
doRouterTransformationResponse,
undefined,
metaTo,
tags.IMPLEMENTATIONS.CDK_V2,
destinationType.toUpperCase(),
groupedEvents.map(async (destInputArray: RouterTransformationRequestData[]) => {
const metaTo = this.getTags(
destinationType,
destInputArray[0].metadata.destinationId,
destInputArray[0].metadata.workspaceId,
tags.FEATURES.ROUTER,
);
metaTo.metadata = destInputArray[0].metadata;
try {
const doRouterTransformationResponse: RouterTransformationResponse[] =
await processCdkV2Workflow(
destinationType,
destInputArray,
tags.FEATURES.ROUTER,
requestMetadata,
);
} catch (error: CatchErr) {
metaTo.metadatas = destInputArray.map((input) => input.metadata);
const erroredResp =
DestinationPostTransformationService.handleRouterTransformFailureEvents(
error,
metaTo,
);
return [erroredResp];
}
},
),
return DestinationPostTransformationService.handleRouterTransformSuccessEvents(
doRouterTransformationResponse,
undefined,
metaTo,
tags.IMPLEMENTATIONS.CDK_V2,
destinationType.toUpperCase(),
);
} catch (error: CatchErr) {
metaTo.metadatas = destInputArray.map((input) => input.metadata);
const erroredResp =
DestinationPostTransformationService.handleRouterTransformFailureEvents(error, metaTo);
return [erroredResp];
}
}),
);
return response.flat();
}
Expand Down
7 changes: 2 additions & 5 deletions src/services/destination/nativeIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
import stats from '../../util/stats';
import tags from '../../v0/util/tags';
import { DestinationPostTransformationService } from './postTransformation';
import { groupRouterTransformEvents } from '../../v0/util';

export class NativeIntegrationDestinationService implements DestinationService {
public init() {}
Expand Down Expand Up @@ -99,11 +100,7 @@ export class NativeIntegrationDestinationService implements DestinationService {
requestMetadata: NonNullable<unknown>,
): Promise<RouterTransformationResponse[]> {
const destHandler = FetchHandler.getDestHandler(destinationType, version);
const allDestEvents: NonNullable<unknown> = groupBy(
events,
(ev: RouterTransformationRequestData) => ev.destination?.ID,
);
const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents);
const groupedEvents: RouterTransformationRequestData[][] = groupRouterTransformEvents(events);
const response: RouterTransformationResponse[][] = await Promise.all(
groupedEvents.map(async (destInputArray: RouterTransformationRequestData[]) => {
const metaTO = this.getTags(
Expand Down
1 change: 1 addition & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ type RouterTransformationRequestData = {
message: object;
metadata: Metadata;
destination: Destination;
connection?: Record<string, unknown>;
};

type RouterTransformationRequest = {
Expand Down
3 changes: 3 additions & 0 deletions src/v0/destinations/fb_custom_audience/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ function getEndPoint(audienceId) {
return `${BASE_URL}/${audienceId}/users`;
}

const VDM_V2_SCHEMA_VERSION = '1.1';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can declare this in global somewhere, this is same for any destination


const schemaFields = [
'EXTERN_ID',
'EMAIL',
Expand Down Expand Up @@ -105,4 +107,5 @@ module.exports = {
typeFields,
subTypeFields,
maxPayloadSize,
VDM_V2_SCHEMA_VERSION,
};
33 changes: 19 additions & 14 deletions src/v0/destinations/fb_custom_audience/recordTransform.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
const lodash = require('lodash');
const get = require('get-value');
const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib');
const { schemaFields } = require('./config');
const { schemaFields, VDM_V2_SCHEMA_VERSION } = require('./config');
const { MappedToDestinationKey } = require('../../../constants');
const stats = require('../../../util/stats');
const {
Expand All @@ -17,6 +17,7 @@ const {
ensureApplicableFormat,
getUpdatedDataElement,
getSchemaForEventMappedToDest,
getSchemaForEventMappedToDestForVDMv2,
batchingWithPayloadSize,
responseBuilderSimple,
getDataSource,
Expand Down Expand Up @@ -100,18 +101,18 @@ const processRecordEventArray = (
};

async function processRecordInputs(groupedRecordInputs) {
const { destination } = groupedRecordInputs[0];
const { destination, connection } = groupedRecordInputs[0];
const { message } = groupedRecordInputs[0];
const {
isHashRequired,
accessToken,
disableFormat,
type,
subType,
isRaw,
maxUserCount,
audienceId,
} = destination.Config;
const { accessToken, disableFormat, type, subType, isRaw, maxUserCount } = destination.Config;
let audienceId;
let isHashRequired;
if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) {
audienceId = connection?.config?.destination?.audienceId;
isHashRequired = connection?.config?.destination?.isHashRequired;
} else {
audienceId = destination.Config?.audienceId;
isHashRequired = destination.Config?.isHashRequired;
}
const prepareParams = {
access_token: accessToken,
};
Expand All @@ -125,7 +126,7 @@ async function processRecordInputs(groupedRecordInputs) {
// audience id validation
let operationAudienceId = audienceId;
const mappedToDestination = get(message, MappedToDestinationKey);
if (mappedToDestination) {
if (mappedToDestination && !operationAudienceId) {
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
operationAudienceId = objectType;
}
Expand All @@ -136,7 +137,11 @@ async function processRecordInputs(groupedRecordInputs) {
// user schema validation
let { userSchema } = destination.Config;
if (mappedToDestination) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we sending mappedToDestination in new format?

userSchema = getSchemaForEventMappedToDest(message);
if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write common util for this condition, mostly this will be used multiple destinations

userSchema = getSchemaForEventMappedToDestForVDMv2(message);
} else {
userSchema = getSchemaForEventMappedToDest(message);
}
}
if (!Array.isArray(userSchema)) {
userSchema = [userSchema];
Expand Down
26 changes: 19 additions & 7 deletions src/v0/destinations/fb_custom_audience/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ const {
const {
prepareDataField,
getSchemaForEventMappedToDest,
getSchemaForEventMappedToDestForVDMv2,
batchingWithPayloadSize,
generateAppSecretProof,
responseBuilderSimple,
getDataSource,
} = require('./util');
const { schemaFields, USER_ADD, USER_DELETE } = require('./config');
const { schemaFields, USER_ADD, USER_DELETE, VDM_V2_SCHEMA_VERSION } = require('./config');

const { MappedToDestinationKey } = require('../../../constants');
const { processRecordInputs } = require('./recordTransform');
const logger = require('../../../logger');

function checkForUnsupportedEventTypes(dictionary, keyList) {
const unsupportedEventTypes = [];
Expand Down Expand Up @@ -154,11 +154,20 @@ const prepareToSendEvents = (
});
return toSendEvents;
};
const processEvent = (message, destination) => {
const processEvent = (message, destination, connection) => {
const respList = [];
let toSendEvents = [];
let { userSchema } = destination.Config;
const { isHashRequired, audienceId, maxUserCount } = destination.Config;
const { maxUserCount } = destination.Config;
let audienceId;
let isHashRequired;
if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) {
audienceId = connection?.config?.destination?.audienceId;
isHashRequired = connection?.config?.destination?.isHashRequired;
} else {
audienceId = destination.Config?.audienceId;
isHashRequired = destination.Config?.isHashRequired;
}
if (!message.type) {
throw new InstrumentationError('Message Type is not present. Aborting message.');
}
Expand All @@ -182,7 +191,11 @@ const processEvent = (message, destination) => {

// If mapped to destination, use the mapped fields instead of destination userschema
if (mappedToDestination) {
userSchema = getSchemaForEventMappedToDest(message);
if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) {
userSchema = getSchemaForEventMappedToDestForVDMv2(message);
} else {
userSchema = getSchemaForEventMappedToDest(message);
}
}

// When one single schema field is added in the webapp, it does not appear to be an array
Expand Down Expand Up @@ -236,7 +249,7 @@ const processEvent = (message, destination) => {
return respList;
};

const process = (event) => processEvent(event.message, event.destination);
const process = (event) => processEvent(event.message, event.destination, event.connection);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can directly pass event here to processEvent


const processRouterDest = async (inputs, reqMetadata) => {
const respList = [];
Expand All @@ -247,7 +260,6 @@ const processRouterDest = async (inputs, reqMetadata) => {
const eventTypes = ['record', 'audiencelist'];
const unsupportedEventList = checkForUnsupportedEventTypes(groupedInputs, eventTypes);
if (unsupportedEventList.length > 0) {
logger.info(`unsupported events found ${unsupportedEventList}`);
throw new ConfigurationError('unsupported events present in the event');
}

Expand Down
13 changes: 13 additions & 0 deletions src/v0/destinations/fb_custom_audience/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ const getSchemaForEventMappedToDest = (message) => {
return userSchema;
};

const getSchemaForEventMappedToDestForVDMv2 = (message) => {
const mappedSchema = message.fields;
if (!mappedSchema) {
throw new InstrumentationError(
'event.fields is required property for events mapped to destination ',
);
}
let userSchema = Object.keys(mappedSchema);
userSchema = userSchema.map((field) => field.trim());
return userSchema;
};

// function responsible to ensure the user inputs are passed according to the allowed format
const ensureApplicableFormat = (userProperty, userInformation) => {
let updatedProperty;
Expand Down Expand Up @@ -261,6 +273,7 @@ const responseBuilderSimple = (payload, audienceId) => {
module.exports = {
prepareDataField,
getSchemaForEventMappedToDest,
getSchemaForEventMappedToDestForVDMv2,
batchingWithPayloadSize,
ensureApplicableFormat,
getUpdatedDataElement,
Expand Down
25 changes: 23 additions & 2 deletions src/v0/util/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2265,10 +2265,30 @@ const validateEventAndLowerCaseConversion = (event, isMandatory, convertToLowerC
return convertToLowerCase ? event.toString().toLowerCase() : event.toString();
};

const applyCustomMappings = (message, mappings) =>
JsonTemplateEngine.createAsSync(mappings, { defaultPathType: PathType.JSON }).evaluate(message);
/**
* This function applies custom mappings to the event.
* @param {*} event The event to be transformed.
* @param {*} mappings The custom mappings to be applied.
* @returns {object} The transformed event.
*/
const applyCustomMappings = (event, mappings) =>
JsonTemplateEngine.createAsSync(mappings, { defaultPathType: PathType.JSON }).evaluate(event);

/**
* This groups the events by destination ID and source ID.
* Note: sourceID is only used for rETL events.
* @param {*} events The events to be grouped.
* @returns {array} The array of grouped events.
*/
const groupRouterTransformEvents = (events) =>
Object.values(
lodash.groupBy(events, (ev) => [
ev.metadata?.destinationId,
ev.metadata?.sourceCategory === 'warehouse' ? ev.metadata?.sourceId : 'default',
]),
);

/*
* Gets url path omitting the hostname & protocol
*
* **Note**:
Expand Down Expand Up @@ -2342,6 +2362,7 @@ module.exports = {
getValueFromMessage,
getValueFromPropertiesOrTraits,
getValuesAsArrayFromConfig,
groupRouterTransformEvents,
handleSourceKeysOperation,
hashToSha256,
isAppleFamily,
Expand Down
Loading
Loading