diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index cfcb1fc0d8c..c52962467d9 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -46,7 +46,7 @@ N/A - [ ] Is the PR limited to one linear task? -- [ ] Are relevant unit and component test-cases added? +- [ ] Are relevant unit and component test-cases added in **new readability format**? ### Reviewer checklist diff --git a/CHANGELOG.md b/CHANGELOG.md index 4258fdfcc56..8b2fff8bced 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,48 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [1.68.2](https://github.com/rudderlabs/rudder-transformer/compare/v1.68.1...v1.68.2) (2024-06-06) + + +### Bug Fixes + +* **user-transformation:** pass tf id in common metadata ([d2b0779](https://github.com/rudderlabs/rudder-transformer/commit/d2b0779d3a0145c0088903edfe328c1c7554cbd2)) + +### [1.68.1](https://github.com/rudderlabs/rudder-transformer/compare/v1.68.0...v1.68.1) (2024-05-29) + + +### Bug Fixes + +* tiktok_v2 assigning value to undefined properties ([#3426](https://github.com/rudderlabs/rudder-transformer/issues/3426)) ([323396b](https://github.com/rudderlabs/rudder-transformer/commit/323396b09fd6b7fda3cce53cc4f1cc443d7a78c1)) + +## [1.68.0](https://github.com/rudderlabs/rudder-transformer/compare/v1.67.0...v1.68.0) (2024-05-27) + + +### Features + +* add json-data type support in redis ([#3336](https://github.com/rudderlabs/rudder-transformer/issues/3336)) ([0196f20](https://github.com/rudderlabs/rudder-transformer/commit/0196f20cc79e1f470d96a649dd9404c3c9284329)) +* facebook custom audience app secret support ([#3357](https://github.com/rudderlabs/rudder-transformer/issues/3357)) ([fce4ef9](https://github.com/rudderlabs/rudder-transformer/commit/fce4ef973500411c7ad812e7949bb1b73dabc3ba)) +* filtering unknown events in awin ([#3392](https://github.com/rudderlabs/rudder-transformer/issues/3392)) ([d842da8](https://github.com/rudderlabs/rudder-transformer/commit/d842da87a34cb63023eba288e0c5258e29997dcf)) +* **ga4:** component test refactor ([#3220](https://github.com/rudderlabs/rudder-transformer/issues/3220)) ([3ff9a5e](https://github.com/rudderlabs/rudder-transformer/commit/3ff9a5e8e955b929a1b04a89dcf0ccbc49e18648)) +* **integrations/auth0:** include Auth0 event type in Rudderstack message ([#3370](https://github.com/rudderlabs/rudder-transformer/issues/3370)) ([e9409fd](https://github.com/rudderlabs/rudder-transformer/commit/e9409fde6063d7eaa8558396b85b5fdf99f964e1)) +* onboard koddi destination ([#3359](https://github.com/rudderlabs/rudder-transformer/issues/3359)) ([f74c4a0](https://github.com/rudderlabs/rudder-transformer/commit/f74c4a0bc92ae6ccb0c00ac5b21745e496a015bc)) +* onboarding adjust source ([#3395](https://github.com/rudderlabs/rudder-transformer/issues/3395)) ([668d331](https://github.com/rudderlabs/rudder-transformer/commit/668d3311aadacbb92b1873bf43919db7d341afbb)) + + +### Bug Fixes + +* fb custom audience html response ([#3402](https://github.com/rudderlabs/rudder-transformer/issues/3402)) ([d1a2bd6](https://github.com/rudder +* standardise hashing for all CAPI integrations ([#3379](https://github.com/rudderlabs/rudder-transformer/issues/3379)) ([c249a69](https://github.com/rudderlabs/rudder-transformer/commit/c249a694d735f6d241a35b6e21f493c54890ac84)) +* tiktok_v2 remove default value for content-type for custom events 
([#3383](https://github.com/rudderlabs/rudder-transformer/issues/3383)) ([6e7b5a0](https://github.com/rudderlabs/rudder-transformer/commit/6e7b5a0d8bf2c859dfb15b9cad7ed6070bd0892b)) +* added step for reconciling openfaas functions for python transformations ([#3420](https://github.com/rudderlabs/rudder-transformer/issues/3420)) ([7a2ab63](https://github.com/rudderlabs/rudder-transformer/commit/7a2ab63674d40870af4d16f0673a2a2594c899e9)) + +## [1.67.0](https://github.com/rudderlabs/rudder-transformer/compare/v1.66.1...v1.67.0) (2024-05-23) + + +### Features + +* sre 456 ut move high cardinality histogram metrics to summaries cp ([#3409](https://github.com/rudderlabs/rudder-transformer/issues/3409)) ([be20dc2](https://github.com/rudderlabs/rudder-transformer/commit/be20dc26ade2fa0212dc91126cf42087a84a07c9)) + ### [1.66.1](https://github.com/rudderlabs/rudder-transformer/compare/v1.66.0...v1.66.1) (2024-05-20) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index bdd76d916cf..48055816e4e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -49,5 +49,5 @@ We look forward to your feedback on improving this project. [slack]: https://resources.rudderstack.com/join-rudderstack-slack [issue]: https://github.com/rudderlabs/rudder-transformer/issues/new -[cla]: https://rudderlabs.wufoo.com/forms/rudderlabs-contributor-license-agreement +[cla]: https://forms.gle/845JRGVZaC6kPZy68 [config-generator]: https://github.com/rudderlabs/config-generator diff --git a/package-lock.json b/package-lock.json index f51f3ccd8ef..5effe0c2496 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "rudder-transformer", - "version": "1.66.1", + "version": "1.68.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "rudder-transformer", - "version": "1.66.1", + "version": "1.68.2", "license": "ISC", "dependencies": { "@amplitude/ua-parser-js": "0.7.24", @@ -20,7 +20,7 @@ "@ndhoule/extend": "^2.0.0", "@pyroscope/nodejs": "^0.2.9", "@rudderstack/integrations-lib": "^0.2.8", - "@rudderstack/workflow-engine": "^0.7.5", + "@rudderstack/workflow-engine": "^0.7.9", "@shopify/jest-koa-mocks": "^5.1.1", "ajv": "^8.12.0", "ajv-draft-04": "^1.0.0", @@ -4458,21 +4458,21 @@ } }, "node_modules/@rudderstack/json-template-engine": { - "version": "0.8.5", - "resolved": "https://registry.npmjs.org/@rudderstack/json-template-engine/-/json-template-engine-0.8.5.tgz", - "integrity": "sha512-+iH40g+ZA2ANgwjOITdEdZJLZV+ljR28Akn/dRoDia591tMu7PptyvDaAvl+m1DijWXddpLQ8SX9xaEcIdmqlw==" + "version": "0.10.5", + "resolved": "https://registry.npmjs.org/@rudderstack/json-template-engine/-/json-template-engine-0.10.5.tgz", + "integrity": "sha512-PasCK5RDwiRHsFhAb3w0n+8JPRYcZTffe2l+M/wtzvqU+12NPj3YTEIaMWkhogY6AmPYswAaMX/kr+4j7dKiUA==" }, "node_modules/@rudderstack/workflow-engine": { - "version": "0.7.5", - "resolved": "https://registry.npmjs.org/@rudderstack/workflow-engine/-/workflow-engine-0.7.5.tgz", - "integrity": "sha512-HmhxiF/gZorrEEmVvQYopIN6xicQ7kr0mHtw2fPqXmHIFLr9MnEyefo4+MPw/Re9iNFbXNQC9uKkYd7lLHbAyw==", + "version": "0.7.9", + "resolved": "https://registry.npmjs.org/@rudderstack/workflow-engine/-/workflow-engine-0.7.9.tgz", + "integrity": "sha512-uMELZk7UXs40bgQkIk7fIVrfHo/5ld+5I5kYgZt5rcT65H9aNpWjnNRnsKH9dgu+oxiBFAMassZq5ko4hpEdIQ==", "dependencies": { - "@aws-crypto/sha256-js": "^5.0.0", - "@rudderstack/json-template-engine": "^0.8.4", - "jsonata": "^2.0.4", + "@aws-crypto/sha256-js": "^5.2.0", + "@rudderstack/json-template-engine": "^0.10.5", + "jsonata": "^2.0.5", "lodash": "^4.17.21", - 
"object-sizeof": "^2.6.3", - "yaml": "^2.3.2" + "object-sizeof": "^2.6.4", + "yaml": "^2.4.3" } }, "node_modules/@rudderstack/workflow-engine/node_modules/@aws-crypto/sha256-js": { @@ -14480,9 +14480,9 @@ } }, "node_modules/jsonata": { - "version": "2.0.4", - "resolved": "https://registry.npmjs.org/jsonata/-/jsonata-2.0.4.tgz", - "integrity": "sha512-vfavX4/G/yrYxE+UrmT/oUJ3ph7KqUrb0R7b0LVRcntQwxw+Z5kA1pNUIQzX5hF04Oe1eKxyoIPsmXtc2LgJTQ==", + "version": "2.0.5", + "resolved": "https://registry.npmjs.org/jsonata/-/jsonata-2.0.5.tgz", + "integrity": "sha512-wEse9+QLIIU5IaCgtJCPsFi/H4F3qcikWzF4bAELZiRz08ohfx3Q6CjDRf4ZPF5P/92RI3KIHtb7u3jqPaHXdQ==", "engines": { "node": ">= 8" } @@ -16793,9 +16793,9 @@ } }, "node_modules/object-sizeof": { - "version": "2.6.3", - "resolved": "https://registry.npmjs.org/object-sizeof/-/object-sizeof-2.6.3.tgz", - "integrity": "sha512-GNkVRrLh11Qr5BGr73dwwPE200/78QG2rbx30cnXPnMvt7UuttH4Dup5t+LtcQhARkg8Hbr0c8Kiz52+CFxYmw==", + "version": "2.6.4", + "resolved": "https://registry.npmjs.org/object-sizeof/-/object-sizeof-2.6.4.tgz", + "integrity": "sha512-YuJAf7Bi61KROcYmXm8RCeBrBw8UOaJDzTm1gp0eU7RjYi1xEte3/Nmg/VyPaHcJZ3sNojs1Y0xvSrgwkLmcFw==", "dependencies": { "buffer": "^6.0.3" } @@ -20964,9 +20964,12 @@ "dev": true }, "node_modules/yaml": { - "version": "2.3.4", - "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.3.4.tgz", - "integrity": "sha512-8aAvwVUSHpfEqTQ4w/KMlf3HcRdt50E5ODIQJBw1fQ5RL34xabzxtUlzTXVqc4rkZsPbvrXKWnABCD7kWSmocA==", + "version": "2.4.3", + "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.4.3.tgz", + "integrity": "sha512-sntgmxj8o7DE7g/Qi60cqpLBA3HG3STcDA0kO+WfB05jEKhZMbY7umNm2rBpQvsmZ16/lPXCJGW2672dgOUkrg==", + "bin": { + "yaml": "bin.mjs" + }, "engines": { "node": ">= 14" } diff --git a/package.json b/package.json index 50a276ce42e..3aa3c017ff1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rudder-transformer", - "version": "1.66.1", + "version": "1.68.2", "description": "", "homepage": "https://github.com/rudderlabs/rudder-transformer#readme", "bugs": { @@ -65,7 +65,7 @@ "@ndhoule/extend": "^2.0.0", "@pyroscope/nodejs": "^0.2.9", "@rudderstack/integrations-lib": "^0.2.8", - "@rudderstack/workflow-engine": "^0.7.5", + "@rudderstack/workflow-engine": "^0.7.9", "@shopify/jest-koa-mocks": "^5.1.1", "ajv": "^8.12.0", "ajv-draft-04": "^1.0.0", diff --git a/src/cdk/v2/destinations/emarsys/utils.js b/src/cdk/v2/destinations/emarsys/utils.js index 2fe686718dc..e093064fc59 100644 --- a/src/cdk/v2/destinations/emarsys/utils.js +++ b/src/cdk/v2/destinations/emarsys/utils.js @@ -162,7 +162,7 @@ const deduceEventId = (message, destConfig) => { const { eventsMapping } = destConfig; const { event } = message; validateEventName(event); - if (eventsMapping.length > 0) { + if (Array.isArray(eventsMapping) && eventsMapping.length > 0) { const keyMap = getHashFromArray(eventsMapping, 'from', 'to', false); eventId = keyMap[event]; } diff --git a/src/legacy/router.js b/src/legacy/router.js index 90b6b05693a..b2c50660f41 100644 --- a/src/legacy/router.js +++ b/src/legacy/router.js @@ -649,6 +649,9 @@ if (startDestTransformer) { stats.timing('user_transform_request_latency', startTime, { processSessions, }); + stats.timingSummary('user_transform_request_latency_summary', startTime, { + processSessions, + }); stats.increment('user_transform_requests', { processSessions }); stats.histogram('user_transform_output_events', transformedEvents.length, { processSessions, diff --git a/src/services/userTransform.ts b/src/services/userTransform.ts 
index a587973deb5..e03aebc290d 100644 --- a/src/services/userTransform.ts +++ b/src/services/userTransform.ts @@ -67,6 +67,7 @@ export class UserTransformService { destinationId: eventsToProcess[0]?.metadata.destinationId, destinationType: eventsToProcess[0]?.metadata.destinationType, workspaceId: eventsToProcess[0]?.metadata.workspaceId, + transformationId: eventsToProcess[0]?.metadata.transformationId, messageIds, }; @@ -173,7 +174,17 @@ export class UserTransformService { ...getTransformationMetadata(eventsToProcess[0]?.metadata), }); - stats.histogram('user_transform_batch_size', requestSize, { + stats.timing('user_transform_batch_size', requestSize, { + ...metaTags, + ...getTransformationMetadata(eventsToProcess[0]?.metadata), + }); + + stats.timingSummary('user_transform_request_latency_summary', userFuncStartTime, { + ...metaTags, + ...getTransformationMetadata(eventsToProcess[0]?.metadata), + }); + + stats.timingSummary('user_transform_batch_size_summary', requestSize, { ...metaTags, ...getTransformationMetadata(eventsToProcess[0]?.metadata), }); diff --git a/src/types/index.ts b/src/types/index.ts index 93eb9bb6ba2..6297d49c42f 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -131,6 +131,8 @@ type Destination = { WorkspaceID: string; Transformations: UserTransformationInput[]; RevisionID?: string; + IsProcessorEnabled?: boolean; + IsConnectionEnabled?: boolean; }; type UserTransformationLibrary = { diff --git a/src/util/customTransformer-faas.js b/src/util/customTransformer-faas.js index 2c0bbfd8c00..9ac98040977 100644 --- a/src/util/customTransformer-faas.js +++ b/src/util/customTransformer-faas.js @@ -11,9 +11,10 @@ const { } = require('./openfaas'); const { getLibraryCodeV1 } = require('./customTransforrmationsStore-v1'); +const HASH_SECRET = process.env.OPENFAAS_FN_HASH_SECRET || ''; const libVersionIdsCache = new NodeCache(); -function generateFunctionName(userTransformation, libraryVersionIds, testMode) { +function generateFunctionName(userTransformation, libraryVersionIds, testMode, hashSecret = '') { if (userTransformation.versionId === FAAS_AST_VID) return FAAS_AST_FN_NAME; if (testMode) { @@ -21,10 +22,15 @@ function generateFunctionName(userTransformation, libraryVersionIds, testMode) { return funcName.substring(0, 63).toLowerCase(); } - const ids = [userTransformation.workspaceId, userTransformation.versionId].concat( + let ids = [userTransformation.workspaceId, userTransformation.versionId].concat( (libraryVersionIds || []).sort(), ); + if (hashSecret !== '') { + ids = ids.concat([hashSecret]); + } + + // FIXME: Why the id's are sorted ?! const hash = crypto.createHash('md5').update(`${ids}`).digest('hex'); return `fn-${userTransformation.workspaceId}-${hash}`.substring(0, 63).toLowerCase(); } @@ -90,7 +96,13 @@ async function setOpenFaasUserTransform( testMode, }; const functionName = - pregeneratedFnName || generateFunctionName(userTransformation, libraryVersionIds, testMode); + pregeneratedFnName || + generateFunctionName( + userTransformation, + libraryVersionIds, + testMode, + process.env.OPENFAAS_FN_HASH_SECRET, + ); const setupTime = new Date(); await setupFaasFunction( @@ -130,7 +142,13 @@ async function runOpenFaasUserTransform( const trMetadata = events[0].metadata ? 
getTransformationMetadata(events[0].metadata) : {}; // check and deploy faas function if not exists - const functionName = generateFunctionName(userTransformation, libraryVersionIds, testMode); + const functionName = generateFunctionName( + userTransformation, + libraryVersionIds, + testMode, + process.env.OPENFAAS_FN_HASH_SECRET, + ); + if (testMode) { await setOpenFaasUserTransform( userTransformation, diff --git a/src/util/customTransformer-v1.js b/src/util/customTransformer-v1.js index 1b3e270f527..4d846687b50 100644 --- a/src/util/customTransformer-v1.js +++ b/src/util/customTransformer-v1.js @@ -95,6 +95,7 @@ async function userTransformHandlerV1( }; stats.counter('user_transform_function_input_events', events.length, tags); stats.timing('user_transform_function_latency', invokeTime, tags); + stats.timingSummary('user_transform_function_latency_summary', invokeTime, tags); } return { transformedEvents, logs }; diff --git a/src/util/customTransformer.js b/src/util/customTransformer.js index b3d7214a511..fcc71b82a31 100644 --- a/src/util/customTransformer.js +++ b/src/util/customTransformer.js @@ -261,6 +261,7 @@ async function runUserTransform( stats.counter('user_transform_function_input_events', events.length, tags); stats.timing('user_transform_function_latency', invokeTime, tags); + stats.timingSummary('user_transform_function_latency_summary', invokeTime, tags); } return { diff --git a/src/util/customTransforrmationsStore-v1.js b/src/util/customTransforrmationsStore-v1.js index 3263049b6f9..6e2d799f3a5 100644 --- a/src/util/customTransforrmationsStore-v1.js +++ b/src/util/customTransforrmationsStore-v1.js @@ -31,6 +31,7 @@ async function getTransformationCodeV1(versionId) { responseStatusHandler(response.status, 'Transformation', versionId, url); stats.increment('get_transformation_code', { success: 'true', ...tags }); stats.timing('get_transformation_code_time', startTime, tags); + stats.timingSummary('get_transformation_code_time_summary', startTime, tags); const myJson = await response.json(); transformationCache[versionId] = myJson; return myJson; @@ -56,6 +57,7 @@ async function getLibraryCodeV1(versionId) { responseStatusHandler(response.status, 'Transformation Library', versionId, url); stats.increment('get_libraries_code', { success: 'true', ...tags }); stats.timing('get_libraries_code_time', startTime, tags); + stats.timingSummary('get_libraries_code_time_summary', startTime, tags); const myJson = await response.json(); libraryCache[versionId] = myJson; return myJson; @@ -83,6 +85,7 @@ async function getRudderLibByImportName(importName) { responseStatusHandler(response.status, 'Rudder Library', importName, url); stats.increment('get_libraries_code', { success: 'true', ...tags }); stats.timing('get_libraries_code_time', startTime, tags); + stats.timingSummary('get_libraries_code_time_summary', startTime, tags); const myJson = await response.json(); rudderLibraryCache[importName] = myJson; return myJson; diff --git a/src/util/customTransforrmationsStore.js b/src/util/customTransforrmationsStore.js index 08d417c07cd..2c5a7b446d2 100644 --- a/src/util/customTransforrmationsStore.js +++ b/src/util/customTransforrmationsStore.js @@ -24,6 +24,7 @@ async function getTransformationCode(versionId) { responseStatusHandler(response.status, 'Transformation', versionId, url); stats.increment('get_transformation_code', { versionId, success: 'true' }); stats.timing('get_transformation_code_time', startTime, { versionId }); + stats.timingSummary('get_transformation_code_time_summary', 
startTime, { versionId }); const myJson = await response.json(); myCache.set(versionId, myJson); return myJson; diff --git a/src/util/openfaas/faasApi.js b/src/util/openfaas/faasApi.js index f8f830f6e48..b932b70032b 100644 --- a/src/util/openfaas/faasApi.js +++ b/src/util/openfaas/faasApi.js @@ -1,6 +1,8 @@ const axios = require('axios'); const { RespStatusError, RetryRequestError } = require('../utils'); +const logger = require('../../logger'); + const OPENFAAS_GATEWAY_URL = process.env.OPENFAAS_GATEWAY_URL || 'http://localhost:8080'; const OPENFAAS_GATEWAY_USERNAME = process.env.OPENFAAS_GATEWAY_USERNAME || ''; const OPENFAAS_GATEWAY_PASSWORD = process.env.OPENFAAS_GATEWAY_PASSWORD || ''; @@ -12,7 +14,7 @@ const basicAuth = { const parseAxiosError = (error) => { if (error.response) { - const status = error.response.status || 400; + const status = error.response.status || 500; const errorData = error.response?.data; const message = (errorData && (errorData.message || errorData.error || errorData)) || error.message; @@ -61,6 +63,8 @@ const invokeFunction = async (functionName, payload) => }); const checkFunctionHealth = async (functionName) => { + logger.debug(`Checking function health: ${functionName}`); + return new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/function/${functionName}`; axios @@ -76,8 +80,10 @@ const checkFunctionHealth = async (functionName) => { }); }; -const deployFunction = async (payload) => - new Promise((resolve, reject) => { +const deployFunction = async (payload) => { + logger.debug(`Deploying function: ${payload?.name}`); + + return new Promise((resolve, reject) => { const url = `${OPENFAAS_GATEWAY_URL}/system/functions`; axios .post(url, payload, { auth: basicAuth }) @@ -86,6 +92,21 @@ const deployFunction = async (payload) => reject(parseAxiosError(err)); }); }); +}; + +const updateFunction = async (fnName, payload) => { + logger.debug(`Updating function: ${fnName}`); + + return new Promise((resolve, reject) => { + const url = `${OPENFAAS_GATEWAY_URL}/system/functions`; + axios + .put(url, payload, { auth: basicAuth }) + .then((resp) => resolve(resp.data)) + .catch((err) => { + reject(parseAxiosError(err)); + }); + }); +}; module.exports = { deleteFunction, @@ -94,4 +115,5 @@ module.exports = { getFunctionList, invokeFunction, checkFunctionHealth, + updateFunction, }; diff --git a/src/util/openfaas/index.js b/src/util/openfaas/index.js index 7a1fce3cfa4..c0369deb816 100644 --- a/src/util/openfaas/index.js +++ b/src/util/openfaas/index.js @@ -4,6 +4,7 @@ const { deployFunction, invokeFunction, checkFunctionHealth, + updateFunction, } = require('./faasApi'); const logger = require('../../logger'); const { RetryRequestError, RespStatusError } = require('../utils'); @@ -33,6 +34,7 @@ const FAAS_AST_FN_NAME = 'fn-ast'; const CUSTOM_NETWORK_POLICY_WORKSPACE_IDS = process.env.CUSTOM_NETWORK_POLICY_WORKSPACE_IDS || ''; const customNetworkPolicyWorkspaceIds = CUSTOM_NETWORK_POLICY_WORKSPACE_IDS.split(','); const CUSTOMER_TIER = process.env.CUSTOMER_TIER || 'shared'; +const DISABLE_RECONCILE_FN = process.env.DISABLE_RECONCILE_FN == 'true' || false; // Initialise node cache const functionListCache = new NodeCache(); @@ -67,6 +69,8 @@ const awaitFunctionReadiness = async ( maxWaitInMs = 22000, waitBetweenIntervalsInMs = 250, ) => { + logger.debug(`Awaiting function readiness: ${functionName}`); + const executionPromise = new Promise(async (resolve) => { try { await callWithRetry( @@ -121,7 +125,7 @@ const invalidateFnCache = () => { 
functionListCache.set(FUNC_LIST_KEY, []); }; -const deployFaasFunction = async ( +const updateFaasFunction = async ( functionName, code, versionId, @@ -130,73 +134,50 @@ const deployFaasFunction = async ( trMetadata = {}, ) => { try { - logger.debug(`[Faas] Deploying a faas function: ${functionName}`); - let envProcess = 'python index.py'; - - const lvidsString = libraryVersionIDs.join(','); + logger.debug(`Updating faas fn: ${functionName}`); - if (!testMode) { - envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; - } else { - envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; - } - - const envVars = {}; - if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') { - envVars.max_inflight = FAAS_MAX_INFLIGHT; - envVars.exec_timeout = FAAS_EXEC_TIMEOUT; - } - if (GEOLOCATION_URL) { - envVars.geolocation_url = GEOLOCATION_URL; - } - // labels - const labels = { - 'openfaas-fn': 'true', - 'parent-component': 'openfaas', - 'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT, - 'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT, - 'com.openfaas.scale.zero': FAAS_SCALE_ZERO, - 'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION, - 'com.openfaas.scale.target': FAAS_SCALE_TARGET, - 'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION, - 'com.openfaas.scale.type': FAAS_SCALE_TYPE, - transformationId: trMetadata.transformationId, - workspaceId: trMetadata.workspaceId, - team: 'data-management', - service: 'openfaas-fn', - customer: 'shared', - 'customer-tier': CUSTOMER_TIER, - }; - if ( - trMetadata.workspaceId && - customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId) - ) { - labels['custom-network-policy'] = 'true'; + const payload = buildOpenfaasFn( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata, + ); + await updateFunction(functionName, payload); + // wait for the function to be ready and then set it in the cache + await awaitFunctionReadiness(functionName); + setFunctionInCache(functionName); + } catch (error) { + // 404 is the status code returned from openfaas community edition + // when the function doesn't exist, so we can safely ignore this error + // and let the function be created in the next step. 
+ if (error.statusCode !== 404) { + throw error; + } + } +}; - // TODO: investigate and add more required labels and annotations - const payload = { - service: functionName, - name: functionName, - image: FAAS_BASE_IMG, - envProcess, - envVars, - labels, - annotations: { - 'prometheus.io.scrape': 'true', - }, - limits: { - memory: FAAS_LIMITS_MEMORY, - cpu: FAAS_LIMITS_CPU, - }, - requests: { - memory: FAAS_REQUESTS_MEMORY, - cpu: FAAS_REQUESTS_CPU, - }, - }; +const deployFaasFunction = async ( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata = {}, +) => { + try { + logger.debug(`Deploying faas fn: ${functionName}`); + const payload = buildOpenfaasFn( + functionName, + code, + versionId, + libraryVersionIDs, + testMode, + trMetadata, + ); await deployFunction(payload); - logger.debug('[Faas] Deployed a faas function'); } catch (error) { logger.error(`[Faas] Error while deploying ${functionName}: ${error.message}`); // To handle concurrent create requests, @@ -246,6 +227,95 @@ async function setupFaasFunction( } } +// reconcileFn runs every time the service boots up, +// trying to update the functions which are not in the cache to the +// latest labels and envVars +const reconcileFn = async (name, versionId, libraryVersionIDs, trMetadata) => { + if (DISABLE_RECONCILE_FN) { + return; + } + + logger.debug(`Reconciling faas function: ${name}`); + try { + if (isFunctionDeployed(name)) { + return; + } + await updateFaasFunction(name, null, versionId, libraryVersionIDs, false, trMetadata); + } catch (error) { + logger.error( + `unexpected error occurred when reconciling the function ${name}: ${error.message}`, + ); + throw error; + } +}; + +// buildOpenfaasFn is a helper function to build the openfaas fn CRUD payload +function buildOpenfaasFn(name, code, versionId, libraryVersionIDs, testMode, trMetadata = {}) { + logger.debug(`Building faas fn: ${name}`); + + let envProcess = 'python index.py'; + const lvidsString = libraryVersionIDs.join(','); + + if (!testMode) { + envProcess = `${envProcess} --vid ${versionId} --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; + } else { + envProcess = `${envProcess} --code "${code}" --config-backend-url ${CONFIG_BACKEND_URL} --lvids "${lvidsString}"`; + } + + const envVars = {}; + + if (FAAS_ENABLE_WATCHDOG_ENV_VARS.trim().toLowerCase() === 'true') { + envVars.max_inflight = FAAS_MAX_INFLIGHT; + envVars.exec_timeout = FAAS_EXEC_TIMEOUT; + } + + if (GEOLOCATION_URL) { + envVars.geolocation_url = GEOLOCATION_URL; + } + + const labels = { + 'openfaas-fn': 'true', + 'parent-component': 'openfaas', + 'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT, + 'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT, + 'com.openfaas.scale.zero': FAAS_SCALE_ZERO, + 'com.openfaas.scale.zero-duration': FAAS_SCALE_ZERO_DURATION, + 'com.openfaas.scale.target': FAAS_SCALE_TARGET, + 'com.openfaas.scale.target-proportion': FAAS_SCALE_TARGET_PROPORTION, + 'com.openfaas.scale.type': FAAS_SCALE_TYPE, + transformationId: trMetadata.transformationId, + workspaceId: trMetadata.workspaceId, + team: 'data-management', + service: 'openfaas-fn', + customer: 'shared', + 'customer-tier': CUSTOMER_TIER, + }; + + if (trMetadata.workspaceId && customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)) { + labels['custom-network-policy'] = 'true'; + } + + return { + service: name, + name: name, + image: FAAS_BASE_IMG, + envProcess, + envVars, + labels, + annotations: { + 'prometheus.io.scrape': 'true', + }, + limits: { + memory: FAAS_LIMITS_MEMORY, + cpu: 
FAAS_LIMITS_CPU, + }, + requests: { + memory: FAAS_REQUESTS_MEMORY, + cpu: FAAS_REQUESTS_CPU, + }, + }; +} + const executeFaasFunction = async ( name, events, @@ -260,7 +330,11 @@ const executeFaasFunction = async ( let errorRaised; try { - if (testMode) await awaitFunctionReadiness(name); + if (testMode) { + await awaitFunctionReadiness(name); + } else { + await reconcileFn(name, versionId, libraryVersionIDs, trMetadata); + } return await invokeFunction(name, events); } catch (error) { logger.error(`Error while invoking ${name}: ${error.message}`); @@ -268,6 +342,7 @@ const executeFaasFunction = async ( if (error.statusCode === 404 && error.message.includes(`error finding function ${name}`)) { removeFunctionFromCache(name); + await setupFaasFunction(name, null, versionId, libraryVersionIDs, testMode, trMetadata); throw new RetryRequestError(`${name} not found`); } @@ -305,6 +380,7 @@ const executeFaasFunction = async ( stats.counter('user_transform_function_input_events', events.length, tags); stats.timing('user_transform_function_latency', startTime, tags); + stats.timingSummary('user_transform_function_latency_summary', startTime, tags); } }; @@ -313,6 +389,8 @@ module.exports = { executeFaasFunction, setupFaasFunction, invalidateFnCache, + buildOpenfaasFn, FAAS_AST_VID, FAAS_AST_FN_NAME, + setFunctionInCache, }; diff --git a/src/util/prometheus.js b/src/util/prometheus.js index a46eae12c98..bc4c6f2eb9e 100644 --- a/src/util/prometheus.js +++ b/src/util/prometheus.js @@ -11,7 +11,7 @@ function appendPrefix(name) { } class Prometheus { - constructor() { + constructor(enableSummaryMetrics = true) { this.prometheusRegistry = new prometheusClient.Registry(); this.prometheusRegistry.setDefaultLabels(defaultLabels); prometheusClient.collectDefaultMetrics({ @@ -21,7 +21,7 @@ class Prometheus { prometheusClient.AggregatorRegistry.setRegistries(this.prometheusRegistry); this.aggregatorRegistry = new prometheusClient.AggregatorRegistry(); - this.createMetrics(); + this.createMetrics(enableSummaryMetrics); } async metricsController(ctx) { @@ -56,11 +56,22 @@ class Prometheus { return gauge; } - newSummaryStat(name, help, labelNames) { + newSummaryStat( + name, + help, + labelNames, + percentiles = [0.5, 0.9, 0.99], + maxAgeSeconds = 300, + ageBuckets = 5, + ) { + // we enable a 5 minute sliding window and calculate the 50th, 90th, and 99th percentiles by default const summary = new prometheusClient.Summary({ name, help, labelNames, + percentiles, + maxAgeSeconds, + ageBuckets, }); this.prometheusRegistry.registerMetric(summary); return summary; @@ -117,6 +128,21 @@ class Prometheus { } } + timingSummary(name, start, tags = {}) { + try { + let metric = this.prometheusRegistry.getSingleMetric(appendPrefix(name)); + if (!metric) { + logger.warn( + `Prometheus: summary metric ${name} not found in the registry. 
Creating a new one`, + ); + metric = this.newSummaryStat(name, name, Object.keys(tags)); + } + metric.observe(tags, (new Date() - start) / 1000); + } catch (e) { + logger.error(`Prometheus: Summary metric ${name} failed with error ${e}`); + } + } + histogram(name, value, tags = {}) { try { let metric = this.prometheusRegistry.getSingleMetric(appendPrefix(name)); @@ -166,7 +192,7 @@ class Prometheus { } } - createMetrics() { + createMetrics(enableSummaryMetrics) { const metrics = [ // Counters { @@ -575,6 +601,12 @@ class Prometheus { type: 'gauge', labelNames: ['destination_id'], }, + { + name: 'braze_alias_failure_count', + help: 'braze_alias_failure_count', + type: 'counter', + labelNames: ['destination_id'], + }, { name: 'mixpanel_batch_engage_pack_size', help: 'mixpanel_batch_engage_pack_size', @@ -692,6 +724,18 @@ class Prometheus { 'k8_namespace', ], }, + { + name: 'user_transform_request_latency_summary', + help: 'user_transform_request_latency_summary', + type: 'summary', + labelNames: [ + 'workspaceId', + 'transformationId', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, { name: 'user_transform_batch_size', help: 'user_transform_batch_size', @@ -708,6 +752,18 @@ class Prometheus { 524288000, ], // 1KB, 100KB, 0.5MB, 1MB, 10MB, 20MB, 50MB, 100MB, 200MB, 500MB }, + { + name: 'user_transform_batch_size_summary', + help: 'user_transform_batch_size_summary', + type: 'summary', + labelNames: [ + 'workspaceId', + 'transformationId', + 'sourceType', + 'destinationType', + 'k8_namespace', + ], + }, { name: 'source_transform_request_latency', help: 'source_transform_request_latency', @@ -764,12 +820,24 @@ class Prometheus { type: 'histogram', labelNames: ['versionId', 'version'], }, + { + name: 'get_transformation_code_time_summary', + help: 'get_transformation_code_time_summary', + type: 'summary', + labelNames: ['versionId', 'version'], + }, { name: 'get_libraries_code_time', help: 'get_libraries_code_time', type: 'histogram', labelNames: ['libraryVersionId', 'versionId', 'type', 'version'], }, + { + name: 'get_libraries_code_time_summary', + help: 'get_libraries_code_time_summary', + type: 'summary', + labelNames: ['libraryVersionId', 'versionId', 'type', 'version'], + }, { name: 'isolate_cpu_time', help: 'isolate_cpu_time', @@ -1021,6 +1089,22 @@ class Prometheus { 'workspaceId', ], }, + { + name: 'user_transform_function_latency_summary', + help: 'user_transform_function_latency_summary', + type: 'summary', + labelNames: [ + 'identifier', + 'testMode', + 'sourceType', + 'destinationType', + 'k8_namespace', + 'errored', + 'statusCode', + 'transformationId', + 'workspaceId', + ], + }, ]; metrics.forEach((metric) => { @@ -1036,6 +1120,17 @@ class Prometheus { metric.labelNames, metric.buckets, ); + } else if (metric.type === 'summary') { + if (enableSummaryMetrics) { + this.newSummaryStat( + appendPrefix(metric.name), + metric.help, + metric.labelNames, + metric.percentiles, + metric.maxAge, + metric.ageBuckets, + ); + } } else { logger.error( `Prometheus: Metric creation failed. Name: ${metric.name}. Invalid type: ${metric.type}`, diff --git a/src/util/stats.js b/src/util/stats.js index 9a32fd1de3b..0aa13fc85cc 100644 --- a/src/util/stats.js +++ b/src/util/stats.js @@ -4,6 +4,8 @@ const logger = require('../logger'); const enableStats = process.env.ENABLE_STATS !== 'false'; const statsClientType = process.env.STATS_CLIENT || 'statsd'; +// summary metrics are enabled by default. To disable set ENABLE_SUMMARY_METRICS='false'. 
+const enableSummaryMetrics = process.env.ENABLE_SUMMARY_METRICS !== 'false'; let statsClient; function init() { @@ -19,7 +21,7 @@ function init() { case 'prometheus': logger.info('setting up prometheus client'); - statsClient = new prometheus.Prometheus(); + statsClient = new prometheus.Prometheus(enableSummaryMetrics); break; default: @@ -38,6 +40,15 @@ const timing = (name, start, tags = {}) => { statsClient.timing(name, start, tags); }; +// timingSummary is used to record observations for a summary metric +const timingSummary = (name, start, tags = {}) => { + if (!enableStats || !statsClient || !enableSummaryMetrics) { + return; + } + + statsClient.timingSummary(name, start, tags); +}; + const increment = (name, tags = {}) => { if (!enableStats || !statsClient) { return; @@ -88,4 +99,13 @@ async function metricsController(ctx) { init(); -module.exports = { init, timing, increment, counter, gauge, histogram, metricsController }; +module.exports = { + init, + timing, + timingSummary, + increment, + counter, + gauge, + histogram, + metricsController, +}; diff --git a/src/util/statsd.js b/src/util/statsd.js index a32a6f6f302..7613de7975d 100644 --- a/src/util/statsd.js +++ b/src/util/statsd.js @@ -21,6 +21,11 @@ class Statsd { this.statsdClient.timing(name, start, tags); } + // timingSummary is just a wrapper around timing for statsd.For prometheus, we will have to implement a different function. + timingSummary(name, start, tags = {}) { + this.statsdClient.timing(name, start, tags); + } + increment(name, tags = {}) { this.statsdClient.increment(name, 1, tags); } diff --git a/src/v0/destinations/af/config.js b/src/v0/destinations/af/config.js index bc49706959f..eb3b191950e 100644 --- a/src/v0/destinations/af/config.js +++ b/src/v0/destinations/af/config.js @@ -57,6 +57,7 @@ const Event = { }; const ENDPOINT = 'https://api2.appsflyer.com/inappevent/'; +const ENDPOINT_V2 = 'https://api3.appsflyer.com/inappevent/'; const mappingConfig = getMappingConfig(ConfigCategory, __dirname); @@ -69,6 +70,7 @@ events.forEach((event) => { module.exports = { ConfigCategory, ENDPOINT, + ENDPOINT_V2, Event, mappingConfig, nameToEventMap, diff --git a/src/v0/destinations/af/transform.js b/src/v0/destinations/af/transform.js index 72ba47a2273..a611dcc249c 100644 --- a/src/v0/destinations/af/transform.js +++ b/src/v0/destinations/af/transform.js @@ -17,20 +17,32 @@ const { simpleProcessRouterDest, } = require('../../util'); -const { Event, ENDPOINT, ConfigCategory, mappingConfig, nameToEventMap } = require('./config'); +const { + Event, + ENDPOINT, + ENDPOINT_V2, + ConfigCategory, + mappingConfig, + nameToEventMap, +} = require('./config'); const { JSON_MIME_TYPE } = require('../../util/constant'); function responseBuilderSimple(payload, message, destination) { - const { androidAppId, appleAppId } = destination.Config; + const { androidAppId, appleAppId, sharingFilter, devKey, s2sKey, authVersion } = + destination.Config; let endpoint; const os = get(message, 'context.os.name'); // if ((os && os.toLowerCase() === "android") || (os && isAppleFamily(os))){ // if() // } + + const finalEndPoint = + isDefinedAndNotNull(authVersion) && authVersion === 'v2' ? 
ENDPOINT_V2 : ENDPOINT; + if (os && os.toLowerCase() === 'android' && androidAppId) { - endpoint = `${ENDPOINT}${androidAppId}`; + endpoint = `${finalEndPoint}${androidAppId}`; } else if (os && isAppleFamily(os) && appleAppId) { - endpoint = `${ENDPOINT}id${appleAppId}`; + endpoint = `${finalEndPoint}id${appleAppId}`; } else { throw new ConfigurationError( 'os name is required along with the respective appId eg. (os->android & Android App Id is required) or (os->ios & Apple App Id is required)', @@ -87,16 +99,19 @@ function responseBuilderSimple(payload, message, destination) { updatedPayload.bundleIdentifier = bundleIdentifier; } - const { sharingFilter, devKey } = destination.Config; + // const { sharingFilter, devKey } = destination.Config; if (isDefinedAndNotNullAndNotEmpty(sharingFilter)) { updatedPayload.sharing_filter = sharingFilter; } + const finalAuthentication = + isDefinedAndNotNull(authVersion) && authVersion === 'v2' ? s2sKey : devKey; + const response = defaultRequestConfig(); response.endpoint = endpoint; response.headers = { 'Content-Type': JSON_MIME_TYPE, - authentication: devKey, + authentication: finalAuthentication, }; response.method = defaultPostRequestConfig.requestMethod; response.body.JSON = removeUndefinedAndNullValues(updatedPayload); @@ -203,6 +218,19 @@ function processEventTypeTrack(message, config) { } function processSingleMessage(message, destination) { + const { devKey, s2sKey, authVersion, useRichEventName } = destination.Config; + + if (!isDefinedAndNotNull(authVersion) && !isDefinedAndNotNull(devKey)) { + throw new ConfigurationError('No authentication key is present. Aborting.'); + } + + if (isDefinedAndNotNull(authVersion) && authVersion === 'v2' && !isDefinedAndNotNull(s2sKey)) { + throw new ConfigurationError('s2s key is mandatory for v2 authorization. Aborting.'); + } + + if (isDefinedAndNotNull(authVersion) && authVersion === 'v1' && !isDefinedAndNotNull(devKey)) { + throw new ConfigurationError('dev key is mandatory for v1 authorization. 
Aborting.'); + } const messageType = message.type.toLowerCase(); let payload; switch (messageType) { @@ -212,7 +240,7 @@ function processSingleMessage(message, destination) { } case EventType.SCREEN: { let eventName; - if (destination.Config.useRichEventName === true) { + if (useRichEventName === true) { eventName = `Viewed ${ message.name || message.event || get(message, 'properties.name') || '' } Screen`; @@ -224,7 +252,7 @@ function processSingleMessage(message, destination) { } case EventType.PAGE: { let eventName; - if (destination.Config.useRichEventName === true) { + if (useRichEventName === true) { eventName = `Viewed ${message.name || get(message, 'properties.name') || ''} Page`; } else { eventName = EventType.PAGE; diff --git a/src/v0/destinations/braze/transform.js b/src/v0/destinations/braze/transform.js index d45640272ed..11b2bb06365 100644 --- a/src/v0/destinations/braze/transform.js +++ b/src/v0/destinations/braze/transform.js @@ -13,6 +13,7 @@ const { getPurchaseObjs, setExternalId, setAliasObjectWithAnonId, + collectStatsForAliasFailure, } = require('./util'); const tags = require('../../util/tags'); const { EventType, MappedToDestinationKey } = require('../../../constants'); @@ -228,6 +229,7 @@ async function processIdentify(message, destination) { endpointPath: '/users/identify', }, ); + if (!isHttpStatusSuccess(brazeIdentifyResp.status)) { throw new NetworkError( `Braze identify failed - ${JSON.stringify(brazeIdentifyResp.response)}`, @@ -238,6 +240,8 @@ async function processIdentify(message, destination) { brazeIdentifyResp.response, ); } + + collectStatsForAliasFailure(brazeIdentifyResp.response, destination.ID); } function processTrackWithUserAttributes( diff --git a/src/v0/destinations/braze/util.js b/src/v0/destinations/braze/util.js index ce83ebc244a..f131c40f5fa 100644 --- a/src/v0/destinations/braze/util.js +++ b/src/v0/destinations/braze/util.js @@ -1,6 +1,7 @@ /* eslint-disable */ const _ = require('lodash'); const get = require('get-value'); +const { structuredLogger: logger } = require('@rudderstack/integrations-lib'); const stats = require('../../../util/stats'); const { handleHttpRequest } = require('../../../adapters/network'); const { @@ -655,6 +656,47 @@ function getPurchaseObjs(message, config) { return purchaseObjs; } +const collectStatsForAliasFailure = (brazeResponse, destinationId) => { + /** + * Braze Response for Alias failure + * { + * "aliases_processed": 0, + * "message": "success", + * "errors": [ + * { + * "type": "'external_id' is required", + * "input_array": "user_identifiers", + * "index": 0 + * } + * ] + * } + */ + + /** + * Braze Response for Alias success + * { + * "aliases_processed": 1, + * "message": "success" + * } + */ + + // Should not happen but still checking for unhandled exceptions + if (!isDefinedAndNotNull(brazeResponse)) { + return; + } + const { aliases_processed: aliasesProcessed, errors } = brazeResponse; + if (aliasesProcessed === 0) { + stats.increment('braze_alias_failure_count', { destination_id: destinationId }); + + if (Array.isArray(errors)) { + logger.info('Braze Alias Failure Errors:', { + destinationId, + errors, + }); + } + } +}; + module.exports = { BrazeDedupUtility, CustomAttributeOperationUtil, @@ -667,4 +709,5 @@ module.exports = { setExternalId, setAliasObjectWithAnonId, addMandatoryPurchaseProperties, + collectStatsForAliasFailure, }; diff --git a/src/v0/destinations/fb_custom_audience/recordTransform.js b/src/v0/destinations/fb_custom_audience/recordTransform.js new file mode 100644 index 
00000000000..0f7b65c0bf4 --- /dev/null +++ b/src/v0/destinations/fb_custom_audience/recordTransform.js @@ -0,0 +1,277 @@ +/* eslint-disable no-const-assign */ +const lodash = require('lodash'); +const get = require('get-value'); +const { + InstrumentationError, + ConfigurationError, + getErrorRespEvents, +} = require('@rudderstack/integrations-lib'); +const { schemaFields } = require('./config'); +const { MappedToDestinationKey } = require('../../../constants'); +const stats = require('../../../util/stats'); +const { + getDestinationExternalIDInfoForRetl, + isDefinedAndNotNullAndNotEmpty, + checkSubsetOfArray, + returnArrayOfSubarrays, + getSuccessRespEvents, + generateErrorObject, +} = require('../../util'); +const { + ensureApplicableFormat, + getUpdatedDataElement, + getSchemaForEventMappedToDest, + batchingWithPayloadSize, + responseBuilderSimple, + getDataSource, +} = require('./util'); + +function getErrorMetaData(inputs, acceptedOperations) { + const metadata = []; + // eslint-disable-next-line no-restricted-syntax + for (const key in inputs) { + if (!acceptedOperations.includes(key)) { + inputs[key].forEach((input) => { + metadata.push(input.metadata); + }); + } + } + return metadata; +} + +const processRecordEventArray = ( + recordChunksArray, + userSchema, + isHashRequired, + disableFormat, + paramsPayload, + prepareParams, + destination, + operation, + operationAudienceId, +) => { + const toSendEvents = []; + const metadata = []; + recordChunksArray.forEach((recordArray) => { + const data = []; + recordArray.forEach((input) => { + const { fields } = input.message; + let dataElement = []; + let nullUserData = true; + + userSchema.forEach((eachProperty) => { + const userProperty = fields[eachProperty]; + let updatedProperty = userProperty; + + if (isHashRequired && !disableFormat) { + updatedProperty = ensureApplicableFormat(eachProperty, userProperty); + } + + dataElement = getUpdatedDataElement( + dataElement, + isHashRequired, + eachProperty, + updatedProperty, + ); + + if (dataElement[dataElement.length - 1]) { + nullUserData = false; + } + }); + + if (nullUserData) { + stats.increment('fb_custom_audience_event_having_all_null_field_values_for_a_user', { + destinationId: destination.ID, + nullFields: userSchema, + }); + } + data.push(dataElement); + metadata.push(input.metadata); + }); + + const prepareFinalPayload = lodash.cloneDeep(paramsPayload); + prepareFinalPayload.schema = userSchema; + prepareFinalPayload.data = data; + const payloadBatches = batchingWithPayloadSize(prepareFinalPayload); + + payloadBatches.forEach((payloadBatch) => { + const response = { + ...prepareParams, + payload: payloadBatch, + }; + + const wrappedResponse = { + responseField: response, + operationCategory: operation, + }; + + const builtResponse = responseBuilderSimple(wrappedResponse, operationAudienceId); + + toSendEvents.push(builtResponse); + }); + }); + + const response = getSuccessRespEvents(toSendEvents, metadata, destination, true); + + return response; +}; + +async function processRecordInputs(groupedRecordInputs) { + const { destination } = groupedRecordInputs[0]; + const { message } = groupedRecordInputs[0]; + const { + isHashRequired, + accessToken, + disableFormat, + type, + subType, + isRaw, + maxUserCount, + audienceId, + } = destination.Config; + const prepareParams = { + access_token: accessToken, + }; + + // maxUserCount validation + const maxUserCountNumber = parseInt(maxUserCount, 10); + if (Number.isNaN(maxUserCountNumber)) { + throw new ConfigurationError('Batch size must 
be an Integer.'); + } + + // audience id validation + let operationAudienceId = audienceId; + const mappedToDestination = get(message, MappedToDestinationKey); + if (mappedToDestination) { + const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE'); + operationAudienceId = objectType; + } + if (!isDefinedAndNotNullAndNotEmpty(operationAudienceId)) { + throw new ConfigurationError('Audience ID is a mandatory field'); + } + + // user schema validation + let { userSchema } = destination.Config; + if (mappedToDestination) { + userSchema = getSchemaForEventMappedToDest(message); + } + if (!Array.isArray(userSchema)) { + userSchema = [userSchema]; + } + if (!checkSubsetOfArray(schemaFields, userSchema)) { + throw new ConfigurationError('One or more of the schema fields are not supported'); + } + + const paramsPayload = {}; + + if (isRaw) { + paramsPayload.is_raw = isRaw; + } + + const dataSource = getDataSource(type, subType); + if (Object.keys(dataSource).length > 0) { + paramsPayload.data_source = dataSource; + } + + const groupedRecordsByAction = lodash.groupBy(groupedRecordInputs, (record) => + record.message.action?.toLowerCase(), + ); + + const finalResponse = []; + + let insertResponse; + let deleteResponse; + let updateResponse; + + if (groupedRecordsByAction.delete) { + const deleteRecordChunksArray = returnArrayOfSubarrays( + groupedRecordsByAction.delete, + maxUserCountNumber, + ); + deleteResponse = processRecordEventArray( + deleteRecordChunksArray, + userSchema, + isHashRequired, + disableFormat, + paramsPayload, + prepareParams, + destination, + 'remove', + operationAudienceId, + ); + } + + if (groupedRecordsByAction.insert) { + const insertRecordChunksArray = returnArrayOfSubarrays( + groupedRecordsByAction.insert, + maxUserCountNumber, + ); + + insertResponse = processRecordEventArray( + insertRecordChunksArray, + userSchema, + isHashRequired, + disableFormat, + paramsPayload, + prepareParams, + destination, + 'add', + operationAudienceId, + ); + } + + if (groupedRecordsByAction.update) { + const updateRecordChunksArray = returnArrayOfSubarrays( + groupedRecordsByAction.update, + maxUserCountNumber, + ); + updateResponse = processRecordEventArray( + updateRecordChunksArray, + userSchema, + isHashRequired, + disableFormat, + paramsPayload, + prepareParams, + destination, + 'add', + operationAudienceId, + ); + } + + const eventTypes = ['update', 'insert', 'delete']; + const errorMetaData = []; + const errorMetaDataObject = getErrorMetaData(groupedRecordsByAction, eventTypes); + if (errorMetaDataObject.length > 0) { + errorMetaData.push(errorMetaDataObject); + } + + const error = new InstrumentationError('Invalid action type in record event'); + const errorObj = generateErrorObject(error); + const errorResponseList = errorMetaData.map((metadata) => + getErrorRespEvents(metadata, errorObj.status, errorObj.message, errorObj.statTags), + ); + + if (deleteResponse && deleteResponse.batchedRequest.length > 0) { + finalResponse.push(deleteResponse); + } + if (insertResponse && insertResponse.batchedRequest.length > 0) { + finalResponse.push(insertResponse); + } + if (updateResponse && updateResponse.batchedRequest.length > 0) { + finalResponse.push(updateResponse); + } + if (errorResponseList.length > 0) { + finalResponse.push(...errorResponseList); + } + + if (finalResponse.length === 0) { + throw new InstrumentationError( + 'Missing valid parameters, unable to generate transformed payload', + ); + } + return finalResponse; +} + +module.exports = { + 
processRecordInputs, +}; diff --git a/src/v0/destinations/fb_custom_audience/transform.js b/src/v0/destinations/fb_custom_audience/transform.js index dfe9a04618b..c5c340c043d 100644 --- a/src/v0/destinations/fb_custom_audience/transform.js +++ b/src/v0/destinations/fb_custom_audience/transform.js @@ -1,14 +1,7 @@ const lodash = require('lodash'); const get = require('get-value'); +const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib'); const { - InstrumentationError, - TransformationError, - ConfigurationError, -} = require('@rudderstack/integrations-lib'); -const { - defaultRequestConfig, - defaultPostRequestConfig, - defaultDeleteRequestConfig, checkSubsetOfArray, isDefinedAndNotNullAndNotEmpty, returnArrayOfSubarrays, @@ -21,40 +14,27 @@ const { getSchemaForEventMappedToDest, batchingWithPayloadSize, generateAppSecretProof, + responseBuilderSimple, + getDataSource, } = require('./util'); -const { - getEndPoint, - schemaFields, - USER_ADD, - USER_DELETE, - typeFields, - subTypeFields, -} = require('./config'); +const { schemaFields, USER_ADD, USER_DELETE } = require('./config'); const { MappedToDestinationKey } = require('../../../constants'); +const { processRecordInputs } = require('./recordTransform'); +const logger = require('../../../logger'); -const responseBuilderSimple = (payload, audienceId) => { - if (payload) { - const responseParams = payload.responseField; - const response = defaultRequestConfig(); - response.endpoint = getEndPoint(audienceId); - - if (payload.operationCategory === 'add') { - response.method = defaultPostRequestConfig.requestMethod; +function checkForUnsupportedEventTypes(dictionary, keyList) { + const unsupportedEventTypes = []; + // eslint-disable-next-line no-restricted-syntax + for (const key in dictionary) { + if (!keyList.includes(key)) { + unsupportedEventTypes.push(key); } - if (payload.operationCategory === 'remove') { - response.method = defaultDeleteRequestConfig.requestMethod; - } - - response.params = responseParams; - return response; } - // fail-safety for developer error - throw new TransformationError(`Payload could not be constructed`); -}; + return unsupportedEventTypes; +} // Function responsible prepare the payload field of every event parameter - const preparePayload = ( userUpdateList, userSchema, @@ -102,7 +82,6 @@ const prepareResponse = ( const prepareParams = {}; // creating the parameters field const paramsPayload = {}; - const dataSource = {}; prepareParams.access_token = accessToken; @@ -118,13 +97,7 @@ const prepareResponse = ( } // creating the data_source block - if (type && type !== 'NA' && typeFields.includes(type)) { - dataSource.type = type; - } - - if (subType && subType !== 'NA' && subTypeFields.includes(subType)) { - dataSource.sub_type = subType; - } + const dataSource = getDataSource(type, subType); if (Object.keys(dataSource).length > 0) { paramsPayload.data_source = dataSource; } @@ -250,6 +223,7 @@ const processEvent = (message, destination) => { ), ); } + toSendEvents.forEach((sendEvent) => { respList.push(responseBuilderSimple(sendEvent, operationAudienceId)); }); @@ -265,7 +239,31 @@ const processEvent = (message, destination) => { const process = (event) => processEvent(event.message, event.destination); const processRouterDest = async (inputs, reqMetadata) => { - const respList = await simpleProcessRouterDest(inputs, process, reqMetadata); + const respList = []; + const groupedInputs = lodash.groupBy(inputs, (input) => input.message.type?.toLowerCase()); + let 
transformedRecordEvent = []; + let transformedAudienceEvent = []; + + const eventTypes = ['record', 'audiencelist']; + const unsupportedEventList = checkForUnsupportedEventTypes(groupedInputs, eventTypes); + if (unsupportedEventList.length > 0) { + logger.info(`unsupported events found ${unsupportedEventList}`); + throw new ConfigurationError('unsupported events present in the event'); + } + + if (groupedInputs.record) { + transformedRecordEvent = await processRecordInputs(groupedInputs.record, reqMetadata); + } + + if (groupedInputs.audiencelist) { + transformedAudienceEvent = await simpleProcessRouterDest( + groupedInputs.audiencelist, + process, + reqMetadata, + ); + } + + respList.push(...transformedRecordEvent, ...transformedAudienceEvent); return flattenMap(respList); }; diff --git a/src/v0/destinations/fb_custom_audience/util.js b/src/v0/destinations/fb_custom_audience/util.js index 9385dfbd367..401b6018697 100644 --- a/src/v0/destinations/fb_custom_audience/util.js +++ b/src/v0/destinations/fb_custom_audience/util.js @@ -4,6 +4,13 @@ const crypto = require('crypto'); const get = require('get-value'); const jsonSize = require('json-size'); const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib'); +const { TransformationError } = require('@rudderstack/integrations-lib'); +const { typeFields, subTypeFields, getEndPoint } = require('./config'); +const { + defaultRequestConfig, + defaultPostRequestConfig, + defaultDeleteRequestConfig, +} = require('../../util'); const stats = require('../../../util/stats'); const { isDefinedAndNotNull } = require('../../util'); @@ -208,7 +215,6 @@ const prepareDataField = ( }; // ref: https://developers.facebook.com/docs/facebook-login/security/#generate-the-proof - const generateAppSecretProof = (accessToken, appSecret, dateNow) => { const currentTime = Math.floor(dateNow / 1000); // Get current Unix time in seconds const data = `${accessToken}|${currentTime}`; @@ -221,9 +227,44 @@ const generateAppSecretProof = (accessToken, appSecret, dateNow) => { return appsecretProof; }; +const getDataSource = (type, subType) => { + const dataSource = {}; + if (type && type !== 'NA' && typeFields.includes(type)) { + dataSource.type = type; + } + if (subType && subType !== 'NA' && subTypeFields.includes(subType)) { + dataSource.sub_type = subType; + } + return dataSource; +}; + +const responseBuilderSimple = (payload, audienceId) => { + if (payload) { + const responseParams = payload.responseField; + const response = defaultRequestConfig(); + response.endpoint = getEndPoint(audienceId); + + if (payload.operationCategory === 'add') { + response.method = defaultPostRequestConfig.requestMethod; + } + if (payload.operationCategory === 'remove') { + response.method = defaultDeleteRequestConfig.requestMethod; + } + + response.params = responseParams; + return response; + } + // fail-safety for developer error + throw new TransformationError(`Payload could not be constructed`); +}; + module.exports = { prepareDataField, getSchemaForEventMappedToDest, batchingWithPayloadSize, + ensureApplicableFormat, + getUpdatedDataElement, generateAppSecretProof, + responseBuilderSimple, + getDataSource, }; diff --git a/src/v0/destinations/fb_custom_audience/util.test.js b/src/v0/destinations/fb_custom_audience/util.test.js new file mode 100644 index 00000000000..60e0aff464d --- /dev/null +++ b/src/v0/destinations/fb_custom_audience/util.test.js @@ -0,0 +1,122 @@ +const { getDataSource, responseBuilderSimple, getUpdatedDataElement } = 
require('./util'); + +const basePayload = { + responseField: { + access_token: 'ABC', + payload: { + schema: ['EMAIL', 'FI'], + data: [ + [ + 'b100c2ec0718fe6b4805b623aeec6710719d042ceea55f5c8135b010ec1c7b36', + '1e14a2f476f7611a8b22bc85d14237fdc88aac828737e739416c32c5bce3bd16', + ], + ], + }, + }, +}; + +const baseResponse = { + version: '1', + type: 'REST', + endpoint: 'https://graph.facebook.com/v18.0/23848494844100489/users', + headers: {}, + params: { + access_token: 'ABC', + payload: { + schema: ['EMAIL', 'FI'], + data: [ + [ + 'b100c2ec0718fe6b4805b623aeec6710719d042ceea55f5c8135b010ec1c7b36', + '1e14a2f476f7611a8b22bc85d14237fdc88aac828737e739416c32c5bce3bd16', + ], + ], + }, + }, + body: { + JSON: {}, + JSON_ARRAY: {}, + XML: {}, + FORM: {}, + }, + files: {}, +}; + +describe('FB_custom_audience utils test', () => { + describe('getDataSource function tests', () => { + it('Should return empty datasource if type and subType are both NA', () => { + const expectedDataSource = {}; + const dataSource = getDataSource('NA', 'NA'); + expect(dataSource).toEqual(expectedDataSource); + }); + it('Should set subType and type if value present in destination config macthes with preset list', () => { + const expectedDataSource = { + type: 'EVENT_BASED', + }; + const dataSource = getDataSource('EVENT_BASED', 'something'); + expect(dataSource).toEqual(expectedDataSource); + }); + }); + + describe('responseBuilderSimple function tests', () => { + it('Should return correct response for add payload', () => { + const payload = basePayload; + payload.operationCategory = 'add'; + const expectedResponse = baseResponse; + expectedResponse.method = 'POST'; + const response = responseBuilderSimple(payload, '23848494844100489'); + expect(response).toEqual(expectedResponse); + }); + + it('Should return correct response for delete payload', () => { + const payload = basePayload; + payload.operationCategory = 'remove'; + const expectedResponse = baseResponse; + expectedResponse.method = 'DELETE'; + const response = responseBuilderSimple(payload, '23848494844100489'); + expect(response).toEqual(expectedResponse); + }); + + it('Should throw error if payload is empty', () => { + try { + const response = responseBuilderSimple(payload, ''); + expect(response).toEqual(); + } catch (error) { + expect(error.message).toEqual(`payload is not defined`); + } + }); + }); + + describe('getUpdatedDataElement function tests', () => { + it('Should hash field if isHashRequired is set to true', () => { + const expectedDataElement = [ + '59107c750fd5ee2758d1988f2bf12d9f110439221ebdb7997e70d6a2c1c5afda', + ]; + let dataElement = []; + dataElement = getUpdatedDataElement(dataElement, true, 'FN', 'some-name'); + expect(dataElement).toEqual(expectedDataElement); + }); + + it('Should not hash field if isHashRequired is set to false', () => { + const expectedDataElement = ['some-name']; + let dataElement = []; + dataElement = getUpdatedDataElement(dataElement, false, 'FN', 'some-name'); + expect(dataElement).toEqual(expectedDataElement); + }); + + it('Should not hash MADID or EXTERN_ID and just pass value', () => { + const expectedDataElement = ['some-id', 'some-ext-id']; + let dataElement = []; + dataElement = getUpdatedDataElement(dataElement, true, 'MADID', 'some-id'); + dataElement = getUpdatedDataElement(dataElement, true, 'EXTERN_ID', 'some-ext-id'); + expect(dataElement).toEqual(expectedDataElement); + }); + + it('Should not hash MADID or EXTERN_ID and just pass empty value if value does not exist', () => { + const 
+      const expectedDataElement = ['', ''];
+      let dataElement = [];
+      dataElement = getUpdatedDataElement(dataElement, true, 'MADID', '');
+      dataElement = getUpdatedDataElement(dataElement, true, 'EXTERN_ID', '');
+      expect(dataElement).toEqual(expectedDataElement);
+    });
+  });
+});
diff --git a/src/v0/destinations/mp/transform.js b/src/v0/destinations/mp/transform.js
index 2065764b989..02eca8ed22f 100644
--- a/src/v0/destinations/mp/transform.js
+++ b/src/v0/destinations/mp/transform.js
@@ -386,7 +386,7 @@ const processGroupEvents = (message, type, destination) => {
       $set: {
         [groupKey]: groupKeyVal,
       },
-      $ip: get(message, 'context.ip'),
+      $ip: get(message, 'context.ip') || message.request_ip,
     };
     if (destination?.Config.identityMergeApi === 'simplified') {
       payload.$distinct_id = message.userId || `$device:${message.anonymousId}`;
diff --git a/src/v0/destinations/splitio/data/EventConfig.json b/src/v0/destinations/splitio/data/EventConfig.json
index d41398a5e2c..64354583682 100644
--- a/src/v0/destinations/splitio/data/EventConfig.json
+++ b/src/v0/destinations/splitio/data/EventConfig.json
@@ -29,5 +29,17 @@
       "type": "toFloat"
     },
     "required": false
+  },
+  {
+    "destKey": "trafficTypeName",
+    "sourceKeys": [
+      "traits.trafficTypeName",
+      "context.traits.trafficTypeName",
+      "properties.trafficTypeName"
+    ],
+    "metadata": {
+      "type": "toString"
+    },
+    "required": false
   }
 ]
diff --git a/src/v0/destinations/splitio/transform.js b/src/v0/destinations/splitio/transform.js
index 6641a62fe8d..3dd5bea4f2f 100644
--- a/src/v0/destinations/splitio/transform.js
+++ b/src/v0/destinations/splitio/transform.js
@@ -56,6 +56,7 @@ function prepareResponse(message, destination, category) {
   let outputPayload = {};
+  // ref: https://docs.split.io/reference/events-overview
   outputPayload = constructPayload(message, MAPPING_CONFIG[category.name]);
   outputPayload.eventTypeId = outputPayload.eventTypeId.replace(/ /g, '_');
   if (EVENT_TYPE_ID_REGEX.test(outputPayload.eventTypeId)) {
@@ -93,7 +94,12 @@ function prepareResponse(message, destination, category) {
   if (isDefinedAndNotNullAndNotEmpty(environment)) {
     outputPayload.environmentName = environment;
   }
-  outputPayload.trafficTypeName = trafficType;
+
+  // if the traffic type could not be mapped from the input payload, fall back to the default traffic type configured in the UI
+  if (!isDefinedAndNotNullAndNotEmpty(outputPayload.trafficTypeName)) {
+    outputPayload.trafficTypeName = trafficType;
+  }
+
   outputPayload.properties = removeUndefinedNullValuesAndEmptyObjectArray(
     flattenJson(bufferProperty),
   );
diff --git a/src/v0/destinations/tiktok_ads/transformV2.js b/src/v0/destinations/tiktok_ads/transformV2.js
index 4624ec91819..8760dee52cf 100644
--- a/src/v0/destinations/tiktok_ads/transformV2.js
+++ b/src/v0/destinations/tiktok_ads/transformV2.js
@@ -42,6 +42,8 @@ const getTrackResponsePayload = (message, destConfig, event, setDefaultForConten
   // if contents is not present but we have properties.products present which has fields with superset of contents fields
   if (!payload.properties?.contents && message.properties?.products) {
     // retreiving data from products only when contents is not present
+    // the properties object may be empty, due to which the next line could throw an error
+    payload.properties = payload.properties || {};
     payload.properties.contents = getContents(message, false);
   }
 
@@ -55,6 +57,8 @@ const getTrackResponsePayload = (message, destConfig, event, setDefaultForConten
   }
   // setting content-type default value in case of all standard event except `page-view`
   if (!payload.properties?.content_type && setDefaultForContentType) {
+    // the properties object may be empty, due to which the next line could throw an error
+    payload.properties = payload.properties || {};
     payload.properties.content_type = 'product';
   }
   payload.event = event;
diff --git a/src/v0/destinations/zendesk/transform.js b/src/v0/destinations/zendesk/transform.js
index 58620147845..cadb1d3964f 100644
--- a/src/v0/destinations/zendesk/transform.js
+++ b/src/v0/destinations/zendesk/transform.js
@@ -4,6 +4,7 @@ const {
   NetworkInstrumentationError,
   InstrumentationError,
   NetworkError,
+  isDefinedAndNotNull,
 } = require('@rudderstack/integrations-lib');
 const myAxios = require('../../../util/myAxios');
 
@@ -405,6 +406,9 @@ async function getUserMembershipPayload(message, headers, orgId, destinationConf
 }
 
 async function createOrganization(message, category, headers, destinationConfig, baseEndpoint) {
+  if (!isDefinedAndNotNull(message.traits)) {
+    throw new InstrumentationError('Organisation Traits are missing. Aborting.');
+  }
   await checkAndCreateUserFields(
     message.traits,
     category.organizationFieldsEndpoint,
diff --git a/src/v0/sources/adjust/config.ts b/src/v0/sources/adjust/config.ts
new file mode 100644
index 00000000000..d1c6ab8242b
--- /dev/null
+++ b/src/v0/sources/adjust/config.ts
@@ -0,0 +1,16 @@
+export const excludedFieldList = [
+  'activity_kind',
+  'event',
+  'event_name',
+  'gps_adid',
+  'idfa',
+  'idfv',
+  'adid',
+  'tracker',
+  'tracker_name',
+  'app_name',
+  'ip_address',
+  'tracking_enabled',
+  'tracker_token',
+  'created_at',
+];
diff --git a/src/v0/sources/adjust/mapping.json b/src/v0/sources/adjust/mapping.json
new file mode 100644
index 00000000000..60ea66281e4
--- /dev/null
+++ b/src/v0/sources/adjust/mapping.json
@@ -0,0 +1,52 @@
+[
+  {
+    "sourceKeys": "activity_kind",
+    "destKeys": "properties.activity_kind"
+  },
+  {
+    "sourceKeys": "event",
+    "destKeys": "properties.event_token"
+  },
+  {
+    "sourceKeys": "event_name",
+    "destKeys": "event"
+  },
+  {
+    "sourceKeys": "gps_adid",
+    "destKeys": "properties.gps_adid"
+  },
+  {
+    "sourceKeys": "idfa",
+    "destKeys": "context.device.advertisingId"
+  },
+  {
+    "sourceKeys": "idfv",
+    "destKeys": "context.device.id"
+  },
+  {
+    "sourceKeys": "adid",
+    "destKeys": "context.device.id"
+  },
+  {
+    "sourceKeys": "tracker",
+    "destKeys": "properties.tracker"
+  },
+  {
+    "sourceKeys": "tracker_name",
+    "destKeys": "properties.tracker_name"
+  },
+  { "sourceKeys": "tracker_token", "destKeys": "properties.tracker_token" },
+
+  {
+    "sourceKeys": "app_name",
+    "destKeys": "context.app.name"
+  },
+  {
+    "sourceKeys": "ip_address",
+    "destKeys": ["context.ip", "request_ip"]
+  },
+  {
+    "sourceKeys": "tracking_enabled",
+    "destKeys": "properties.tracking_enabled"
+  }
+]
diff --git a/src/v0/sources/adjust/transform.js b/src/v0/sources/adjust/transform.js
new file mode 100644
index 00000000000..8568622aeba
--- /dev/null
+++ b/src/v0/sources/adjust/transform.js
@@ -0,0 +1,61 @@
+const lodash = require('lodash');
+const path = require('path');
+const fs = require('fs');
+const { TransformationError, structuredLogger: logger } = require('@rudderstack/integrations-lib');
+const Message = require('../message');
+const { CommonUtils } = require('../../../util/common');
+const { excludedFieldList } = require('./config');
+const { extractCustomFields, generateUUID } = require('../../util');
+
+// ref : https://help.adjust.com/en/article/global-callbacks#general-recommended-placeholders
+// import mapping json using JSON.parse to preserve object key order
+const mapping = JSON.parse(fs.readFileSync(path.resolve(__dirname, './mapping.json'), 'utf-8'));
+
+const formatProperties = (input) => {
+  const { query_parameters: qParams } = input;
+  logger.debug(`[Adjust] Input event: query_params: ${JSON.stringify(qParams)}`);
+  if (!qParams) {
+    throw new TransformationError('Query_parameters is missing');
+  }
+  const formattedOutput = {};
+  Object.entries(qParams).forEach(([key, [value]]) => {
+    formattedOutput[key] = value;
+  });
+  return formattedOutput;
+};
+
+const processEvent = (inputEvent) => {
+  const message = new Message(`Adjust`);
+  const event = lodash.cloneDeep(inputEvent);
+  const formattedPayload = formatProperties(event);
+  // event type is always track
+  const eventType = 'track';
+  message.setEventType(eventType);
+  message.setPropertiesV2(formattedPayload, mapping);
+  let customProperties = {};
+  customProperties = extractCustomFields(
+    formattedPayload,
+    customProperties,
+    'root',
+    excludedFieldList,
+  );
+  message.properties = { ...message.properties, ...customProperties };
+
+  if (formattedPayload.created_at) {
+    const ts = new Date(formattedPayload.created_at * 1000).toISOString();
+    message.setProperty('originalTimestamp', ts);
+    message.setProperty('timestamp', ts);
+  }
+
+  // Adjust has no concept of a user, so we set a random anonymousId to make the server accept the message
+  message.anonymousId = generateUUID();
+  return message;
+};
+
+// This function converts the incoming payload to an array (if it is not one already) and passes each event to processEvent
+const process = (events) => {
+  const eventsArray = CommonUtils.toArray(events);
+  return eventsArray.map(processEvent);
+};
+
+module.exports = { process };
diff --git a/src/v0/util/facebookUtils/index.js b/src/v0/util/facebookUtils/index.js
index c7753d255fb..7462320ccad 100644
--- a/src/v0/util/facebookUtils/index.js
+++ b/src/v0/util/facebookUtils/index.js
@@ -292,7 +292,13 @@ const formingFinalResponse = (
   throw new TransformationError('Payload could not be constructed');
 };
 
+const isHtmlFormat = (string) => {
+  const htmlTags = /<(?!(!doctype\s*html|html))\b[^>]*>[\S\s]*?<\/[^>]*>/i;
+  return htmlTags.test(string);
+};
+
 module.exports = {
+  isHtmlFormat,
   getContentType,
   getContentCategory,
   transformedPayloadData,
diff --git a/src/v0/util/facebookUtils/index.test.js b/src/v0/util/facebookUtils/index.test.js
index 20c4ee59f25..1a2de4ed129 100644
--- a/src/v0/util/facebookUtils/index.test.js
+++ b/src/v0/util/facebookUtils/index.test.js
@@ -3,6 +3,7 @@ const {
   fetchUserData,
   deduceFbcParam,
   getContentType,
+  isHtmlFormat,
 } = require('./index');
 const sha256 = require('sha256');
 const { MAPPING_CONFIG, CONFIG_CATEGORIES } = require('../../destinations/facebook_pixel/config');
@@ -639,3 +640,53 @@ describe('getContentType', () => {
     expect(result).toBe(defaultValue);
   });
 });
+
+describe('isHtmlFormat', () => {
+  it('should return false for Json', () => {
+    expect(isHtmlFormat('{"a": 1, "b":2}')).toBe(false);
+  });
+
+  it('should return false for empty Json', () => {
+    expect(isHtmlFormat('{}')).toBe(false);
+  });
+
+  it('should return false for undefined', () => {
+    expect(isHtmlFormat(undefined)).toBe(false);
+  });
+
+  it('should return false for null', () => {
+    expect(isHtmlFormat(null)).toBe(false);
+  });
+
+  it('should return false for empty array', () => {
+    expect(isHtmlFormat([])).toBe(false);
+  });
+
+  it('should return true for html doctype', () => {
+    expect(
+      isHtmlFormat(
+        '',
+      ),
+    ).toBe(true);
+  });
+
+  it('should return true for html', () => {
+    expect(
+      isHtmlFormat(
+        '