Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bqstream): event ordering #2558

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 44 additions & 29 deletions src/v0/destinations/bqstream/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const {
} = require('../../util');
const { MAX_ROWS_PER_REQUEST, DESTINATION } = require('./config');
const { InstrumentationError } = require('../../util/errorTypes');
const { getGroupedEvents } = require('./util');

const getInsertIdColValue = (properties, insertIdCol) => {
if (
Expand Down Expand Up @@ -102,38 +103,52 @@ const processRouterDest = (inputs) => {
if (errorRespEvents.length > 0) {
return errorRespEvents;
}

const eventsChunk = []; // temporary variable to divide payload into chunks
const errorRespList = [];

inputs.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
});
const groupedEvents = getGroupedEvents(inputs);
const finalResp = [];
groupedEvents.forEach((eventList) => {
let eventsChunk = []; // temporary variable to divide payload into chunks
let errorRespList = [];
if (eventList.length > 0) {
eventList.forEach((event) => {
try {
if (event.message.statusCode) {
// already transformed event
eventsChunk.push(event);
} else {
// if not transformed
let response = process(event);
response = Array.isArray(response) ? response : [response];
response.forEach((res) => {
eventsChunk.push({
message: res,
metadata: event.metadata,
destination: event.destination,
});
});
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
// divide the successful payloads till now into batches
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
// clear up the temporary variable
eventsChunk = [];
errorRespList.push(errRespEvent);
finalResp.push([...batchedResponseList, ...errorRespList]);
// putting it back as an empty array
errorRespList = [];
}
});
let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
finalResp.push([...batchedResponseList]);
}
} catch (error) {
const errRespEvent = handleRtTfSingleEventError(event, error, DESTINATION);
errorRespList.push(errRespEvent);
}
});

let batchedResponseList = [];
if (eventsChunk.length > 0) {
batchedResponseList = batchEvents(eventsChunk);
}
return [...batchedResponseList, ...errorRespList];
return finalResp.flat();
};

