Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(integrations): introduced new status codes to suppress or filter events #2611

Merged
merged 22 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2d2cb1c
feat(integrations): introduced new actions to suppress or filter events
mihir-4116 Sep 14, 2023
90f9e5e
feat(integrations): introduced new actions to suppress or filter events
mihir-4116 Sep 14, 2023
855fdce
chore: klaviyo multiple response action fix
mihir-4116 Sep 14, 2023
bb5fe08
Merge branch 'develop' into feat.new_actions
mihir-4116 Sep 15, 2023
cf0d5f0
Merge branch 'develop' into feat.new_actions
mihir-4116 Sep 15, 2023
cec8b8b
chore: statuscode changes
mihir-4116 Sep 25, 2023
14aa4e0
chore: introduce new statuscode for filter and suppress events
mihir-4116 Sep 25, 2023
0cf67fe
chore: pr conflicts resolved
mihir-4116 Sep 26, 2023
4624836
chore: code review changes
mihir-4116 Sep 26, 2023
8d9a1e7
chore: pr conflicts resolved
mihir-4116 Oct 3, 2023
99e0ad7
Merge branch 'develop' of github.com:rudderlabs/rudder-transformer in…
mihir-4116 Oct 4, 2023
726be1e
chore: added support of server<>transformer cross compatibility
mihir-4116 Oct 4, 2023
83d4c79
Merge branch 'develop' into feat.new_actions
mihir-4116 Oct 4, 2023
45dfd16
chore: code review changes
mihir-4116 Oct 5, 2023
b40add8
chore: added new status code tests for braze
mihir-4116 Oct 5, 2023
4cfc660
chore: added new tests for both transformer versions - with status co…
mihir-4116 Oct 5, 2023
97cba05
chore: code review changes
mihir-4116 Oct 6, 2023
f1734c2
Merge branch 'develop' into feat.new_actions
mihir-4116 Oct 6, 2023
cab085e
chore: code review changes
mihir-4116 Oct 6, 2023
fb1a810
chore: code review changes
mihir-4116 Oct 6, 2023
6b93613
Merge branch 'develop' into feat.new_actions
mihir-4116 Oct 6, 2023
0339c44
Merge branch 'develop' into feat.new_actions
mihir-4116 Oct 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/routes/destination.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Router from '@koa/router';
import DestinationController from '../controllers/destination';
import RegulationController from '../controllers/regulation';
import FeatureFlagController from '../middlewares/featureFlag';
import RouteActivationController from '../middlewares/routeActivation';

const router = new Router();
Expand All @@ -9,22 +10,25 @@ router.post(
'/:version/destinations/:destination',
RouteActivationController.isDestinationRouteActive,
RouteActivationController.destinationProcFilter,
FeatureFlagController.handle,
DestinationController.destinationTransformAtProcessor,
);
router.post(
'/routerTransform',
RouteActivationController.isDestinationRouteActive,
RouteActivationController.destinationRtFilter,
FeatureFlagController.handle,
DestinationController.destinationTransformAtRouter,
);
router.post(
'/batch',
RouteActivationController.isDestinationRouteActive,
RouteActivationController.destinationBatchFilter,
FeatureFlagController.handle,
DestinationController.batchProcess,
);

router.post('/deleteUsers', RegulationController.deleteUsers);

