Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hs): chunking data based on batch limit #2907

Merged
merged 29 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1adef48
feat(hs): chunking data based on batch limit
mihir-4116 Dec 13, 2023
a847359
Merge branch 'develop' into feat.hs-batching
mihir-4116 Dec 13, 2023
b45e648
Merge branch 'develop' into feat.hs-batching
mihir-4116 Dec 15, 2023
2ac79e9
Merge branch 'develop' into feat.hs-batching
mihir-4116 Dec 20, 2023
52dcfdb
Merge branch 'develop' into feat.hs-batching
mihir-4116 Dec 27, 2023
ac69fce
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 2, 2024
7be9c84
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 8, 2024
1b5ef0f
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 8, 2024
df1734b
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 11, 2024
181d0b1
fix: code review changes
mihir-4116 Jan 17, 2024
00fdf9a
chore: code review changes
mihir-4116 Jan 24, 2024
f448299
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 24, 2024
c56fa16
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 25, 2024
7ef304e
Merge branch 'develop' into feat.hs-batching
mihir-4116 Jan 31, 2024
8e43528
chore: code review changes
mihir-4116 Feb 2, 2024
693fe6d
chore: code review changes
mihir-4116 Feb 2, 2024
02d4f80
Merge branch 'develop' into feat.hs-batching
mihir-4116 Feb 2, 2024
de350d0
chore: code review changes
mihir-4116 Feb 2, 2024
be93d67
Merge branch 'develop' into feat.hs-batching
mihir-4116 Feb 2, 2024
cd00df0
Merge branch 'develop' into feat.hs-batching
mihir-4116 Feb 2, 2024
8070f09
chore: code review changes
mihir-4116 Feb 5, 2024
6c64210
Merge branch 'develop' into feat.hs-batching
mihir-4116 Feb 7, 2024
e2ba302
fix(hs): test case response
mihir-4116 Feb 7, 2024
509cb32
Merge branch 'feat.hs-batching' of github.com:rudderlabs/rudder-trans…
mihir-4116 Feb 7, 2024
0833fb5
chore: code refactor
mihir-4116 Feb 7, 2024
17b8f38
refactor getExistingContactsData
koladilip Feb 7, 2024
ae54c39
chore: code refactor
mihir-4116 Feb 7, 2024
7867a7e
refactor getExistingContactsData
koladilip Feb 7, 2024
90f3784
chore: code refactor
mihir-4116 Feb 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/v0/destinations/hs/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ const API_VERSION = {
v3: 'newApi',
};

const MAX_CONTACTS_PER_REQUEST = 100;