module.exports = { process, processRouterDest };
16 changes: 15 additions & 1 deletion src/v0/destinations/bqstream/util.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* eslint-disable no-param-reassign */
const _ = require('lodash');
const getValue = require('get-value');
const {
getDynamicErrorType,
Expand Down Expand Up @@ -143,4 +144,17 @@ function networkHandler() {
this.processAxiosResponse = processAxiosResponse;
}

module.exports = { networkHandler };
/**
* Groups the input events based on the `userId` property
*
* @param {Array} inputs - An array of objects representing events with `metadata.userId` property.
* @returns {Array} An array of arrays containing the grouped events.
* Each inner array represents a user journey.
*/
const getGroupedEvents = (inputs) => {
const userIdEventMap = _.groupBy(inputs, 'metadata.userId');
const eventGroupedByUserId = Object.values(userIdEventMap);
return eventGroupedByUserId;
};

module.exports = { networkHandler, getGroupedEvents };
236 changes: 236 additions & 0 deletions test/integrations/destinations/bqstream/router/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export const data = [
},
metadata: {
jobId: 1,
userId: 'user12345',
},
destination: {
Config: {
Expand Down Expand Up @@ -153,6 +154,169 @@ export const data = [
},
metadata: {
jobId: 2,
userId: 'user12345',
mihir-4116 marked this conversation as resolved.
Show resolved Hide resolved
},
destination: {
Config: {
rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-',
projectId: 'gc-project-id',
datasetId: 'gc_dataset',
tableId: 'gc_table',
insertId: 'productId',
eventDelivery: true,
eventDeliveryTS: 1636965406397,
},
Enabled: true,
ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR',
Name: 'bqstream test',
},
},
{
message: {
type: 'identify',
event: 'insert product',
sentAt: '2021-09-08T11:10:45.466Z',
userId: 'user12345',
channel: 'web',
context: {
os: {
name: '',
version: '',
},
app: {
name: 'RudderLabs JavaScript SDK',
build: '1.0.0',
version: '1.1.18',
namespace: 'com.rudderlabs.javascript',
},
page: {
url: 'http://127.0.0.1:5500/index.html',
path: '/index.html',
title: 'Document',
search: '',
tab_url: 'http://127.0.0.1:5500/index.html',
referrer: '$direct',
initial_referrer: '$direct',
referring_domain: '',
initial_referring_domain: '',
},
locale: 'en-GB',
screen: {
width: 1536,
height: 960,
density: 2,
innerWidth: 1536,
innerHeight: 776,
},
traits: {},
library: {
name: 'RudderLabs JavaScript SDK',
version: '1.1.18',
},
campaign: {},
userAgent:
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36',
},
rudderId: 'fa2994a5-2a81-45fd-9919-fcf5596ad380',
messageId: 'e2d1a383-d9a2-4e03-a9dc-131d153c4d95',
timestamp: '2021-11-15T14:06:42.497+05:30',
traits: {
count: 20,
productId: 20,
productName: 'Product-20',
},
receivedAt: '2021-11-15T14:06:42.497+05:30',
request_ip: '[::1]',
anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf',
integrations: {
All: true,
},
originalTimestamp: '2021-09-08T11:10:45.466Z',
},
metadata: {
jobId: 3,
userId: 'user12345',
},
destination: {
Config: {
rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-',
projectId: 'gc-project-id',
datasetId: 'gc_dataset',
tableId: 'gc_table',
insertId: 'productId',
eventDelivery: true,
eventDeliveryTS: 1636965406397,
},
Enabled: true,
ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR',
Name: 'bqstream test',
},
},
{
message: {
type: 'track',
event: 'insert product',
sentAt: '2021-09-08T11:10:45.466Z',
userId: 'user12345',
channel: 'web',
context: {
os: {
name: '',
version: '',
},
app: {
name: 'RudderLabs JavaScript SDK',
build: '1.0.0',
version: '1.1.18',
namespace: 'com.rudderlabs.javascript',
},
page: {
url: 'http://127.0.0.1:5500/index.html',
path: '/index.html',
title: 'Document',
search: '',
tab_url: 'http://127.0.0.1:5500/index.html',
referrer: '$direct',
initial_referrer: '$direct',
referring_domain: '',
initial_referring_domain: '',
},
locale: 'en-GB',
screen: {
width: 1536,
height: 960,
density: 2,
innerWidth: 1536,
innerHeight: 776,
},
traits: {},
library: {
name: 'RudderLabs JavaScript SDK',
version: '1.1.18',
},
campaign: {},
userAgent:
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Safari/537.36',
},
rudderId: 'fa2994a5-2a81-45fd-9919-fcf5596ad380',
messageId: 'e2d1a383-d9a2-4e03-a9dc-131d153c4d95',
timestamp: '2021-11-15T14:06:42.497+05:30',
properties: {
count: 20,
productId: 20,
productName: 'Product-20',
},
receivedAt: '2021-11-15T14:06:42.497+05:30',
request_ip: '[::1]',
anonymousId: 'd8b2ed61-7fa5-4ef8-bd92-6a506157c0cf',
integrations: {
All: true,
},
originalTimestamp: '2021-09-08T11:10:45.466Z',
},
metadata: {
jobId: 5,
userId: 'user123',
},
destination: {
Config: {
Expand Down Expand Up @@ -203,9 +367,11 @@ export const data = [
metadata: [
{
jobId: 1,
userId: 'user12345',
},
{
jobId: 2,
userId: 'user12345',
},
],
batched: true,
Expand All @@ -225,6 +391,76 @@ export const data = [
Name: 'bqstream test',
},
},
{
batched: false,
destination: {
Config: {
datasetId: 'gc_dataset',
eventDelivery: true,
eventDeliveryTS: 1636965406397,
insertId: 'productId',
projectId: 'gc-project-id',
rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-',
tableId: 'gc_table',
},
Enabled: true,
ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR',
Name: 'bqstream test',
},
error: 'Message Type not supported: identify',
metadata: [
{
jobId: 3,
userId: 'user12345',
},
],
statTags: {
destType: 'BQSTREAM',
errorCategory: 'dataValidation',
errorType: 'instrumentation',
feature: 'router',
implementation: 'native',
module: 'destination',
},
statusCode: 400,
},
{
batched: true,
shrouti1507 marked this conversation as resolved.
Show resolved Hide resolved
batchedRequest: {
datasetId: 'gc_dataset',
projectId: 'gc-project-id',
properties: [
{
count: 20,
insertId: '20',
productId: 20,
productName: 'Product-20',
},
],
tableId: 'gc_table',
},
destination: {
Config: {
datasetId: 'gc_dataset',
eventDelivery: true,
eventDeliveryTS: 1636965406397,
insertId: 'productId',
projectId: 'gc-project-id',
rudderAccountId: '1z8LpaSAuFR9TPWL6fECZfjmRa-',
tableId: 'gc_table',
},
Enabled: true,
ID: '1WXjIHpu7ETXgjfiGPW3kCUgZFR',
Name: 'bqstream test',
},
metadata: [
{
jobId: 5,
userId: 'user123',
},
],
statusCode: 200,
},
],
},
},
Expand Down