Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(integrations): introduced new status codes to suppress or filter events #2611

Merged
merged 22 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2d2cb1c
feat(integrations): introduced new actions to suppress or filter events
mihir-4116 Sep 14, 2023
90f9e5e
feat(integrations): introduced new actions to suppress or filter events
mihir-4116 Sep 14, 2023
855fdce
chore: klaviyo multiple response action fix
mihir-4116 Sep 14, 2023
bb5fe08
Merge branch 'develop' into feat.new_actions
mihir-4116 Sep 15, 2023
cf0d5f0
Merge branch 'develop' into feat.new_actions
mihir-4116 Sep 15, 2023
cec8b8b
chore: statuscode changes
mihir-4116 Sep 25, 2023
14aa4e0
chore: introduce new statuscode for filter and suppress events
mihir-4116 Sep 25, 2023
0cf67fe
chore: pr conflicts resolved
mihir-4116 Sep 26, 2023
4624836
chore: code review changes
mihir-4116 Sep 26, 2023
8d9a1e7
chore: pr conflicts resolved
mihir-4116 Oct 3, 2023
99e0ad7
Merge branch 'develop' of github.com:rudderlabs/rudder-transformer in…
mihir-4116 Oct 4, 2023
726be1e
chore: added support of server<>transformer cross compatibility
mihir-4116 Oct 4, 2023
83d4c79
Merge branch 'develop' into feat.new_actions
mihir-4116 Oct 4, 2023
45dfd16
chore: code review changes
mihir-4116 Oct 5, 2023
b40add8
chore: added new status code tests for braze
mihir-4116 Oct 5, 2023
4cfc660
chore: added new tests for both transformer versions - with status co…
mihir-4116 Oct 5, 2023
97cba05
chore: code review changes
mihir-4116 Oct 6, 2023
f1734c2
Merge branch 'develop' into feat.new_actions
mihir-4116 Oct 6, 2023
cab085e
chore: code review changes
mihir-4116 Oct 6, 2023
fb1a810
chore: code review changes
mihir-4116 Oct 6, 2023
6b93613
Merge branch 'develop' into feat.new_actions
mihir-4116 Oct 6, 2023
0339c44
Merge branch 'develop' into feat.new_actions
mihir-4116 Oct 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/routes/destination.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Router from '@koa/router';
import DestinationController from '../controllers/destination';
import RegulationController from '../controllers/regulation';
import FeatureFlagController from '../middlewares/featureFlag';
import RouteActivationController from '../middlewares/routeActivation';

const router = new Router();
Expand All @@ -9,22 +10,25 @@ router.post(
'/:version/destinations/:destination',
RouteActivationController.isDestinationRouteActive,
RouteActivationController.destinationProcFilter,
FeatureFlagController.handle,
DestinationController.destinationTransformAtProcessor,
);
router.post(
'/routerTransform',
RouteActivationController.isDestinationRouteActive,
RouteActivationController.destinationRtFilter,
FeatureFlagController.handle,
DestinationController.destinationTransformAtRouter,
);
router.post(
'/batch',
RouteActivationController.isDestinationRouteActive,
RouteActivationController.destinationBatchFilter,
FeatureFlagController.handle,
DestinationController.batchProcess,
);

router.post('/deleteUsers', RegulationController.deleteUsers);

