From f8cd6bda158654501555554ac0d284af8ba058fd Mon Sep 17 00:00:00 2001 From: Dilip Kola <33080863+koladilip@users.noreply.github.com> Date: Mon, 2 Sep 2024 11:32:09 +0530 Subject: [PATCH] feat: add support for headers to source transformation flows (#3683) * feat: add support for headers to source transformation flows * chore: add types for source trasform event --- src/controllers/source.ts | 1 + .../__tests__/nativeIntegration.test.ts | 13 +++++----- .../__tests__/postTransformation.test.ts | 26 ++++++++++++++----- src/services/source/nativeIntegration.ts | 10 ++++--- src/services/source/postTransformation.ts | 20 ++++++++++---- src/types/index.ts | 7 +++++ src/v0/util/index.js | 4 +-- .../data_scenarios/source/v1/successful.json | 3 ++- test/apitests/service.api.test.ts | 8 +++--- test/integrations/sources/pipedream/data.ts | 7 ++++- test/integrations/sources/segment/data.ts | 4 +++ 11 files changed, 74 insertions(+), 29 deletions(-) diff --git a/src/controllers/source.ts b/src/controllers/source.ts index bc4b77bd3d..230636f193 100644 --- a/src/controllers/source.ts +++ b/src/controllers/source.ts @@ -18,6 +18,7 @@ export class SourceController { version, events, ); + const resplist = await integrationService.sourceTransformRoutine( input, source, diff --git a/src/services/source/__tests__/nativeIntegration.test.ts b/src/services/source/__tests__/nativeIntegration.test.ts index a2a5af041e..2ef8129cdc 100644 --- a/src/services/source/__tests__/nativeIntegration.test.ts +++ b/src/services/source/__tests__/nativeIntegration.test.ts @@ -8,22 +8,23 @@ afterEach(() => { jest.clearAllMocks(); }); +const headers = { + 'x-rudderstack-source': 'test', +}; + describe('NativeIntegration Source Service', () => { test('sourceTransformRoutine - success', async () => { const sourceType = '__rudder_test__'; const version = 'v0'; const requestMetadata = {}; - const event = { message: { a: 'b' } }; + const event = { message: { a: 'b' }, headers }; const events = [event, event]; - const tevent = { anonymousId: 'test' } as RudderMessage; + const tevent = { anonymousId: 'test', context: { headers } } as RudderMessage; const tresp = { output: { batch: [tevent] }, statusCode: 200 } as SourceTransformationResponse; - const tresponse = [ - { output: { batch: [{ anonymousId: 'test' }] }, statusCode: 200 }, - { output: { batch: [{ anonymousId: 'test' }] }, statusCode: 200 }, - ]; + const tresponse = [tresp, tresp]; FetchHandler.getSourceHandler = jest.fn().mockImplementationOnce((d, v) => { expect(d).toEqual(sourceType); diff --git a/src/services/source/__tests__/postTransformation.test.ts b/src/services/source/__tests__/postTransformation.test.ts index e5efbe8194..ea8b463bf9 100644 --- a/src/services/source/__tests__/postTransformation.test.ts +++ b/src/services/source/__tests__/postTransformation.test.ts @@ -5,6 +5,10 @@ import { } from '../../../types/index'; import { SourcePostTransformationService } from '../../source/postTransformation'; +const headers = { + 'x-rudderstack-source': 'test', +}; + describe('Source PostTransformation Service', () => { test('should handleFailureEventsSource', async () => { const e = new Error('test error'); @@ -26,24 +30,32 @@ describe('Source PostTransformation Service', () => { output: { batch: [{ anonymousId: 'test' }] }, } as SourceTransformationResponse; - const result = SourcePostTransformationService.handleSuccessEventsSource(event); + const postProcessedEvents = { + outputToSource: {}, + output: { batch: [{ anonymousId: 'test', context: { headers } }] }, + } as SourceTransformationResponse; - expect(result).toEqual(event); + const result = SourcePostTransformationService.handleSuccessEventsSource(event, { headers }); + + expect(result).toEqual(postProcessedEvents); }); test('should return the events as batch in SourceTransformationResponse if it is an array', () => { + const headers = { + 'x-rudderstack-source': 'test', + }; const events = [{ anonymousId: 'test' }, { anonymousId: 'test' }] as RudderMessage[]; + const postProcessedEvents = events.map((event) => ({ ...event, context: { headers } })); + const result = SourcePostTransformationService.handleSuccessEventsSource(events, { headers }); - const result = SourcePostTransformationService.handleSuccessEventsSource(events); - - expect(result).toEqual({ output: { batch: events } }); + expect(result).toEqual({ output: { batch: postProcessedEvents } }); }); test('should return the event as batch in SourceTransformationResponse if it is a single object', () => { const event = { anonymousId: 'test' } as RudderMessage; - const result = SourcePostTransformationService.handleSuccessEventsSource(event); + const result = SourcePostTransformationService.handleSuccessEventsSource(event, { headers }); - expect(result).toEqual({ output: { batch: [event] } }); + expect(result).toEqual({ output: { batch: [{ ...event, context: { headers } }] } }); }); }); diff --git a/src/services/source/nativeIntegration.ts b/src/services/source/nativeIntegration.ts index a4f26d068a..5c89de7b92 100644 --- a/src/services/source/nativeIntegration.ts +++ b/src/services/source/nativeIntegration.ts @@ -4,6 +4,7 @@ import { ErrorDetailer, MetaTransferObject, RudderMessage, + SourceTransformationEvent, SourceTransformationResponse, } from '../../types/index'; import stats from '../../util/stats'; @@ -27,7 +28,7 @@ export class NativeIntegrationSourceService implements SourceService { } public async sourceTransformRoutine( - sourceEvents: NonNullable[], + sourceEvents: NonNullable[], sourceType: string, version: string, // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -38,9 +39,12 @@ export class NativeIntegrationSourceService implements SourceService { const respList: SourceTransformationResponse[] = await Promise.all( sourceEvents.map(async (sourceEvent) => { try { + const newSourceEvent = sourceEvent; + const { headers } = newSourceEvent; + delete newSourceEvent.headers; const respEvents: RudderMessage | RudderMessage[] | SourceTransformationResponse = - await sourceHandler.process(sourceEvent); - return SourcePostTransformationService.handleSuccessEventsSource(respEvents); + await sourceHandler.process(newSourceEvent); + return SourcePostTransformationService.handleSuccessEventsSource(respEvents, { headers }); } catch (error: FixMe) { stats.increment('source_transform_errors', { source: sourceType, diff --git a/src/services/source/postTransformation.ts b/src/services/source/postTransformation.ts index 20c815171b..c62f0ed713 100644 --- a/src/services/source/postTransformation.ts +++ b/src/services/source/postTransformation.ts @@ -1,4 +1,5 @@ import { MetaTransferObject, RudderMessage, SourceTransformationResponse } from '../../types/index'; +import { CommonUtils } from '../../util/common'; import { CatchErr } from '../../util/types'; import { generateErrorObject } from '../../v0/util'; import { ErrorReportingService } from '../errorReporting'; @@ -20,16 +21,25 @@ export class SourcePostTransformationService { public static handleSuccessEventsSource( events: RudderMessage | RudderMessage[] | SourceTransformationResponse, + context: { headers?: Record }, ): SourceTransformationResponse { // We send response back to the source // through outputToSource. This is not sent to gateway // We will not return array for events not meant for gateway - if (Object.prototype.hasOwnProperty.call(events, 'outputToSource')) { - return events as SourceTransformationResponse; + let sourceTransformationResponse = events as SourceTransformationResponse; + if (!Object.prototype.hasOwnProperty.call(events, 'outputToSource')) { + const eventsBatch = CommonUtils.toArray(events); + sourceTransformationResponse = { + output: { batch: eventsBatch }, + } as SourceTransformationResponse; } - if (Array.isArray(events)) { - return { output: { batch: events } } as SourceTransformationResponse; + + if (sourceTransformationResponse.output) { + sourceTransformationResponse.output.batch.forEach((event) => { + const newEvent = event as RudderMessage; + newEvent.context = { ...event.context, ...context }; + }); } - return { output: { batch: [events] } } as SourceTransformationResponse; + return sourceTransformationResponse; } } diff --git a/src/types/index.ts b/src/types/index.ts index 150758363e..7aa6bd8ebf 100644 --- a/src/types/index.ts +++ b/src/types/index.ts @@ -185,6 +185,12 @@ type RouterTransformationResponse = { statTags?: object; }; +type SourceTransformationEvent = { + headers?: Record; + query_params?: Record; + [key: string]: any; +}; + type SourceTransformationOutput = { batch: RudderMessage[]; }; @@ -360,6 +366,7 @@ export { RouterTransformationRequestData, RouterTransformationResponse, RudderMessage, + SourceTransformationEvent, SourceTransformationResponse, UserDeletionRequest, UserDeletionResponse, diff --git a/src/v0/util/index.js b/src/v0/util/index.js index 7e9b4f0c3e..db84fb0627 100644 --- a/src/v0/util/index.js +++ b/src/v0/util/index.js @@ -1168,7 +1168,7 @@ const getDestinationExternalIDInfoForRetl = (message, destination) => { if (externalIdArray) { externalIdArray.forEach((extIdObj) => { const { type, id } = extIdObj; - if (type && type.includes(`${destination}-`)) { + if (type?.includes(`${destination}-`)) { destinationExternalId = id; objectType = type.replace(`${destination}-`, ''); identifierType = extIdObj.identifierType; @@ -1195,7 +1195,7 @@ const getDestinationExternalIDObjectForRetl = (message, destination) => { // some stops the execution when the element is found externalIdArray.some((extIdObj) => { const { type } = extIdObj; - if (type && type.includes(`${destination}-`)) { + if (type?.includes(`${destination}-`)) { obj = extIdObj; return true; } diff --git a/test/apitests/data_scenarios/source/v1/successful.json b/test/apitests/data_scenarios/source/v1/successful.json index c42d723800..bb93e72fcd 100644 --- a/test/apitests/data_scenarios/source/v1/successful.json +++ b/test/apitests/data_scenarios/source/v1/successful.json @@ -29,7 +29,8 @@ "fulfillment_id": "1234567890", "status": "pending" } - } + }, + "context": {} } ] } diff --git a/test/apitests/service.api.test.ts b/test/apitests/service.api.test.ts index b4ed8d3b09..30d2c568a6 100644 --- a/test/apitests/service.api.test.ts +++ b/test/apitests/service.api.test.ts @@ -359,8 +359,8 @@ describe('Api tests with a mock source/destination', () => { .send(getData()); const expected = [ - { output: { batch: [{ event: 'clicked', type: 'track' }] } }, - { output: { batch: [{ event: 'clicked', type: 'track' }] } }, + { output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } }, + { output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } }, ]; expect(response.status).toEqual(200); @@ -398,8 +398,8 @@ describe('Api tests with a mock source/destination', () => { .send(getData()); const expected = [ - { output: { batch: [{ event: 'clicked', type: 'track' }] } }, - { output: { batch: [{ event: 'clicked', type: 'track' }] } }, + { output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } }, + { output: { batch: [{ event: 'clicked', type: 'track', context: {} }] } }, ]; expect(response.status).toEqual(200); diff --git a/test/integrations/sources/pipedream/data.ts b/test/integrations/sources/pipedream/data.ts index 4be5621f0c..a4b5c33e0d 100644 --- a/test/integrations/sources/pipedream/data.ts +++ b/test/integrations/sources/pipedream/data.ts @@ -199,6 +199,7 @@ export const data = [ { userId: '1', originalTimestamp: '2020-09-28T19:53:31.900Z', + context: {}, traits: { firstName: 'John', lastName: 'doe', @@ -336,7 +337,11 @@ export const data = [ status: 200, body: [ { - output: { batch: [{ type: 'alias', previousId: 'name@surname.com', userId: '12345' }] }, + output: { + batch: [ + { type: 'alias', previousId: 'name@surname.com', userId: '12345', context: {} }, + ], + }, }, ], }, diff --git a/test/integrations/sources/segment/data.ts b/test/integrations/sources/segment/data.ts index 8c66abd252..780a65c119 100644 --- a/test/integrations/sources/segment/data.ts +++ b/test/integrations/sources/segment/data.ts @@ -124,6 +124,7 @@ export const data: SrcTestCaseData[] = [ elapsedTime: null, session_id: '**************_***************', }, + context: {}, hostname: '************.us.auth0.com', user_id: 'auth0|************************', user_name: 'example@test.com', @@ -145,6 +146,7 @@ export const data: SrcTestCaseData[] = [ { date: '2020-07-10T07:43:09.620Z', type: 'seacft', + context: {}, description: '', connection_id: '', client_id: '********************************', @@ -176,6 +178,7 @@ export const data: SrcTestCaseData[] = [ client_id: '********************************', client_name: 'My App', ip: '47.15.6.58', + context: {}, user_agent: 'Chrome Mobile 69.0.3497 / Android 0.0.0', details: { prompts: [], @@ -207,6 +210,7 @@ export const data: SrcTestCaseData[] = [ client_id: '********************************', client_name: 'My App', ip: '47.15.6.58', + context: {}, user_agent: 'Chrome Mobile 69.0.3497 / Android 0.0.0', details: { prompts: [],