Skip to content

Commit

Permalink
fix: braze subscription group fixes (#3901)
Browse files Browse the repository at this point in the history
  • Loading branch information
aashishmalik authored Dec 9, 2024
1 parent 12621c8 commit ebcf84e
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 18 deletions.
12 changes: 12 additions & 0 deletions src/util/prometheus.js
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,18 @@ class Prometheus {
type: 'counter',
labelNames: ['destination_id'],
},
{
name: 'braze_batch_subscription_size',
help: 'braze_batch_subscription_size',
type: 'gauge',
labelNames: ['destination_id'],
},
{
name: 'braze_batch_subscription_combined_size',
help: 'braze_batch_subscription_combined_size',
type: 'gauge',
labelNames: ['destination_id'],
},
{
name: 'mailjet_packing_size',
help: 'mailjet_packing_size',
Expand Down
151 changes: 140 additions & 11 deletions src/v0/destinations/braze/braze.util.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
getPurchaseObjs,
setAliasObject,
handleReservedProperties,
combineSubscriptionGroups,
} = require('./util');
const { processBatch } = require('./util');
const {
Expand Down Expand Up @@ -958,7 +959,9 @@ describe('processBatch', () => {
attributes: [{ id: i, name: 'test', xyz: 'abc' }],
events: [{ id: i, event: 'test', xyz: 'abc' }],
purchases: [{ id: i, purchase: 'test', xyz: 'abc' }],
subscription_groups: [{ id: i, group: 'test', xyz: 'abc' }],
subscription_groups: [
{ subscription_group_id: i, group: 'test', subscription_state: 'abc' },
],
merge_updates: [{ id: i, alias: 'test', xyz: 'abc' }],
},
},
Expand All @@ -972,7 +975,7 @@ describe('processBatch', () => {

// Assert that the response is as expected
expect(result.length).toBe(1); // One successful batched request and one failure response
expect(result[0].batchedRequest.length).toBe(6); // Two batched requests
expect(result[0].batchedRequest.length).toBe(8); // Two batched requests
expect(result[0].batchedRequest[0].body.JSON.partner).toBe('RudderStack'); // Verify partner name
expect(result[0].batchedRequest[0].body.JSON.attributes.length).toBe(75); // First batch contains 75 attributes
expect(result[0].batchedRequest[0].body.JSON.events.length).toBe(75); // First batch contains 75 events
Expand All @@ -981,10 +984,12 @@ describe('processBatch', () => {
expect(result[0].batchedRequest[1].body.JSON.attributes.length).toBe(25); // Second batch contains remaining 25 attributes
expect(result[0].batchedRequest[1].body.JSON.events.length).toBe(25); // Second batch contains remaining 25 events
expect(result[0].batchedRequest[1].body.JSON.purchases.length).toBe(25); // Second batch contains remaining 25 purchases
expect(result[0].batchedRequest[2].body.JSON.subscription_groups.length).toBe(50); // First batch contains 50 subscription group
expect(result[0].batchedRequest[3].body.JSON.subscription_groups.length).toBe(50);
expect(result[0].batchedRequest[4].body.JSON.merge_updates.length).toBe(50); // First batch contains 50 merge_updates
expect(result[0].batchedRequest[5].body.JSON.merge_updates.length).toBe(50); // First batch contains 25 merge_updates
expect(result[0].batchedRequest[2].body.JSON.subscription_groups.length).toBe(25); // First batch contains 25 subscription group
expect(result[0].batchedRequest[3].body.JSON.subscription_groups.length).toBe(25); // Second batch contains 25 subscription group
expect(result[0].batchedRequest[4].body.JSON.subscription_groups.length).toBe(25); // Third batch contains 25 subscription group
expect(result[0].batchedRequest[5].body.JSON.subscription_groups.length).toBe(25); // Fourth batch contains 25 subscription group
expect(result[0].batchedRequest[6].body.JSON.merge_updates.length).toBe(50); // First batch contains 50 merge_updates
expect(result[0].batchedRequest[7].body.JSON.merge_updates.length).toBe(50); // First batch contains 25 merge_updates
});

test('processBatch handles more than 75 attributes, events, and purchases with non uniform distribution', () => {
Expand Down Expand Up @@ -1055,7 +1060,9 @@ describe('processBatch', () => {
batchedRequest: {
body: {
JSON: {
subscription_groups: [{ id: i, group: 'test', xyz: 'abc' }],
subscription_groups: [
{ subscription_group_id: i, group: 'test', subscription_state: 'abc' },
],
},
},
},
Expand Down Expand Up @@ -1093,7 +1100,7 @@ describe('processBatch', () => {
// Assert that the response is as expected
expect(result.length).toBe(1); // One successful batched request and one failure response
expect(result[0].metadata.length).toBe(490); // Check the total length is same as input jobs (120 + 160 + 100 + 70 +40)
expect(result[0].batchedRequest.length).toBe(6); // Two batched requests
expect(result[0].batchedRequest.length).toBe(7); // Two batched requests
expect(result[0].batchedRequest[0].body.JSON.partner).toBe('RudderStack'); // Verify partner name
expect(result[0].batchedRequest[0].body.JSON.attributes.length).toBe(75); // First batch contains 75 attributes
expect(result[0].batchedRequest[0].body.JSON.events.length).toBe(75); // First batch contains 75 events
Expand All @@ -1103,9 +1110,10 @@ describe('processBatch', () => {
expect(result[0].batchedRequest[1].body.JSON.events.length).toBe(45); // Second batch contains remaining 45 events
expect(result[0].batchedRequest[1].body.JSON.purchases.length).toBe(75); // Second batch contains remaining 75 purchases
expect(result[0].batchedRequest[2].body.JSON.purchases.length).toBe(10); // Third batch contains remaining 10 purchases
expect(result[0].batchedRequest[3].body.JSON.subscription_groups.length).toBe(50); // First batch contains 50 subscription group
expect(result[0].batchedRequest[4].body.JSON.subscription_groups.length).toBe(20); // Second batch contains 20 subscription group
expect(result[0].batchedRequest[5].body.JSON.merge_updates.length).toBe(40); // First batch contains 40 merge_updates
expect(result[0].batchedRequest[3].body.JSON.subscription_groups.length).toBe(25); // First batch contains 25 subscription group
expect(result[0].batchedRequest[4].body.JSON.subscription_groups.length).toBe(25); // Second batch contains 25 subscription group
expect(result[0].batchedRequest[5].body.JSON.subscription_groups.length).toBe(20); // Third batch contains 20 subscription group
expect(result[0].batchedRequest[6].body.JSON.merge_updates.length).toBe(40); // First batch contains 50 merge_updates
});

test('check success and failure scenarios both for processBatch', () => {
Expand Down Expand Up @@ -1751,3 +1759,124 @@ describe('handleReservedProperties', () => {
});
});
});

describe('combineSubscriptionGroups', () => {
it('should merge external_ids, emails, and phones for the same subscription_group_id and subscription_state', () => {
const input = [
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
external_ids: ['id1', 'id2'],
emails: ['[email protected]', '[email protected]'],
phones: ['+1234567890', '+0987654321'],
},
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
external_ids: ['id2', 'id3'],
emails: ['[email protected]', '[email protected]'],
phones: ['+1234567890', '+1122334455'],
},
];

const expectedOutput = [
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
external_ids: ['id1', 'id2', 'id3'],
emails: ['[email protected]', '[email protected]', '[email protected]'],
phones: ['+1234567890', '+0987654321', '+1122334455'],
},
];

const result = combineSubscriptionGroups(input);
expect(result).toEqual(expectedOutput);
});

it('should handle groups with missing external_ids, emails, or phones', () => {
const input = [
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
external_ids: ['id1'],
},
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
emails: ['[email protected]'],
},
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
phones: ['+1234567890'],
},
];

const expectedOutput = [
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
external_ids: ['id1'],
emails: ['[email protected]'],
phones: ['+1234567890'],
},
];

const result = combineSubscriptionGroups(input);
expect(result).toEqual(expectedOutput);
});

it('should handle multiple unique subscription groups', () => {
const input = [
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
external_ids: ['id1'],
},
{
subscription_group_id: 'group2',
subscription_state: 'Unsubscribed',
external_ids: ['id2'],
emails: ['[email protected]'],
},
];

const expectedOutput = [
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
external_ids: ['id1'],
},
{
subscription_group_id: 'group2',
subscription_state: 'Unsubscribed',
external_ids: ['id2'],
emails: ['[email protected]'],
},
];

const result = combineSubscriptionGroups(input);
expect(result).toEqual(expectedOutput);
});

it('should not include undefined fields in the output', () => {
const input = [
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
external_ids: ['id1'],
},
];

const expectedOutput = [
{
subscription_group_id: 'group1',
subscription_state: 'Subscribed',
external_ids: ['id1'],
},
];

const result = combineSubscriptionGroups(input);
expect(result).toEqual(expectedOutput);
});
});
2 changes: 1 addition & 1 deletion src/v0/destinations/braze/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const IDENTIFY_BRAZE_MAX_REQ_COUNT = 50;
// https://www.braze.com/docs/api/endpoints/user_data/post_user_delete/