const ConfigCategory = {
COMMON: {
name: 'HSCommonConfig',
Expand Down Expand Up @@ -109,5 +111,6 @@ module.exports = {
SEARCH_LIMIT_VALUE,
RETL_SOURCE,
RETL_CREATE_ASSOCIATION_OPERATION,
MAX_CONTACTS_PER_REQUEST,
DESTINATION: 'HS',
};
219 changes: 138 additions & 81 deletions src/v0/destinations/hs/util.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
/* eslint-disable no-await-in-loop */
const lodash = require('lodash');
const get = require('get-value');
const {
NetworkInstrumentationError,
Expand Down Expand Up @@ -25,6 +27,7 @@
SEARCH_LIMIT_VALUE,
hsCommonConfigJson,
DESTINATION,
MAX_CONTACTS_PER_REQUEST,
} = require('./config');

const tags = require('../../util/tags');
Expand Down Expand Up @@ -464,111 +467,163 @@
};

/**
* DOC: https://developers.hubspot.com/docs/api/crm/search
* @param {*} inputs
* @param {*} destination
* Validates object and identifier type is present in message
* @param {*} firstMessage
* @returns
*/
const getExistingData = async (inputs, destination) => {
const { Config } = destination;
let values = [];
let searchResponse;
let updateHubspotIds = [];
const firstMessage = inputs[0].message;
let objectType = null;
let identifierType = null;

if (firstMessage) {
objectType = getDestinationExternalIDInfoForRetl(firstMessage, DESTINATION).objectType;
identifierType = getDestinationExternalIDInfoForRetl(firstMessage, DESTINATION).identifierType;
if (!objectType || !identifierType) {
throw new InstrumentationError('rETL - external Id not found.');
}
} else {
throw new InstrumentationError('rETL - objectType or identifier type not found. ');
const getObjectAndIdentifierType = (firstMessage) => {
const { objectType, identifierType } = getDestinationExternalIDInfoForRetl(
firstMessage,
DESTINATION,
);
if (!objectType || !identifierType) {
throw new InstrumentationError('rETL - external Id not found.');
}
inputs.map(async (input) => {
return { objectType, identifierType };
};

/**
* Returns values for search api call
* @param {*} inputs
* @returns
*/
const extractIDsForSearchAPI = (inputs) => {
const values = inputs.map((input) => {
const { message } = input;
const { destinationExternalId } = getDestinationExternalIDInfoForRetl(message, DESTINATION);
values.push(destinationExternalId.toString().toLowerCase());
return destinationExternalId.toString().toLowerCase();
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
});

values = Array.from(new Set(values));
const requestData = {
filterGroups: [
{
filters: [
{
propertyName: identifierType,
values,
operator: 'IN',
},
],
},
],
properties: [identifierType],
limit: SEARCH_LIMIT_VALUE,
after: 0,
};
return Array.from(new Set(values));
};

const requestOptions = {
headers: {
'Content-Type': JSON_MIME_TYPE,
Authorization: `Bearer ${Config.accessToken}`,
},
};
let checkAfter = 1; // variable to keep checking if we have more results
/**
* Returns hubspot records
* @param {*} data
* @param {*} requestOptions
* @param {*} objectType
* @param {*} identifierType
* @param {*} destination
* @returns
*/
const performHubSpotSearch = async (
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
reqdata,
reqOptions,
objectType,
identifierType,
destination,
) => {
let checkAfter = 1;
const searchResults = [];
const requestData = reqdata;
const { Config } = destination;

/* eslint-disable no-await-in-loop */
const endpoint = IDENTIFY_CRM_SEARCH_ALL_OBJECTS.replace(':objectType', objectType);
const endpointPath = `objects/:objectType/search`;

const url =
Config.authorizationType === 'newPrivateAppApi'
? endpoint
: `${endpoint}?hapikey=${Config.apiKey}`;

const requestOptions = Config.authorizationType === 'newPrivateAppApi' ? reqOptions : {};

/* *
* This is needed for processing paginated response when searching hubspot.
* we can't avoid await in loop as response to the request contains the pagination details
* */

while (checkAfter) {
const endpoint = IDENTIFY_CRM_SEARCH_ALL_OBJECTS.replace(':objectType', objectType);
const endpointPath = `objects/:objectType/search`;

const url =
Config.authorizationType === 'newPrivateAppApi'
? endpoint
: `${endpoint}?hapikey=${Config.apiKey}`;
searchResponse =
Config.authorizationType === 'newPrivateAppApi'
? await httpPOST(url, requestData, requestOptions, {
destType: 'hs',
feature: 'transformation',
endpointPath,
})
: await httpPOST(url, requestData, {
destType: 'hs',
feature: 'transformation',
endpointPath,
});
searchResponse = processAxiosResponse(searchResponse);

if (searchResponse.status !== 200) {
const searchResponse = await httpPOST(url, requestData, requestOptions, {
destType: 'hs',
feature: 'transformation',
endpointPath,
});

const processedResponse = processAxiosResponse(searchResponse);

if (processedResponse.status !== 200) {
throw new NetworkError(
`rETL - Error during searching object record. ${searchResponse.response?.message}`,
searchResponse.status,
`rETL - Error during searching object record. ${processedResponse.response?.message}`,
processedResponse.status,
{
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(searchResponse.status),
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(processedResponse.status),
},
searchResponse,
processedResponse,
);
}

const after = searchResponse.response?.paging?.next?.after || 0;

const after = processedResponse.response?.paging?.next?.after || 0;
requestData.after = after; // assigning to the new value of after
checkAfter = after; // assigning to the new value if no after we assign it to 0 and no more calls will take place

const results = searchResponse.response?.results;
const results = processedResponse.response?.results;
if (results) {
updateHubspotIds = results.map((result) => {
const propertyValue = result.properties[identifierType];
return { id: result.id, property: propertyValue };
});
searchResults.push(
...results.map((result) => ({
id: result.id,
property: result.properties[identifierType],
})),
);
}
}

return searchResults;
};

/**
* DOC: https://developers.hubspot.com/docs/api/crm/search
* @param {*} inputs
* @param {*} destination
*/
const getExistingContactsData = async (inputs, destination) => {
const { Config } = destination;
const updateHubspotIds = [];
const firstMessage = inputs[0].message;

if (!firstMessage) {
throw new InstrumentationError('rETL - objectType or identifier type not found.');

Check warning on line 585 in src/v0/destinations/hs/util.js

View check run for this annotation

Codecov / codecov/patch

src/v0/destinations/hs/util.js#L585

Added line #L585 was not covered by tests
}

const { objectType, identifierType } = getObjectAndIdentifierType(firstMessage);

const values = extractIDsForSearchAPI(inputs);
const valuesChunk = lodash.chunk(values, MAX_CONTACTS_PER_REQUEST);

// eslint-disable-next-line no-restricted-syntax
for (const chunk of valuesChunk) {
const requestData = {
filterGroups: [
{
filters: [
{
propertyName: identifierType,
values: chunk,
operator: 'IN',
},
],
},
],
properties: [identifierType],
limit: SEARCH_LIMIT_VALUE,
after: 0,
};

const requestOptions = {
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
headers: {
'Content-Type': JSON_MIME_TYPE,
Authorization: `Bearer ${Config.accessToken}`,
},
};
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
const searchResults = await performHubSpotSearch(
requestData,
requestOptions,
objectType,
identifierType,
destination,
);
if (searchResults.length > 0) {
updateHubspotIds.push(...searchResults);
}
}
return updateHubspotIds;
Expand Down Expand Up @@ -601,7 +656,7 @@

const splitEventsForCreateUpdate = async (inputs, destination) => {
// get all the id and properties of already existing objects needed for update.
const updateHubspotIds = await getExistingData(inputs, destination);
const updateHubspotIds = await getExistingContactsData(inputs, destination);

const resultInput = inputs.map((input) => {
const { message } = input;
Expand Down Expand Up @@ -680,4 +735,6 @@
validatePayloadDataTypes,
getUTCMidnightTimeStampValue,
populateTraits,
getObjectAndIdentifierType,
extractIDsForSearchAPI,
};
Loading
Loading