const destinationRoutes = router.routes();
export default destinationRoutes;
export default destinationRoutes;
12 changes: 9 additions & 3 deletions src/services/destination/nativeIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export default class NativeIntegrationDestinationService implements IntegrationD
try {
const transformedPayloads:
| ProcessorTransformationOutput
| ProcessorTransformationOutput[] = await destHandler.process(event);
| ProcessorTransformationOutput[] = await destHandler.process(event, _requestMetadata);
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
return DestinationPostTransformationService.handleProcessorTransformSucessEvents(
event,
transformedPayloads,
Expand Down Expand Up @@ -106,7 +106,10 @@ export default class NativeIntegrationDestinationService implements IntegrationD
);
try {
const doRouterTransformationResponse: RouterTransformationResponse[] =
await destHandler.processRouterDest(cloneDeep(destInputArray));
await destHandler.processRouterDest(
cloneDeep(destInputArray),
cloneDeep(_requestMetadata),
);
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
metaTO.metadata = destInputArray[0].metadata;
return DestinationPostTransformationService.handleRouterTransformSuccessEvents(
doRouterTransformationResponse,
Expand Down Expand Up @@ -145,7 +148,10 @@ export default class NativeIntegrationDestinationService implements IntegrationD
const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents);
const response = groupedEvents.map((destEvents) => {
try {
const destBatchedRequests: RouterTransformationResponse[] = destHandler.batch(destEvents);
const destBatchedRequests: RouterTransformationResponse[] = destHandler.batch(
destEvents,
_requestMetadata,
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
);
return destBatchedRequests;
} catch (error: any) {
const metaTO = this.getTags(
Expand Down
1 change: 1 addition & 0 deletions src/services/misc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export default class MiscService {
return {
namespace: 'Unknown',
cluster: 'Unknown',
features: ctx?.state?.features || {},
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/util/errorNotifier/bugsnag.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
UnhandledStatusCodeError,
UnauthorizedError,
NetworkInstrumentationError,
FilteredEventsError,
} = require('../../v0/util/errorTypes');

const {
Expand All @@ -48,6 +49,7 @@ const errorTypesDenyList = [
NetworkInstrumentationError,
CDKCustomError,
DataValidationError,
FilteredEventsError,
];

const pathsDenyList = [
Expand Down
23 changes: 19 additions & 4 deletions src/v0/destinations/braze/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ const {
simpleProcessRouterDestSync,
simpleProcessRouterDest,
} = require('../../util');
const { InstrumentationError, NetworkError } = require('../../util/errorTypes');
const {
InstrumentationError,
NetworkError,
FilteredEventsError,
} = require('../../util/errorTypes');
const {
ConfigCategory,
mappingConfig,
Expand All @@ -39,7 +43,7 @@ const logger = require('../../../logger');
const { getEndpointFromConfig } = require('./util');
const { handleHttpRequest } = require('../../../adapters/network');
const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { JSON_MIME_TYPE, FEATURE_FILTER_CODE } = require('../../util/constant');

function formatGender(gender) {
// few possible cases of woman
Expand Down Expand Up @@ -223,7 +227,13 @@ async function processIdentify(message, destination) {
}
}

function processTrackWithUserAttributes(message, destination, mappingJson, processParams) {
function processTrackWithUserAttributes(
message,
destination,
mappingJson,
processParams,
reqMetadata,
) {
let payload = getUserAttributesObject(message, mappingJson);
if (payload && Object.keys(payload).length > 0) {
payload = setExternalIdOrAliasObject(payload, message);
Expand All @@ -236,6 +246,10 @@ function processTrackWithUserAttributes(message, destination, mappingJson, proce
);
if (dedupedAttributePayload) {
requestJson.attributes = [dedupedAttributePayload];
} else if (reqMetadata?.features && reqMetadata.features[FEATURE_FILTER_CODE]) {
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
throw new FilteredEventsError(
'[Braze Deduplication]: Duplicate user detected, the user is dropped',
);
} else {
throw new InstrumentationError(
'[Braze Deduplication]: Duplicate user detected, the user is dropped',
Expand Down Expand Up @@ -444,7 +458,7 @@ function processAlias(message, destination) {
);
}

async function process(event, processParams = { userStore: new Map() }) {
async function process(event, processParams = { userStore: new Map() }, reqMetadata = {}) {
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
let response;
const { message, destination } = event;
const messageType = message.type.toLowerCase();
Expand Down Expand Up @@ -490,6 +504,7 @@ async function process(event, processParams = { userStore: new Map() }) {
destination,
mappingConfig[category.name],
processParams,
reqMetadata,
);
break;
case EventType.GROUP:
Expand Down
9 changes: 8 additions & 1 deletion src/v0/destinations/braze/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const {
ALIAS_BRAZE_MAX_REQ_COUNT,
TRACK_BRAZE_MAX_REQ_COUNT,
} = require('./config');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { JSON_MIME_TYPE, HTTP_STATUS_CODES } = require('../../util/constant');
const { isObject } = require('../../util');
const { removeUndefinedValues, getIntegrationsObj } = require('../../util');
const { InstrumentationError } = require('../../util/errorTypes');
Expand Down Expand Up @@ -363,11 +363,14 @@ const processBatch = (transformedEvents) => {
const purchaseArray = [];
const successMetadata = [];
const failureResponses = [];
const filteredResponses = [];
const subscriptionsArray = [];
const mergeUsersArray = [];
for (const transformedEvent of transformedEvents) {
if (!isHttpStatusSuccess(transformedEvent?.statusCode)) {
failureResponses.push(transformedEvent);
} else if (transformedEvent?.statusCode === HTTP_STATUS_CODES.FILTER_EVENTS) {
filteredResponses.push(transformedEvent);
} else if (transformedEvent?.batchedRequest?.body?.JSON) {
const { attributes, events, purchases, subscription_groups, merge_updates } =
transformedEvent.batchedRequest.body.JSON;
Expand Down Expand Up @@ -446,6 +449,10 @@ const processBatch = (transformedEvents) => {
finalResponse.push(...failureResponses);
}

if (filteredResponses.length > 0) {
finalResponse.push(...filteredResponses);
}

return finalResponse;
};

Expand Down
44 changes: 35 additions & 9 deletions src/v0/destinations/klaviyo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@
handleRtTfSingleEventError,
flattenJson,
} = require('../../util');

const { ConfigurationError, InstrumentationError } = require('../../util/errorTypes');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { JSON_MIME_TYPE, HTTP_STATUS_CODES, FEATURE_FILTER_CODE } = require('../../util/constant');

/**
* Main Identify request handler func
Expand Down Expand Up @@ -105,7 +104,11 @@
},
};

const profileId = await getIdFromNewOrExistingProfile(endpoint, payload, requestOptions);
const { profileId, response } = await getIdFromNewOrExistingProfile(
endpoint,
payload,
requestOptions,
);

// Update Profile
const responseArray = [profileUpdateResponseBuilder(payload, profileId, category, privateApiKey)];
Expand All @@ -115,7 +118,8 @@
responseArray.push(subscribeUserToList(message, traitsInfo, destination));
return responseArray;
}
return responseArray[0];

return { ...responseArray[0], error: JSON.stringify(response) };
};

// ----------------------
Expand Down Expand Up @@ -326,13 +330,35 @@
}
}),
);
let batchedSubscribeResponseList = [];
const batchedSubscribeResponseList = [];
if (subscribeRespList.length > 0) {
batchedSubscribeResponseList = batchSubscribeEvents(subscribeRespList);
const batchedResponseList = batchSubscribeEvents(subscribeRespList, reqMetadata);
batchedSubscribeResponseList.push(...batchedResponseList);
}
const nonSubscribeSuccessList = nonSubscribeRespList.map((resp) =>
getSuccessRespEvents(resp.message, [resp.metadata], resp.destination),
);
const nonSubscribeSuccessList = nonSubscribeRespList.map((resp) => {
const response = resp;
if (
reqMetadata?.features &&
reqMetadata.features[FEATURE_FILTER_CODE] &&
response.message?.error
) {
return getSuccessRespEvents(
response.message,
[response.metadata],
response.destination,
false,
HTTP_STATUS_CODES.SUPPRESS_EVENTS,
);
}

if (response.message?.error) {
delete response.message?.error;
return getSuccessRespEvents(response.message, [response.metadata], response.destination);

Check warning on line 356 in src/v0/destinations/klaviyo/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/klaviyo/transform.js#L355-L356

Added lines #L355 - L356 were not covered by tests
}

return getSuccessRespEvents(response.message, [response.metadata], response.destination);

Check warning on line 359 in src/v0/destinations/klaviyo/transform.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/klaviyo/transform.js#L359

Added line #L359 was not covered by tests
});

batchResponseList = [...batchedSubscribeResponseList, ...nonSubscribeSuccessList];

return [...batchResponseList, ...batchErrorRespList];
Expand Down
51 changes: 36 additions & 15 deletions src/v0/destinations/klaviyo/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@
getSuccessRespEvents,
defaultPatchRequestConfig,
} = require('../../util');

const { BASE_ENDPOINT, MAPPING_CONFIG, CONFIG_CATEGORIES, MAX_BATCH_SIZE } = require('./config');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { NetworkError, InstrumentationError } = require('../../util/errorTypes');
const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils');
const tags = require('../../util/tags');
const { handleHttpRequest } = require('../../../adapters/network');
const { JSON_MIME_TYPE, FEATURE_FILTER_CODE } = require('../../util/constant');
const { NetworkError, InstrumentationError } = require('../../util/errorTypes');
const { getDynamicErrorType } = require('../../../adapters/utils/networkUtils');
const { client: errNotificationClient } = require('../../../util/errorNotifier');
const { BASE_ENDPOINT, MAPPING_CONFIG, CONFIG_CATEGORIES, MAX_BATCH_SIZE } = require('./config');

const REVISION_CONSTANT = '2023-02-22';

Expand All @@ -35,6 +34,7 @@
* @returns
*/
const getIdFromNewOrExistingProfile = async (endpoint, payload, requestOptions) => {
let response;
let profileId;
const endpointPath = '/api/profiles';
const { processedResponse: resp } = await handleHttpRequest(
Expand All @@ -48,15 +48,19 @@
endpointPath,
},
);

if (resp.status === 201) {
profileId = resp.response?.data?.id;
const { data } = resp.response;
response = { id: data.id, attributes: data.attributes };
} else if (resp.status === 409) {
const { errors } = resp.response;
profileId = errors?.[0]?.meta?.duplicate_profile_id;
response = errors;
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
}

if (profileId) {
return profileId;
return { profileId, response };
}

let statusCode = resp.status;
Expand All @@ -80,6 +84,14 @@
);
};

/**
* Update profile response builder
* @param {*} payload
* @param {*} profileId
* @param {*} category
* @param {*} privateApiKey
* @returns
*/
const profileUpdateResponseBuilder = (payload, profileId, category, privateApiKey) => {
const updatedPayload = payload;
const identifyResponse = defaultRequestConfig();
Expand Down Expand Up @@ -238,15 +250,15 @@
* @param {*} subscribeResponseList
* @returns
*/
const groupSubsribeResponsesUsingListId = (subscribeResponseList) => {
const groupSubscribeResponsesUsingListId = (subscribeResponseList) => {
const subscribeEventGroups = lodash.groupBy(
subscribeResponseList,
(event) => event.message.body.JSON.data.attributes.list_id,
);
return subscribeEventGroups;
};

const getBatchedResponseList = (subscribeEventGroups, identifyResponseList) => {
const getBatchedResponseList = (subscribeEventGroups, identifyResponseList, reqMetadata) => {
let batchedResponseList = [];
Object.keys(subscribeEventGroups).forEach((listId) => {
// eventChunks = [[e1,e2,e3,..batchSize],[e1,e2,e3,..batchSize]..]
Expand All @@ -262,16 +274,21 @@
});
batchedResponseList = [...batchedResponseList, ...batchedResponse];
});
identifyResponseList.forEach((response) => {
batchedResponseList[0].batchedRequest.push(response);
});

if (!(reqMetadata?.features && reqMetadata.features[FEATURE_FILTER_CODE])) {
identifyResponseList.forEach((response) => {
batchedResponseList[0].batchedRequest.push(response);

Check warning on line 280 in src/v0/destinations/klaviyo/util.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/klaviyo/util.js#L279-L280

Added lines #L279 - L280 were not covered by tests
});
}

return batchedResponseList;
};

const batchSubscribeEvents = (subscribeRespList) => {
const batchSubscribeEvents = (subscribeRespList, reqMetadata) => {
const identifyResponseList = [];
subscribeRespList.forEach((event) => {
const processedEvent = event;
// for group and identify events (it will contain only subscribe response)
if (processedEvent.message.length === 2) {
// the array will contain one update profile reponse and one subscribe reponse
identifyResponseList.push(event.message[0]);
Expand All @@ -282,9 +299,13 @@
}
});

const subscribeEventGroups = groupSubsribeResponsesUsingListId(subscribeRespList);
const subscribeEventGroups = groupSubscribeResponsesUsingListId(subscribeRespList);

const batchedResponseList = getBatchedResponseList(subscribeEventGroups, identifyResponseList);
const batchedResponseList = getBatchedResponseList(
subscribeEventGroups,
identifyResponseList,
reqMetadata,
);

return batchedResponseList;
};
Expand All @@ -295,6 +316,6 @@
populateCustomFieldsFromTraits,
generateBatchedPaylaodForArray,
batchSubscribeEvents,
getIdFromNewOrExistingProfile,
profileUpdateResponseBuilder,
getIdFromNewOrExistingProfile,
};
Loading
Loading