const destinationRoutes = router.routes();
export default destinationRoutes;
export default destinationRoutes;
15 changes: 9 additions & 6 deletions src/services/destination/nativeIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ export default class NativeIntegrationDestinationService implements IntegrationD
events: ProcessorTransformationRequest[],
destinationType: string,
version: string,
_requestMetadata: NonNullable<unknown>,
requestMetadata: NonNullable<unknown>,
): Promise<ProcessorTransformationResponse[]> {
const destHandler = FetchHandler.getDestHandler(destinationType, version);
const respList: ProcessorTransformationResponse[][] = await Promise.all(
events.map(async (event) => {
try {
const transformedPayloads:
| ProcessorTransformationOutput
| ProcessorTransformationOutput[] = await destHandler.process(event);
| ProcessorTransformationOutput[] = await destHandler.process(event, requestMetadata);
return DestinationPostTransformationService.handleProcessorTransformSucessEvents(
event,
transformedPayloads,
Expand Down Expand Up @@ -88,7 +88,7 @@ export default class NativeIntegrationDestinationService implements IntegrationD
events: RouterTransformationRequestData[],
destinationType: string,
version: string,
_requestMetadata: NonNullable<unknown>,
requestMetadata: NonNullable<unknown>,
): Promise<RouterTransformationResponse[]> {
const destHandler = FetchHandler.getDestHandler(destinationType, version);
const allDestEvents: NonNullable<unknown> = groupBy(
Expand All @@ -106,7 +106,7 @@ export default class NativeIntegrationDestinationService implements IntegrationD
);
try {
const doRouterTransformationResponse: RouterTransformationResponse[] =
await destHandler.processRouterDest(cloneDeep(destInputArray));
await destHandler.processRouterDest(cloneDeep(destInputArray), requestMetadata);
metaTO.metadata = destInputArray[0].metadata;
return DestinationPostTransformationService.handleRouterTransformSuccessEvents(
doRouterTransformationResponse,
Expand All @@ -132,7 +132,7 @@ export default class NativeIntegrationDestinationService implements IntegrationD
events: RouterTransformationRequestData[],
destinationType: string,
version: any,
_requestMetadata: NonNullable<unknown>,
requestMetadata: NonNullable<unknown>,
): RouterTransformationResponse[] {
const destHandler = FetchHandler.getDestHandler(destinationType, version);
if (!destHandler.batch) {
Expand All @@ -145,7 +145,10 @@ export default class NativeIntegrationDestinationService implements IntegrationD
const groupedEvents: RouterTransformationRequestData[][] = Object.values(allDestEvents);
const response = groupedEvents.map((destEvents) => {
try {
const destBatchedRequests: RouterTransformationResponse[] = destHandler.batch(destEvents);
const destBatchedRequests: RouterTransformationResponse[] = destHandler.batch(
destEvents,
requestMetadata,
);
return destBatchedRequests;
} catch (error: any) {
const metaTO = this.getTags(
Expand Down
1 change: 1 addition & 0 deletions src/services/misc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export default class MiscService {
return {
namespace: 'Unknown',
cluster: 'Unknown',
features: ctx.state?.features || {},
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/util/errorNotifier/bugsnag.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const {
UnhandledStatusCodeError,
UnauthorizedError,
NetworkInstrumentationError,
FilteredEventsError,
} = require('../../v0/util/errorTypes');

const {
Expand All @@ -48,6 +49,7 @@ const errorTypesDenyList = [
NetworkInstrumentationError,
CDKCustomError,
DataValidationError,
FilteredEventsError,
];

const pathsDenyList = [
Expand Down
22 changes: 19 additions & 3 deletions src/v0/destinations/braze/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ const {
isHttpStatusSuccess,
simpleProcessRouterDestSync,
simpleProcessRouterDest,
isNewStatusCodesAccepted,
} = require('../../util');
const { InstrumentationError, NetworkError } = require('../../util/errorTypes');
const {
InstrumentationError,
NetworkError,
FilteredEventsError,
} = require('../../util/errorTypes');
const {
ConfigCategory,
mappingConfig,
Expand Down Expand Up @@ -223,7 +228,13 @@ async function processIdentify(message, destination) {
}
}

function processTrackWithUserAttributes(message, destination, mappingJson, processParams) {
function processTrackWithUserAttributes(
message,
destination,
mappingJson,
processParams,
reqMetadata,
) {
let payload = getUserAttributesObject(message, mappingJson);
if (payload && Object.keys(payload).length > 0) {
payload = setExternalIdOrAliasObject(payload, message);
Expand All @@ -236,6 +247,10 @@ function processTrackWithUserAttributes(message, destination, mappingJson, proce
);
if (dedupedAttributePayload) {
requestJson.attributes = [dedupedAttributePayload];
} else if (isNewStatusCodesAccepted(reqMetadata)) {
throw new FilteredEventsError(
'[Braze Deduplication]: Duplicate user detected, the user is dropped',
);
} else {
throw new InstrumentationError(
'[Braze Deduplication]: Duplicate user detected, the user is dropped',
Expand Down Expand Up @@ -444,7 +459,7 @@ function processAlias(message, destination) {
);
}

async function process(event, processParams = { userStore: new Map() }) {
async function process(event, processParams = { userStore: new Map() }, reqMetadata = {}) {
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
let response;
const { message, destination } = event;
const messageType = message.type.toLowerCase();
Expand Down Expand Up @@ -490,6 +505,7 @@ async function process(event, processParams = { userStore: new Map() }) {
destination,
mappingConfig[category.name],
processParams,
reqMetadata,
);
break;
case EventType.GROUP:
Expand Down
9 changes: 8 additions & 1 deletion src/v0/destinations/braze/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const {
ALIAS_BRAZE_MAX_REQ_COUNT,
TRACK_BRAZE_MAX_REQ_COUNT,
} = require('./config');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { JSON_MIME_TYPE, HTTP_STATUS_CODES } = require('../../util/constant');
const { isObject } = require('../../util');
const { removeUndefinedValues, getIntegrationsObj } = require('../../util');
const { InstrumentationError } = require('../../util/errorTypes');
Expand Down Expand Up @@ -363,11 +363,14 @@ const processBatch = (transformedEvents) => {
const purchaseArray = [];
const successMetadata = [];
const failureResponses = [];
const filteredResponses = [];
const subscriptionsArray = [];
const mergeUsersArray = [];
for (const transformedEvent of transformedEvents) {
if (!isHttpStatusSuccess(transformedEvent?.statusCode)) {
failureResponses.push(transformedEvent);
} else if (transformedEvent?.statusCode === HTTP_STATUS_CODES.FILTER_EVENTS) {
filteredResponses.push(transformedEvent);
} else if (transformedEvent?.batchedRequest?.body?.JSON) {
const { attributes, events, purchases, subscription_groups, merge_updates } =
transformedEvent.batchedRequest.body.JSON;
Expand Down Expand Up @@ -446,6 +449,10 @@ const processBatch = (transformedEvents) => {
finalResponse.push(...failureResponses);
}

if (filteredResponses.length > 0) {
finalResponse.push(...filteredResponses);
}

return finalResponse;
};

Expand Down
80 changes: 61 additions & 19 deletions src/v0/destinations/klaviyo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ const {
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
flattenJson,
isNewStatusCodesAccepted,
} = require('../../util');

const { ConfigurationError, InstrumentationError } = require('../../util/errorTypes');
const { JSON_MIME_TYPE } = require('../../util/constant');
const { JSON_MIME_TYPE, HTTP_STATUS_CODES } = require('../../util/constant');

/**
* Main Identify request handler func
Expand All @@ -49,9 +49,10 @@ const { JSON_MIME_TYPE } = require('../../util/constant');
* @param {*} message
* @param {*} category
* @param {*} destination
* @param {*} reqMetadata
* @returns
*/
const identifyRequestHandler = async (message, category, destination) => {
const identifyRequestHandler = async (message, category, destination, reqMetadata) => {
// If listId property is present try to subscribe/member user in list
const { privateApiKey, enforceEmailAsPrimary, listId, flattenProperties } = destination.Config;
const mappedToDestination = get(message, MappedToDestinationKey);
Expand Down Expand Up @@ -105,17 +106,40 @@ const identifyRequestHandler = async (message, category, destination) => {
},
};

const profileId = await getIdFromNewOrExistingProfile(endpoint, payload, requestOptions);
const { profileId, response, statusCode } = await getIdFromNewOrExistingProfile(
endpoint,
payload,
requestOptions,
);

// Update Profile
const responseArray = [profileUpdateResponseBuilder(payload, profileId, category, privateApiKey)];
const responseMap = {
profileUpdateResponse: profileUpdateResponseBuilder(
payload,
profileId,
category,
privateApiKey,
),
};

// check if user wants to subscribe profile or not and listId is present or not
if (traitsInfo?.properties?.subscribe && (traitsInfo.properties?.listId || listId)) {
responseArray.push(subscribeUserToList(message, traitsInfo, destination));
return responseArray;
responseMap.subscribeUserToListResponse = subscribeUserToList(message, traitsInfo, destination);
}

if (isNewStatusCodesAccepted(reqMetadata) && statusCode === HTTP_STATUS_CODES.CREATED) {
responseMap.suppressEventResponse = {
...responseMap.profileUpdateResponse,
statusCode: HTTP_STATUS_CODES.SUPPRESS_EVENTS,
error: JSON.stringify(response),
};
return responseMap.subscribeUserToListResponse
? [responseMap.subscribeUserToListResponse]
: responseMap.suppressEventResponse;
}
return responseArray[0];

return responseMap.subscribeUserToListResponse
? [responseMap.profileUpdateResponse, responseMap.subscribeUserToListResponse]
: responseMap.profileUpdateResponse;
};

// ----------------------
Expand Down Expand Up @@ -241,7 +265,7 @@ const groupRequestHandler = (message, category, destination) => {
};

// Main event processor using specific handler funcs
const processEvent = async (message, destination) => {
const processEvent = async (message, destination, reqMetadata) => {
if (!message.type) {
throw new InstrumentationError('Event type is required');
}
Expand All @@ -255,7 +279,7 @@ const processEvent = async (message, destination) => {
switch (messageType) {
case EventType.IDENTIFY:
category = CONFIG_CATEGORIES.IDENTIFY;
response = await identifyRequestHandler(message, category, destination);
response = await identifyRequestHandler(message, category, destination, reqMetadata);
break;
case EventType.SCREEN:
case EventType.TRACK:
Expand All @@ -272,8 +296,8 @@ const processEvent = async (message, destination) => {
return response;
};

const process = async (event) => {
const result = await processEvent(event.message, event.destination);
const process = async (event, reqMetadata) => {
const result = await processEvent(event.message, event.destination, reqMetadata);
return result;
};

Expand Down Expand Up @@ -312,7 +336,7 @@ const processRouterDest = async (inputs, reqMetadata) => {
// if not transformed
getEventChunks(
{
message: await process(event),
message: await process(event, reqMetadata),
utsabc marked this conversation as resolved.
Show resolved Hide resolved
metadata: event.metadata,
destination,
},
Expand All @@ -326,13 +350,31 @@ const processRouterDest = async (inputs, reqMetadata) => {
}
}),
);
let batchedSubscribeResponseList = [];
const batchedSubscribeResponseList = [];
if (subscribeRespList.length > 0) {
batchedSubscribeResponseList = batchSubscribeEvents(subscribeRespList);
const batchedResponseList = batchSubscribeEvents(subscribeRespList);
batchedSubscribeResponseList.push(...batchedResponseList);
}
const nonSubscribeSuccessList = nonSubscribeRespList.map((resp) =>
getSuccessRespEvents(resp.message, [resp.metadata], resp.destination),
);
const nonSubscribeSuccessList = nonSubscribeRespList.map((resp) => {
const response = resp;
const { message, metadata, destination: eventDestination } = response;
if (
isNewStatusCodesAccepted(reqMetadata) &&
message?.statusCode &&
message.statusCode === HTTP_STATUS_CODES.SUPPRESS_EVENTS
) {
delete message.statusCode;
return getSuccessRespEvents(
message,
[metadata],
eventDestination,
false,
HTTP_STATUS_CODES.SUPPRESS_EVENTS,
);
}
return getSuccessRespEvents(message, [metadata], eventDestination);
});

batchResponseList = [...batchedSubscribeResponseList, ...nonSubscribeSuccessList];

return [...batchResponseList, ...batchErrorRespList];
Expand Down
Loading