Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hs): chunking data based on batch limit #2907

Merged
merged 29 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1adef48
feat(hs): chunking data based on batch limit
mihir-4116 Dec 13, 2023
a847359
Merge branch 'develop' into feat.hs-batching
mihir-4116 Dec 13, 2023
b45e648
Merge branch 'develop' into feat.hs-batching
mihir-4116 Dec 15, 2023
2ac79e9
Merge branch 'develop' into feat.hs-batching
mihir-4116 Dec 20, 2023
52dcfdb
Merge branch 'develop' into feat.hs-batching
mihir-4116 Dec 27, 2023
ac69fce
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 2, 2024
7be9c84
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 8, 2024
1b5ef0f
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 8, 2024
df1734b
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 11, 2024
181d0b1
fix: code review changes
mihir-4116 Jan 17, 2024
00fdf9a
chore: code review changes
mihir-4116 Jan 24, 2024
f448299
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 24, 2024
c56fa16
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 25, 2024
7ef304e
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 31, 2024
8e43528
chore: code review changes
mihir-4116 Feb 2, 2024
693fe6d
chore: code review changes
mihir-4116 Feb 2, 2024
02d4f80
Merge branch 'develop' into feat.hs-batching
mihir-4116 Feb 2, 2024
de350d0
chore: code review changes
mihir-4116 Feb 2, 2024
be93d67
Merge branch 'develop' into feat.hs-batching
mihir-4116 Feb 2, 2024
cd00df0
Merge branch 'develop' into feat.hs-batching
mihir-4116 Feb 2, 2024
8070f09
chore: code review changes
mihir-4116 Feb 5, 2024
6c64210
Merge branch 'develop' into feat.hs-batching
mihir-4116 Feb 7, 2024
e2ba302
fix(hs): test case response
mihir-4116 Feb 7, 2024
509cb32
Merge branch 'feat.hs-batching' of github.com:rudderlabs/rudder-trans…
mihir-4116 Feb 7, 2024
0833fb5
chore: code refactor
mihir-4116 Feb 7, 2024
17b8f38
refactor getExistingContactsData
koladilip Feb 7, 2024
ae54c39
chore: code refactor
mihir-4116 Feb 7, 2024
7867a7e
refactor getExistingContactsData
koladilip Feb 7, 2024
90f3784
chore: code refactor
mihir-4116 Feb 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v0/destinations/hs/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const API_VERSION = {
v3: 'newApi',
};

const MAX_CONTACTS_PER_REQUEST = 100;

