-
Notifications
You must be signed in to change notification settings - Fork 113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: update FBCA spec to support VDM v2 events #3675
Changes from 21 commits
40a9777
9d1487d
d708309
a1bc2c1
68c7822
38de49a
7e258d5
73f1d2a
bc19a45
7ed91a7
d09ca5c
0017cf7
cf6e879
75dab29
018bf23
13c5469
b6fd84c
c1cde0f
42fbce5
9a9340f
9589ab7
a5dfc8f
9782db9
6914e00
344b336
055077a
92c30e5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ | |
const lodash = require('lodash'); | ||
const get = require('get-value'); | ||
const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib'); | ||
const { schemaFields } = require('./config'); | ||
const { schemaFields, VDM_V2_SCHEMA_VERSION } = require('./config'); | ||
const { MappedToDestinationKey } = require('../../../constants'); | ||
const stats = require('../../../util/stats'); | ||
const { | ||
|
@@ -17,6 +17,7 @@ const { | |
ensureApplicableFormat, | ||
getUpdatedDataElement, | ||
getSchemaForEventMappedToDest, | ||
getSchemaForEventMappedToDestForVDMv2, | ||
batchingWithPayloadSize, | ||
responseBuilderSimple, | ||
getDataSource, | ||
|
@@ -100,18 +101,18 @@ const processRecordEventArray = ( | |
}; | ||
|
||
async function processRecordInputs(groupedRecordInputs) { | ||
const { destination } = groupedRecordInputs[0]; | ||
const { destination, connection } = groupedRecordInputs[0]; | ||
const { message } = groupedRecordInputs[0]; | ||
const { | ||
isHashRequired, | ||
accessToken, | ||
disableFormat, | ||
type, | ||
subType, | ||
isRaw, | ||
maxUserCount, | ||
audienceId, | ||
} = destination.Config; | ||
const { accessToken, disableFormat, type, subType, isRaw, maxUserCount } = destination.Config; | ||
let audienceId; | ||
let isHashRequired; | ||
if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) { | ||
audienceId = connection?.config?.destination?.audienceId; | ||
isHashRequired = connection?.config?.destination?.isHashRequired; | ||
} else { | ||
audienceId = destination.Config?.audienceId; | ||
isHashRequired = destination.Config?.isHashRequired; | ||
} | ||
const prepareParams = { | ||
access_token: accessToken, | ||
}; | ||
|
@@ -125,7 +126,7 @@ async function processRecordInputs(groupedRecordInputs) { | |
// audience id validation | ||
let operationAudienceId = audienceId; | ||
const mappedToDestination = get(message, MappedToDestinationKey); | ||
if (mappedToDestination) { | ||
if (mappedToDestination && !operationAudienceId) { | ||
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE'); | ||
operationAudienceId = objectType; | ||
} | ||
|
@@ -136,7 +137,11 @@ async function processRecordInputs(groupedRecordInputs) { | |
// user schema validation | ||
let { userSchema } = destination.Config; | ||
if (mappedToDestination) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are we sending mappedToDestination in new format? |
||
userSchema = getSchemaForEventMappedToDest(message); | ||
if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. write common util for this condition, mostly this will be used multiple destinations |
||
userSchema = getSchemaForEventMappedToDestForVDMv2(message); | ||
} else { | ||
userSchema = getSchemaForEventMappedToDest(message); | ||
} | ||
} | ||
if (!Array.isArray(userSchema)) { | ||
userSchema = [userSchema]; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,16 +12,16 @@ const { | |
const { | ||
prepareDataField, | ||
getSchemaForEventMappedToDest, | ||
getSchemaForEventMappedToDestForVDMv2, | ||
batchingWithPayloadSize, | ||
generateAppSecretProof, | ||
responseBuilderSimple, | ||
getDataSource, | ||
} = require('./util'); | ||
const { schemaFields, USER_ADD, USER_DELETE } = require('./config'); | ||
const { schemaFields, USER_ADD, USER_DELETE, VDM_V2_SCHEMA_VERSION } = require('./config'); | ||
|
||
const { MappedToDestinationKey } = require('../../../constants'); | ||
const { processRecordInputs } = require('./recordTransform'); | ||
const logger = require('../../../logger'); | ||
|
||
function checkForUnsupportedEventTypes(dictionary, keyList) { | ||
const unsupportedEventTypes = []; | ||
|
@@ -154,11 +154,20 @@ const prepareToSendEvents = ( | |
}); | ||
return toSendEvents; | ||
}; | ||
const processEvent = (message, destination) => { | ||
const processEvent = (message, destination, connection) => { | ||
const respList = []; | ||
let toSendEvents = []; | ||
let { userSchema } = destination.Config; | ||
const { isHashRequired, audienceId, maxUserCount } = destination.Config; | ||
const { maxUserCount } = destination.Config; | ||
let audienceId; | ||
let isHashRequired; | ||
if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) { | ||
audienceId = connection?.config?.destination?.audienceId; | ||
isHashRequired = connection?.config?.destination?.isHashRequired; | ||
} else { | ||
audienceId = destination.Config?.audienceId; | ||
isHashRequired = destination.Config?.isHashRequired; | ||
} | ||
if (!message.type) { | ||
throw new InstrumentationError('Message Type is not present. Aborting message.'); | ||
} | ||
|
@@ -182,7 +191,11 @@ const processEvent = (message, destination) => { | |
|
||
// If mapped to destination, use the mapped fields instead of destination userschema | ||
if (mappedToDestination) { | ||
userSchema = getSchemaForEventMappedToDest(message); | ||
if (connection?.config?.destination?.schemaVersion === VDM_V2_SCHEMA_VERSION) { | ||
userSchema = getSchemaForEventMappedToDestForVDMv2(message); | ||
} else { | ||
userSchema = getSchemaForEventMappedToDest(message); | ||
} | ||
} | ||
|
||
// When one single schema field is added in the webapp, it does not appear to be an array | ||
|
@@ -236,7 +249,7 @@ const processEvent = (message, destination) => { | |
return respList; | ||
}; | ||
|
||
const process = (event) => processEvent(event.message, event.destination); | ||
const process = (event) => processEvent(event.message, event.destination, event.connection); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can directly pass event here to processEvent |
||
|
||
const processRouterDest = async (inputs, reqMetadata) => { | ||
const respList = []; | ||
|
@@ -247,7 +260,6 @@ const processRouterDest = async (inputs, reqMetadata) => { | |
const eventTypes = ['record', 'audiencelist']; | ||
const unsupportedEventList = checkForUnsupportedEventTypes(groupedInputs, eventTypes); | ||
if (unsupportedEventList.length > 0) { | ||
logger.info(`unsupported events found ${unsupportedEventList}`); | ||
throw new ConfigurationError('unsupported events present in the event'); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can declare this in global somewhere, this is same for any destination