const ALIAS_BRAZE_MAX_REQ_COUNT = 50;
const SUBSCRIPTION_BRAZE_MAX_REQ_COUNT = 50;
const SUBSCRIPTION_BRAZE_MAX_REQ_COUNT = 25;

const DEL_MAX_BATCH_SIZE = 50;
const DESTINATION = 'braze';
Expand Down
2 changes: 1 addition & 1 deletion src/v0/destinations/braze/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ function processGroup(message, destination) {
);
}
subscriptionGroup.subscription_state = message.traits.subscriptionState;
subscriptionGroup.external_id = [message.userId];
subscriptionGroup.external_ids = [message.userId];
const phone = getFieldValueFromMessage(message, 'phone');
const email = getFieldValueFromMessage(message, 'email');
if (phone) {
Expand Down
54 changes: 53 additions & 1 deletion src/v0/destinations/braze/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,42 @@ const getEndpointFromConfig = (destination) => {
return endpoint;
};

// Merges external_ids, emails, and phones for entries with the same subscription_group_id and subscription_state
const combineSubscriptionGroups = (subscriptionGroups) => {
const uniqueGroups = {};

subscriptionGroups.forEach((group) => {
const key = `${group.subscription_group_id}-${group.subscription_state}`;
if (!uniqueGroups[key]) {
uniqueGroups[key] = {
...group,
external_ids: [...(group.external_ids || [])],
emails: [...(group.emails || [])],
phones: [...(group.phones || [])],
};
} else {
uniqueGroups[key].external_ids.push(...(group.external_ids || []));
uniqueGroups[key].emails.push(...(group.emails || []));
uniqueGroups[key].phones.push(...(group.phones || []));
}
});

return Object.values(uniqueGroups).map((group) => {
const result = {
subscription_group_id: group.subscription_group_id,
subscription_state: group.subscription_state,
external_ids: [...new Set(group.external_ids)],
};
if (group.emails?.length) {
result.emails = [...new Set(group.emails)];
}
if (group.phones?.length) {
result.phones = [...new Set(group.phones)];
}
return result;
});
};

const CustomAttributeOperationUtil = {
customAttributeUpdateOperation(key, data, traits, mergeObjectsUpdateOperation) {
data[key] = {};
Expand Down Expand Up @@ -381,8 +417,22 @@ function prepareGroupAndAliasBatch(arrayChunks, responseArray, destination, type
} else if (type === 'subscription') {
response.endpoint = getSubscriptionGroupEndPoint(getEndpointFromConfig(destination));
const subscription_groups = chunk;
// maketool transformed event
logger.info(`braze subscription chunk ${JSON.stringify(subscription_groups)}`);

stats.gauge('braze_batch_subscription_size', subscription_groups.length, {
destination_id: destination.ID,
});

// Deduplicate the subscription groups before constructing the response body
const deduplicatedSubscriptionGroups = combineSubscriptionGroups(subscription_groups);

stats.gauge('braze_batch_subscription_combined_size', deduplicatedSubscriptionGroups.length, {
destination_id: destination.ID,
});

response.body.JSON = removeUndefinedAndNullValues({
subscription_groups,
subscription_groups: deduplicatedSubscriptionGroups,
});
}
responseArray.push({
Expand Down Expand Up @@ -490,6 +540,7 @@ const processBatch = (transformedEvents) => {
prepareGroupAndAliasBatch(mergeUsersArrayChunks, responseArray, destination, 'merge');

if (successMetadata.length > 0) {
console.log(`Response 1 batchRequest ${JSON.stringify(responseArray)}`);
finalResponse.push({
batchedRequest: responseArray,
metadata: successMetadata,
Expand Down Expand Up @@ -756,4 +807,5 @@ module.exports = {
collectStatsForAliasFailure,
collectStatsForAliasMissConfigurations,
handleReservedProperties,
combineSubscriptionGroups,
};
4 changes: 2 additions & 2 deletions test/integrations/destinations/braze/processor/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3160,7 +3160,7 @@ export const data = [
{
subscription_group_id: '1234',
subscription_state: 'subscribed',
external_id: ['Randomuser2222'],
external_ids: ['Randomuser2222'],
phones: ['5055077683'],
},
],
Expand Down Expand Up @@ -3281,7 +3281,7 @@ export const data = [
{
subscription_group_id: '1234',
subscription_state: 'unsubscribed',
external_id: ['Randomuser2222'],
external_ids: ['Randomuser2222'],
emails: ['[email protected]'],
},
],
Expand Down
33 changes: 31 additions & 2 deletions test/integrations/destinations/braze/router/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,34 @@ export const data = [
metadata: { jobId: 6, userId: 'u1' },
message: { type: 'alias', previousId: 'adsfsaf2', userId: 'dsafsdf2' },
},
{
destination: {
Config: {
restApiKey: 'dummyApiKey',
prefixProperties: true,
useNativeSDK: false,
dataCenter: 'us-01',
enableSubscriptionGroupInGroupCall: true,
},
DestinationDefinition: {
DisplayName: 'Braze',
ID: '1WhbSZ6uA3H5ChVifHpfL2H6sie',
Name: 'BRAZE',
},
Enabled: true,
ID: '1WhcOCGgj9asZu850HvugU2C3Aq',
Name: 'Braze',
Transformations: [],
},
metadata: { jobId: 7, userId: 'u1' },
message: {
anonymousId: '56yrtsdfgbgxcb-22b4-401d-aae5-1b994be9afdf',
groupId: 'c90f0fd2-2a02-4f2f-bf07-7e7d2c2ed2b1',
traits: { phone: '5055077683', subscriptionState: 'subscribed' },
userId: 'user12345',
type: 'group',
},
},
],
destType: 'braze',
},
Expand Down Expand Up @@ -299,13 +327,13 @@ export const data = [
JSON: {
subscription_groups: [
{
external_id: ['user123'],
external_ids: ['user123', 'user12345'],
phones: ['5055077683'],
subscription_group_id: 'c90f0fd2-2a02-4f2f-bf07-7e7d2c2ed2b1',
subscription_state: 'subscribed',
},
{
external_id: ['user877'],
external_ids: ['user877'],
phones: ['5055077683'],
subscription_group_id: '58d0a278-b55b-4f10-b7d2-98d1c5dd4c30',
subscription_state: 'subscribed',
Expand Down Expand Up @@ -356,6 +384,7 @@ export const data = [
{ jobId: 4, userId: 'u1' },
{ jobId: 5, userId: 'u1' },
{ jobId: 6, userId: 'u1' },
{ jobId: 7, userId: 'u1' },
],
batched: true,
statusCode: 200,
Expand Down

0 comments on commit ebcf84e

Please sign in to comment.