const ConfigCategory = {
COMMON: {
name: 'HSCommonConfig',
Expand Down Expand Up @@ -109,5 +111,6 @@ module.exports = {
SEARCH_LIMIT_VALUE,
RETL_SOURCE,
RETL_CREATE_ASSOCIATION_OPERATION,
MAX_CONTACTS_PER_REQUEST,
DESTINATION: 'HS',
};
223 changes: 150 additions & 73 deletions src/v0/destinations/hs/util.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/* eslint-disable no-await-in-loop */
const lodash = require('lodash');
const get = require('get-value');
const {
NetworkInstrumentationError,
Expand Down Expand Up @@ -25,6 +27,7 @@
SEARCH_LIMIT_VALUE,
hsCommonConfigJson,
DESTINATION,
MAX_CONTACTS_PER_REQUEST,
} = require('./config');

const tags = require('../../util/tags');
Expand Down Expand Up @@ -464,42 +467,128 @@
};

/**
* DOC: https://developers.hubspot.com/docs/api/crm/search
* Validates object and identifier type is present in message
* @param {*} firstMessage
* @returns
*/
const getObjectAndIdentifierType = (firstMessage) => {
const { objectType, identifierType } = getDestinationExternalIDInfoForRetl(
firstMessage,
DESTINATION,
);
if (!objectType || !identifierType) {
throw new InstrumentationError('rETL - external Id not found.');
}
return { objectType, identifierType };
};

/**
* Returns values for search api call
* @param {*} inputs
* @returns
*/
const extractIDsForSearchAPI = (inputs) => {
const values = inputs.map((input) => {
const { message } = input;
const { destinationExternalId } = getDestinationExternalIDInfoForRetl(message, DESTINATION);
return destinationExternalId.toString().toLowerCase();
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
});

return Array.from(new Set(values));
};

/**
* Returns hubspot records
* Ref : https://developers.hubspot.com/docs/api/crm/search
* @param {*} data
* @param {*} requestOptions
* @param {*} objectType
* @param {*} identifierType
* @param {*} destination
* @returns
*/
const getExistingData = async (inputs, destination) => {
const performHubSpotSearch = async (
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
reqdata,
reqOptions,
objectType,
identifierType,
destination,
) => {
let checkAfter = 1;
const searchResults = [];
const requestData = reqdata;
const { Config } = destination;
let values = [];
let searchResponse;
let updateHubspotIds = [];
const firstMessage = inputs[0].message;
let objectType = null;
let identifierType = null;

if (firstMessage) {
objectType = getDestinationExternalIDInfoForRetl(firstMessage, DESTINATION).objectType;
identifierType = getDestinationExternalIDInfoForRetl(firstMessage, DESTINATION).identifierType;
if (!objectType || !identifierType) {
throw new InstrumentationError('rETL - external Id not found.');

const endpoint = IDENTIFY_CRM_SEARCH_ALL_OBJECTS.replace(':objectType', objectType);
const endpointPath = `objects/:objectType/search`;

const url =
Config.authorizationType === 'newPrivateAppApi'
? endpoint
: `${endpoint}?hapikey=${Config.apiKey}`;

const requestOptions = Config.authorizationType === 'newPrivateAppApi' ? reqOptions : {};

/* *
* This is needed for processing paginated response when searching hubspot.
* we can't avoid await in loop as response to the request contains the pagination details
* */

while (checkAfter) {
const searchResponse = await httpPOST(url, requestData, requestOptions, {
destType: 'hs',
feature: 'transformation',
endpointPath,
});

const processedResponse = processAxiosResponse(searchResponse);

if (processedResponse.status !== 200) {
throw new NetworkError(
`rETL - Error during searching object record. ${JSON.stringify(
processedResponse.response?.message,
)}`,
processedResponse.status,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(processedResponse.status),
},
processedResponse,
);
}

const after = processedResponse.response?.paging?.next?.after || 0;
requestData.after = after; // assigning to the new value of after
checkAfter = after; // assigning to the new value if no after we assign it to 0 and no more calls will take place

const results = processedResponse.response?.results;
if (results) {
searchResults.push(
...results.map((result) => ({
id: result.id,
property: result.properties[identifierType],
})),
);
}
} else {
throw new InstrumentationError('rETL - objectType or identifier type not found. ');
}
inputs.map(async (input) => {
const { message } = input;
const { destinationExternalId } = getDestinationExternalIDInfoForRetl(message, DESTINATION);
values.push(destinationExternalId.toString().toLowerCase());
});

values = Array.from(new Set(values));
return searchResults;
};

/**
* Returns requestData and requestOptions
* @param {*} identifierType
* @param {*} chunk
* @param {*} accessToken
* @returns
*/
const getRequestDataAndRequestOptions = (identifierType, chunk, accessToken) => {
const requestData = {
filterGroups: [
{
filters: [
{
propertyName: identifierType,
values,
values: chunk,
operator: 'IN',
},
],
Expand All @@ -513,62 +602,47 @@
const requestOptions = {
headers: {
'Content-Type': JSON_MIME_TYPE,
Authorization: `Bearer ${Config.accessToken}`,
Authorization: `Bearer ${accessToken}`,
},
};
let checkAfter = 1; // variable to keep checking if we have more results

/* eslint-disable no-await-in-loop */
return { requestData, requestOptions };
};

/* *
* This is needed for processing paginated response when searching hubspot.
* we can't avoid await in loop as response to the request contains the pagination details
* */
/**
* DOC: https://developers.hubspot.com/docs/api/crm/search
* @param {*} inputs
* @param {*} destination
*/
const getExistingContactsData = async (inputs, destination) => {
const { Config } = destination;
const updateHubspotIds = [];
const firstMessage = inputs[0].message;

while (checkAfter) {
const endpoint = IDENTIFY_CRM_SEARCH_ALL_OBJECTS.replace(':objectType', objectType);
const endpointPath = `objects/:objectType/search`;

const url =
Config.authorizationType === 'newPrivateAppApi'
? endpoint
: `${endpoint}?hapikey=${Config.apiKey}`;
searchResponse =
Config.authorizationType === 'newPrivateAppApi'
? await httpPOST(url, requestData, requestOptions, {
destType: 'hs',
feature: 'transformation',
endpointPath,
})
: await httpPOST(url, requestData, {
destType: 'hs',
feature: 'transformation',
endpointPath,
});
searchResponse = processAxiosResponse(searchResponse);

if (searchResponse.status !== 200) {
throw new NetworkError(
`rETL - Error during searching object record. ${searchResponse.response?.message}`,
searchResponse.status,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(searchResponse.status),
},
searchResponse,
);
}
if (!firstMessage) {
throw new InstrumentationError('rETL - objectType or identifier type not found.');

Check warning on line 622 in src/v0/destinations/hs/util.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/hs/util.js#L622

Added line #L622 was not covered by tests
}

const after = searchResponse.response?.paging?.next?.after || 0;
const { objectType, identifierType } = getObjectAndIdentifierType(firstMessage);

requestData.after = after; // assigning to the new value of after
checkAfter = after; // assigning to the new value if no after we assign it to 0 and no more calls will take place
const values = extractIDsForSearchAPI(inputs);
const valuesChunk = lodash.chunk(values, MAX_CONTACTS_PER_REQUEST);

const results = searchResponse.response?.results;
if (results) {
updateHubspotIds = results.map((result) => {
const propertyValue = result.properties[identifierType];
return { id: result.id, property: propertyValue };
});
// eslint-disable-next-line no-restricted-syntax
for (const chunk of valuesChunk) {
const { requestData, requestOptions } = getRequestDataAndRequestOptions(
identifierType,
chunk,
Config.accessToken,
);
const searchResults = await performHubSpotSearch(
requestData,
requestOptions,
objectType,
identifierType,
destination,
);
if (searchResults.length > 0) {
updateHubspotIds.push(...searchResults);
}
}
return updateHubspotIds;
Expand Down Expand Up @@ -601,7 +675,7 @@

const splitEventsForCreateUpdate = async (inputs, destination) => {
// get all the id and properties of already existing objects needed for update.
const updateHubspotIds = await getExistingData(inputs, destination);
const updateHubspotIds = await getExistingContactsData(inputs, destination);

const resultInput = inputs.map((input) => {
const { message } = input;
Expand Down Expand Up @@ -680,4 +754,7 @@
validatePayloadDataTypes,
getUTCMidnightTimeStampValue,
populateTraits,
getObjectAndIdentifierType,
extractIDsForSearchAPI,
getRequestDataAndRequestOptions,
};
Loading
Loading