Skip to content

Commit

Permalink
feat: add support for headers to source transformation flows (#3683)
Browse files Browse the repository at this point in the history
* feat: add support for headers to source transformation flows

* chore: add types for source trasform event
  • Loading branch information
koladilip authored Sep 2, 2024
1 parent b1d0d08 commit f8cd6bd
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 29 deletions.
1 change: 1 addition & 0 deletions src/controllers/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export class SourceController {
version,
events,
);

const resplist = await integrationService.sourceTransformRoutine(
input,
source,
Expand Down
13 changes: 7 additions & 6 deletions src/services/source/__tests__/nativeIntegration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@ afterEach(() => {
jest.clearAllMocks();
});

const headers = {
'x-rudderstack-source': 'test',
};

describe('NativeIntegration Source Service', () => {
test('sourceTransformRoutine - success', async () => {
const sourceType = '__rudder_test__';
const version = 'v0';
const requestMetadata = {};

const event = { message: { a: 'b' } };
const event = { message: { a: 'b' }, headers };
const events = [event, event];

const tevent = { anonymousId: 'test' } as RudderMessage;
const tevent = { anonymousId: 'test', context: { headers } } as RudderMessage;
const tresp = { output: { batch: [tevent] }, statusCode: 200 } as SourceTransformationResponse;

const tresponse = [
{ output: { batch: [{ anonymousId: 'test' }] }, statusCode: 200 },
{ output: { batch: [{ anonymousId: 'test' }] }, statusCode: 200 },
];
const tresponse = [tresp, tresp];

FetchHandler.getSourceHandler = jest.fn().mockImplementationOnce((d, v) => {
expect(d).toEqual(sourceType);
Expand Down
26 changes: 19 additions & 7 deletions src/services/source/__tests__/postTransformation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import {
} from '../../../types/index';
import { SourcePostTransformationService } from '../../source/postTransformation';

const headers = {
'x-rudderstack-source': 'test',
};

describe('Source PostTransformation Service', () => {
test('should handleFailureEventsSource', async () => {
const e = new Error('test error');
Expand All @@ -26,24 +30,32 @@ describe('Source PostTransformation Service', () => {
output: { batch: [{ anonymousId: 'test' }] },
} as SourceTransformationResponse;

const result = SourcePostTransformationService.handleSuccessEventsSource(event);
const postProcessedEvents = {
outputToSource: {},
output: { batch: [{ anonymousId: 'test', context: { headers } }] },
} as SourceTransformationResponse;

expect(result).toEqual(event);
const result = SourcePostTransformationService.handleSuccessEventsSource(event, { headers });

expect(result).toEqual(postProcessedEvents);
});

test('should return the events as batch in SourceTransformationResponse if it is an array', () => {
const headers = {
'x-rudderstack-source': 'test',
};
const events = [{ anonymousId: 'test' }, { anonymousId: 'test' }] as RudderMessage[];
const postProcessedEvents = events.map((event) => ({ ...event, context: { headers } }));
const result = SourcePostTransformationService.handleSuccessEventsSource(events, { headers });

const result = SourcePostTransformationService.handleSuccessEventsSource(events);

expect(result).toEqual({ output: { batch: events } });
expect(result).toEqual({ output: { batch: postProcessedEvents } });
});

test('should return the event as batch in SourceTransformationResponse if it is a single object', () => {
const event = { anonymousId: 'test' } as RudderMessage;

const result = SourcePostTransformationService.handleSuccessEventsSource(event);
const result = SourcePostTransformationService.handleSuccessEventsSource(event, { headers });

expect(result).toEqual({ output: { batch: [event] } });
expect(result).toEqual({ output: { batch: [{ ...event, context: { headers } }] } });
});
});
10 changes: 7 additions & 3 deletions src/services/source/nativeIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
ErrorDetailer,
MetaTransferObject,
RudderMessage,
SourceTransformationEvent,
SourceTransformationResponse,
} from '../../types/index';
import stats from '../../util/stats';
Expand All @@ -27,7 +28,7 @@ export class NativeIntegrationSourceService implements SourceService {
}

public async sourceTransformRoutine(
sourceEvents: NonNullable<unknown>[],
sourceEvents: NonNullable<SourceTransformationEvent>[],
sourceType: string,
version: string,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand All @@ -38,9 +39,12 @@ export class NativeIntegrationSourceService implements SourceService {
const respList: SourceTransformationResponse[] = await Promise.all<FixMe>(
sourceEvents.map(async (sourceEvent) => {
try {
const newSourceEvent = sourceEvent;
const { headers } = newSourceEvent;
delete newSourceEvent.headers;
const respEvents: RudderMessage | RudderMessage[] | SourceTransformationResponse =
await sourceHandler.process(sourceEvent);
return SourcePostTransformationService.handleSuccessEventsSource(respEvents);
await sourceHandler.process(newSourceEvent);
return SourcePostTransformationService.handleSuccessEventsSource(respEvents, { headers });
} catch (error: FixMe) {
stats.increment('source_transform_errors', {
source: sourceType,
Expand Down
20 changes: 15 additions & 5 deletions src/services/source/postTransformation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { MetaTransferObject, RudderMessage, SourceTransformationResponse } from '../../types/index';
import { CommonUtils } from '../../util/common';
import { CatchErr } from '../../util/types';
import { generateErrorObject } from '../../v0/util';
import { ErrorReportingService } from '../errorReporting';
Expand All @@ -20,16 +21,25 @@ export class SourcePostTransformationService {

public static handleSuccessEventsSource(
events: RudderMessage | RudderMessage[] | SourceTransformationResponse,
context: { headers?: Record<string, string> },
): SourceTransformationResponse {
// We send response back to the source
// through outputToSource. This is not sent to gateway
// We will not return array for events not meant for gateway
if (Object.prototype.hasOwnProperty.call(events, 'outputToSource')) {
return events as SourceTransformationResponse;
let sourceTransformationResponse = events as SourceTransformationResponse;
if (!Object.prototype.hasOwnProperty.call(events, 'outputToSource')) {
const eventsBatch = CommonUtils.toArray(events);
sourceTransformationResponse = {
output: { batch: eventsBatch },
} as SourceTransformationResponse;
}
if (Array.isArray(events)) {
return { output: { batch: events } } as SourceTransformationResponse;

if (sourceTransformationResponse.output) {
sourceTransformationResponse.output.batch.forEach((event) => {
const newEvent = event as RudderMessage;
newEvent.context = { ...event.context, ...context };
});
}
return { output: { batch: [events] } } as SourceTransformationResponse;
return sourceTransformationResponse;
}
}
7 changes: 7 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ type RouterTransformationResponse = {
statTags?: object;
};

type SourceTransformationEvent = {
headers?: Record<string, string>;
query_params?: Record<string, string>;
[key: string]: any;
};

type SourceTransformationOutput = {
batch: RudderMessage[];
};
Expand Down Expand Up @@ -360,6 +366,7 @@ export {
RouterTransformationRequestData,
RouterTransformationResponse,
RudderMessage,
SourceTransformationEvent,
SourceTransformationResponse,
UserDeletionRequest,
UserDeletionResponse,
Expand Down
4 changes: 2 additions & 2 deletions src/v0/util/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ const getDestinationExternalIDInfoForRetl = (message, destination) => {
if (externalIdArray) {
externalIdArray.forEach((extIdObj) => {
const { type, id } = extIdObj;
if (type && type.includes(`${destination}-`)) {
if (type?.includes(`${destination}-`)) {
destinationExternalId = id;
objectType = type.replace(`${destination}-`, '');
identifierType = extIdObj.identifierType;
Expand All @@ -1195,7 +1195,7 @@ const getDestinationExternalIDObjectForRetl = (message, destination) => {
// some stops the execution when the element is found
externalIdArray.some((extIdObj) => {
const { type } = extIdObj;
if (type && type.includes(`${destination}-`)) {
if (type?.includes(`${destination}-`)) {
obj = extIdObj;
return true;
}
Expand Down
3 changes: 2 additions & 1 deletion test/apitests/data_scenarios/source/v1/successful.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"fulfillment_id": "1234567890",
"status": "pending"
}
}
},
"context": {}
}
]
}
Expand Down
8 changes: 4 additions & 4 deletions test/apitests/service.api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,8 @@ describe('Api tests with a mock source/destination', () => {
.send(getData());

const expected = [
{ output: { batch: [{ event: 'clicked', type: 'track' }] } },
{ output: { batch: [{ event: 'clicked', type: 'track' }] } },
{ output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } },
{ output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } },
];

expect(response.status).toEqual(200);
Expand Down Expand Up @@ -398,8 +398,8 @@ describe('Api tests with a mock source/destination', () => {
.send(getData());

const expected = [
{ output: { batch: [{ event: 'clicked', type: 'track' }] } },
{ output: { batch: [{ event: 'clicked', type: 'track' }] } },
{ output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } },
{ output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } },
];

expect(response.status).toEqual(200);
Expand Down
7 changes: 6 additions & 1 deletion test/integrations/sources/pipedream/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ export const data = [
{
userId: '1',
originalTimestamp: '2020-09-28T19:53:31.900Z',
context: {},
traits: {
firstName: 'John',
lastName: 'doe',
Expand Down Expand Up @@ -336,7 +337,11 @@ export const data = [
status: 200,
body: [
{
output: { batch: [{ type: 'alias', previousId: '[email protected]', userId: '12345' }] },
output: {
batch: [
{ type: 'alias', previousId: '[email protected]', userId: '12345', context: {} },
],
},
},
],
},
Expand Down
4 changes: 4 additions & 0 deletions test/integrations/sources/segment/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ export const data: SrcTestCaseData[] = [
elapsedTime: null,
session_id: '**************_***************',
},
context: {},
hostname: '************.us.auth0.com',
user_id: 'auth0|************************',
user_name: '[email protected]',
Expand All @@ -145,6 +146,7 @@ export const data: SrcTestCaseData[] = [
{
date: '2020-07-10T07:43:09.620Z',
type: 'seacft',
context: {},
description: '',
connection_id: '',
client_id: '********************************',
Expand Down Expand Up @@ -176,6 +178,7 @@ export const data: SrcTestCaseData[] = [
client_id: '********************************',
client_name: 'My App',
ip: '47.15.6.58',
context: {},
user_agent: 'Chrome Mobile 69.0.3497 / Android 0.0.0',
details: {
prompts: [],
Expand Down Expand Up @@ -207,6 +210,7 @@ export const data: SrcTestCaseData[] = [
client_id: '********************************',
client_name: 'My App',
ip: '47.15.6.58',
context: {},
user_agent: 'Chrome Mobile 69.0.3497 / Android 0.0.0',
details: {
prompts: [],
Expand Down

0 comments on commit f8cd6bd

Please sign in to comment.