Skip to content

Commit

Permalink
chore: move checking of router transform structure to top-level
Browse files Browse the repository at this point in the history
Signed-off-by: Sai Sankeerth <[email protected]>
  • Loading branch information
Sai Sankeerth committed Feb 23, 2024
1 parent c592db2 commit ea12873
Show file tree
Hide file tree
Showing 29 changed files with 345 additions and 382 deletions.
15 changes: 15 additions & 0 deletions src/controllers/destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import logger from '../logger';
import { getIntegrationVersion } from '../util/utils';
import tags from '../v0/util/tags';
import { DynamicConfigParser } from '../util/dynamicConfigParser';
import { checkInvalidRtTfEvents } from '../v0/util';

export class DestinationController {
public static async destinationTransformAtProcessor(ctx: Context) {
Expand Down Expand Up @@ -101,6 +102,20 @@ export class DestinationController {
const routerRequest = ctx.request.body as RouterTransformationRequest;
const destination = routerRequest.destType;
let events = routerRequest.input;
const errorRespEvents = checkInvalidRtTfEvents(events);
if (errorRespEvents.length > 0) {
errorRespEvents[0].metadata = [
{
destType: destination,
},
];
logger.debug(
`[${destination}] Invalid router transform payload structure: ${JSON.stringify(events)}`,
);
ctx.body = { output: errorRespEvents };
ControllerUtility.postProcess(ctx);
return ctx;
}
const metaTags = MiscService.getMetaTags(events[0].metadata);
stats.histogram('dest_transform_input_events', events.length, {
destination,
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const { EventType } = require('../../../constants');
const {
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
groupEventsByType,
} = require('../../util');
Expand Down Expand Up @@ -130,10 +129,6 @@ const processEachTypedEventList = (
};

const processRouterDest = (inputs) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs, DESTINATION);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
const finalResp = [];

const batchedEvents = groupEventsByType(inputs);
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/campaign_manager/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const {
removeUndefinedAndNullValues,
getSuccessRespEvents,
isDefinedAndNotNull,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getAccessToken,
} = require('../../util');
Expand Down Expand Up @@ -245,11 +244,6 @@ const batchEvents = (eventChunksArray) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const batchErrorRespList = [];
const eventChunksArray = [];
const { destination } = inputs[0];
Expand Down
8 changes: 0 additions & 8 deletions src/v0/destinations/clevertap/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const {
handleRtTfSingleEventError,
batchMultiplexedEvents,
getSuccessRespEvents,
checkInvalidRtTfEvents,
} = require('../../util');
const { generateClevertapBatchedPayload } = require('./utils');

Expand Down Expand Up @@ -389,13 +388,6 @@ const processEvent = (message, destination) => {
const process = (event) => processEvent(event.message, event.destination);

const processRouterDest = (inputs, reqMetadata) => {
// const respList = await simpleProcessRouterDest(inputs, process, reqMetadata);
// return respList;
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const eventsChunk = [];
const errorRespList = [];
// const { destination } = inputs[0];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/customerio/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ const {
getFieldValueFromMessage,
handleRtTfSingleEventError,
validateEventName,
checkInvalidRtTfEvents,
} = require('../../util');

const logger = require('../../../logger');
Expand Down Expand Up @@ -174,10 +173,6 @@ const batchEvents = (successRespList) => {
};

const processRouterDest = (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let batchResponseList = [];
const batchErrorRespList = [];
const successRespList = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ const {
handleRtTfSingleEventError,
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
combineBatchRequestsWithSameJobIds,
} = require('../../util');
const {
Expand Down Expand Up @@ -186,11 +185,6 @@ const batchEvents = (storeSalesEvents) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const storeSalesEvents = []; // list containing store sales events in batched format
const clickCallEvents = []; // list containing click and call events in batched format
const errorRespList = [];
Expand Down
11 changes: 1 addition & 10 deletions src/v0/destinations/google_cloud_function/transform.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
const lodash = require('lodash');
const {
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
} = require('../../util');
const { getSuccessRespEvents, handleRtTfSingleEventError } = require('../../util');

const { generateBatchedPayload, validateDestinationConfig } = require('./util');

Expand Down Expand Up @@ -40,11 +36,6 @@ function batchEvents(successRespList, maxBatchSize = 10) {

// Router transform with batching by default
const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const successResponseList = [];
const errorRespList = [];
const { destination } = inputs[0];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/googlesheets/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const {
getValueFromMessage,
getSuccessRespEvents,
handleRtTfSingleEventError,
checkInvalidRtTfEvents,
} = require('../../util');

const SOURCE_KEYS = ['properties', 'traits', 'context.traits'];
Expand Down Expand Up @@ -111,10 +110,6 @@ const process = (event) => {
const processRouterDest = async (inputs, reqMetadata) => {
const successRespList = [];
const errorRespList = [];
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
await Promise.all(
inputs.map(async (input) => {
try {
Expand Down
10 changes: 1 addition & 9 deletions src/v0/destinations/hs/transform.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
const get = require('get-value');
const { InstrumentationError } = require('@rudderstack/integrations-lib');
const { EventType } = require('../../../constants');
const {
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getDestinationExternalIDInfoForRetl,
} = require('../../util');
const { handleRtTfSingleEventError, getDestinationExternalIDInfoForRetl } = require('../../util');
const { API_VERSION } = require('./config');
const {
processLegacyIdentify,
Expand Down Expand Up @@ -71,10 +67,6 @@ const process = async (event) => {
// we are batching by default at routerTransform
const processRouterDest = async (inputs, reqMetadata) => {
let tempInputs = inputs;
const errorRespEvents = checkInvalidRtTfEvents(tempInputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const successRespList = [];
const errorRespList = [];
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/iterable/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const {
const {
constructPayload,
defaultRequestConfig,
checkInvalidRtTfEvents,
defaultPostRequestConfig,
handleRtTfSingleEventError,
removeUndefinedAndNullValues,
Expand Down Expand Up @@ -162,11 +161,6 @@ const process = (event) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const batchedEvents = batchEvents(inputs);
const response = await Promise.all(
batchedEvents.map(async (listOfEvents) => {
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/kafka/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const {
getHashFromArray,
removeUndefinedAndNullValues,
getSuccessRespEvents,
checkInvalidRtTfEvents,
} = require('../../util');

const filterConfigTopics = (message, destination) => {
Expand Down Expand Up @@ -38,10 +37,6 @@ const filterConfigTopics = (message, destination) => {

const batch = (destEvents) => {
const respList = [];
const errorRespEvents = checkInvalidRtTfEvents(destEvents);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

// Grouping the events by topic
const groupedEvents = groupBy(destEvents, (event) => event.message.topic);
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/klaviyo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ const {
addExternalIdToTraits,
adduserIdFromExternalId,
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
flattenJson,
isNewStatusCodesAccepted,
Expand Down Expand Up @@ -320,10 +319,6 @@ const getEventChunks = (event, subscribeRespList, nonSubscribeRespList) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let batchResponseList = [];
const batchErrorRespList = [];
const subscribeRespList = [];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/mailchimp/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ const { InstrumentationError, ConfigurationError } = require('@rudderstack/integ
const {
defaultPutRequestConfig,
handleRtTfSingleEventError,
checkInvalidRtTfEvents,
constructPayload,
defaultPostRequestConfig,
isDefinedAndNotNull,
Expand Down Expand Up @@ -162,10 +161,6 @@ const getEventChunks = (event, identifyRespList, trackRespList) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let batchResponseList = [];
const batchErrorRespList = [];
const identifyRespList = [];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/mailjet/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const {
defaultBatchRequestConfig,
handleRtTfSingleEventError,
removeUndefinedAndNullValues,
checkInvalidRtTfEvents,
} = require('../../util');

const { MAX_BATCH_SIZE } = require('./config');
Expand Down Expand Up @@ -121,10 +120,6 @@ const batchEvents = (successRespList) => {
};

const processRouterDest = (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let batchResponseList = [];
const batchErrorRespList = [];
const successRespList = [];
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/mailmodo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ const {
removeUndefinedAndNullValues,
getSuccessRespEvents,
handleRtTfSingleEventError,
checkInvalidRtTfEvents,
} = require('../../util');
const { deduceAddressFields, extractCustomProperties } = require('./utils');
const { JSON_MIME_TYPE } = require('../../util/constant');
Expand Down Expand Up @@ -191,11 +190,6 @@ function getEventChunks(event, identifyEventChunks, eventResponseList) {
}

const processRouterDest = (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const identifyEventChunks = []; // list containing identify events in batched format
const eventResponseList = []; // list containing other events in batched format
const errorRespList = [];
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/marketo/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ const {
getSuccessRespEvents,
isDefinedAndNotNull,
generateErrorObject,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
} = require('../../util');
const Cache = require('../../util/cache');
Expand Down Expand Up @@ -456,10 +455,6 @@ const process = async (event) => {
const processRouterDest = async (inputs, reqMetadata) => {
// Token needs to be generated for marketo which will be done on input level.
// If destination information is not present Error should be thrown
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
let token;
try {
token = await getAuthToken(formatConfig(inputs[0].destination));
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/mp/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const {
removeUndefinedValues,
toUnixTimestampInMS,
getFieldValueFromMessage,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
groupEventsByType,
parseConfigArray,
Expand Down Expand Up @@ -460,11 +459,6 @@ const process = (event) => processSingleMessage(event.message, event.destination
// Ref: https://help.mixpanel.com/hc/en-us/articles/115004613766-Default-Properties-Collected-by-Mixpanel
// Ref: https://help.mixpanel.com/hc/en-us/articles/115004561786-Track-UTM-Tags
const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const groupedEvents = groupEventsByType(inputs);
const response = await Promise.all(
groupedEvents.map(async (listOfEvents) => {
Expand Down
5 changes: 0 additions & 5 deletions src/v0/destinations/ometria/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ const {
getFieldValueFromMessage,
getIntegrationsObj,
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
} = require('../../util/index');
const {
Expand Down Expand Up @@ -250,10 +249,6 @@ const process = (event) => {
};

const processRouterDest = async (inputs, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(inputs);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}
const inputChunks = returnArrayOfSubarrays(inputs, MAX_BATCH_SIZE);
const successList = [];
const errorList = [];
Expand Down
6 changes: 0 additions & 6 deletions src/v0/destinations/pardot/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ const {
getFieldValueFromMessage,
removeUndefinedValues,
getSuccessRespEvents,
checkInvalidRtTfEvents,
handleRtTfSingleEventError,
getAccessToken,
} = require('../../util');
Expand Down Expand Up @@ -150,11 +149,6 @@ const processEvent = (metadata, message, destination) => {
const process = (event) => processEvent(event.metadata, event.message, event.destination);

const processRouterDest = (events, reqMetadata) => {
const errorRespEvents = checkInvalidRtTfEvents(events);
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const responseList = events.map((event) => {
try {
return getSuccessRespEvents(process(event), [event.metadata], event.destination);
Expand Down
Loading

0 comments on commit ea12873

Please sign in to comment.