-
Notifications
You must be signed in to change notification settings - Fork 113
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: webhook v2 * feat: webhook v2 * refactor: getAuthHeaders utility * feat: add xml support * feat: add batching * feat: update batching logic * chore: remove logs * fix: excludeMappedFields utility * test: add testcases * fix: use sha256 for better hashing
- Loading branch information
1 parent
9fb463e
commit e21ebd0
Showing
10 changed files
with
1,171 additions
and
6 deletions.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
bindings: | ||
- name: EventType | ||
path: ../../../../constants | ||
- path: ../../bindings/jsontemplate | ||
exportAll: true | ||
- path: ../../../../v0/destinations/webhook/utils | ||
- name: getHashFromArray | ||
path: ../../../../v0/util | ||
- name: getIntegrationsObj | ||
path: ../../../../v0/util | ||
- name: removeUndefinedAndNullValues | ||
path: ../../../../v0/util | ||
- name: defaultRequestConfig | ||
path: ../../../../v0/util | ||
- name: isEmptyObject | ||
path: ../../../../v0/util | ||
- path: ./utils | ||
|
||
steps: | ||
- name: validateInput | ||
template: | | ||
$.assertConfig(.destination.Config.webhookUrl, "Webhook URL required. Aborting"); | ||
$.assertConfig(!(.destination.Config.auth === "basicAuth" && !(.destination.Config.username)), "Username is required for Basic Authentication. Aborting"); | ||
$.assertConfig(!(.destination.Config.auth === "bearerTokenAuth" && !(.destination.Config.bearerToken)), "Token is required for Bearer Token Authentication. Aborting"); | ||
$.assertConfig(!(.destination.Config.auth === "apiKeyAuth" && !(.destination.Config.apiKeyName)), "API Key Name is required for API Key Authentication. Aborting"); | ||
$.assertConfig(!(.destination.Config.auth === "apiKeyAuth" && !(.destination.Config.apiKeyValue)), "API Key Value is required for API Key Authentication. Aborting"); | ||
- name: deduceMethod | ||
template: | | ||
$.context.method = .destination.Config.method ?? 'POST'; | ||
- name: deduceBodyFormat | ||
template: | | ||
$.context.format = .destination.Config.format ?? 'JSON'; | ||
- name: buildHeaders | ||
template: | | ||
const configAuthHeaders = $.getAuthHeaders(.destination.Config); | ||
const additionalConfigHeaders = $.getCustomMappings(.message, .destination.Config.headers); | ||
$.context.headers = { | ||
...configAuthHeaders, | ||
...additionalConfigHeaders | ||
} | ||
- name: prepareParams | ||
template: | | ||
$.context.params = $.getCustomMappings(.message, .destination.Config.queryParams) | ||
- name: deduceEndPoint | ||
template: | | ||
$.context.endpoint = $.addPathParams(.message, .destination.Config.webhookUrl); | ||
- name: prepareBody | ||
template: | | ||
const payload = $.getCustomMappings(.message, .destination.Config.propertiesMapping); | ||
$.context.payload = $.removeUndefinedAndNullValues($.excludeMappedFields(payload, .destination.Config.propertiesMapping)) | ||
$.context.format === "XML" && !$.isEmptyObject($.context.payload) ? $.context.payload = {payload: $.getXMLPayload($.context.payload)}; | ||
- name: buildResponseForProcessTransformation | ||
template: | | ||
const response = $.defaultRequestConfig(); | ||
$.context.format === "JSON" ? response.body.JSON = $.context.payload: response.body.XML = $.context.payload; | ||
response.endpoint = $.context.endpoint; | ||
response.headers = $.context.headers; | ||
response.method = $.context.method; | ||
response.params = $.context.params ?? {}; | ||
response |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
bindings: | ||
- name: handleRtTfSingleEventError | ||
path: ../../../../v0/util/index | ||
- path: ./utils | ||
exportAll: true | ||
- name: BatchUtils | ||
path: '@rudderstack/workflow-engine' | ||
|
||
steps: | ||
- name: validateInput | ||
template: | | ||
$.assert(Array.isArray(^) && ^.length > 0, "Invalid event array") | ||
- name: transform | ||
externalWorkflow: | ||
path: ./procWorkflow.yaml | ||
loopOverInput: true | ||
|
||
- name: successfulEvents | ||
template: | | ||
$.outputs.transform#idx.output.({ | ||
"batchedRequest": ., | ||
"batched": false, | ||
"destination": ^[idx].destination, | ||
"metadata": ^[idx].metadata[], | ||
"statusCode": 200 | ||
})[] | ||
- name: failedEvents | ||
template: | | ||
$.outputs.transform#idx.error.( | ||
$.handleRtTfSingleEventError(^[idx], .originalError ?? ., {}) | ||
)[] | ||
- name: bodyFormat | ||
template: | | ||
$.outputs.successfulEvents[0].destination.Config.format ?? "JSON"; | ||
- name: batchingEnabled | ||
template: | | ||
$.outputs.successfulEvents[0].destination.Config.isBatchingEnabled; | ||
- name: batchSize | ||
template: | | ||
$.outputs.successfulEvents[0].destination.Config.maxBatchSize; | ||
- name: batchSuccessfulEvents | ||
description: Batches the successfulEvents | ||
condition: $.outputs.batchingEnabled && $.outputs.bodyFormat === "JSON" | ||
template: | | ||
$.batchSuccessfulEvents($.outputs.successfulEvents, $.outputs.batchSize); | ||
- name: finalPayloadWithBatching | ||
condition: $.outputs.batchingEnabled && $.outputs.bodyFormat === "JSON" | ||
template: | | ||
[...$.outputs.batchSuccessfulEvents, ...$.outputs.failedEvents] | ||
else: | ||
name: finalPayloadWithoutBatching | ||
template: | | ||
[...$.outputs.successfulEvents, ...$.outputs.failedEvents] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
const { toXML } = require('jstoxml'); | ||
const { groupBy } = require('lodash'); | ||
const { createHash } = require('crypto'); | ||
const { ConfigurationError } = require('@rudderstack/integrations-lib'); | ||
const { BatchUtils } = require('@rudderstack/workflow-engine'); | ||
const { base64Convertor, applyCustomMappings, isEmptyObject } = require('../../../../v0/util'); | ||
|
||
const getAuthHeaders = (config) => { | ||
let headers; | ||
switch (config.auth) { | ||
case 'basicAuth': { | ||
const credentials = `${config.username}:${config.password}`; | ||
const encodedCredentials = base64Convertor(credentials); | ||
headers = { | ||
Authorization: `Basic ${encodedCredentials}`, | ||
}; | ||
break; | ||
} | ||
case 'bearerTokenAuth': | ||
headers = { Authorization: `Bearer ${config.bearerToken}` }; | ||
break; | ||
case 'apiKeyAuth': | ||
headers = { [config.apiKeyName]: `${config.apiKeyValue}` }; | ||
break; | ||
default: | ||
headers = {}; | ||
} | ||
return headers; | ||
}; | ||
|
||
const getCustomMappings = (message, mapping) => { | ||
try { | ||
return applyCustomMappings(message, mapping); | ||
} catch (e) { | ||
throw new ConfigurationError(`[Webhook]:: Error in custom mappings: ${e.message}`); | ||
} | ||
}; | ||
|
||
// TODO: write a func to evaluate json path template | ||
const addPathParams = (message, webhookUrl) => webhookUrl; | ||
|
||
const excludeMappedFields = (payload, mapping) => { | ||
const rawPayload = { ...payload }; | ||
if (mapping) { | ||
mapping.forEach(({ from, to }) => { | ||
// continue when from === to | ||
if (from === to) return; | ||
|
||
// Remove the '$.' prefix and split the remaining string by '.' | ||
const keys = from.replace(/^\$\./, '').split('.'); | ||
let current = rawPayload; | ||
|
||
// Traverse to the parent of the key to be removed | ||
keys.slice(0, -1).forEach((key) => { | ||
if (current && current[key]) { | ||
current = current[key]; | ||
} else { | ||
current = null; | ||
} | ||
}); | ||
|
||
if (current) { | ||
// Remove the 'from' field from input payload | ||
delete current[keys[keys.length - 1]]; | ||
} | ||
}); | ||
} | ||
|
||
return rawPayload; | ||
}; | ||
|
||
const getXMLPayload = (payload) => | ||
toXML(payload, { | ||
header: true, | ||
}); | ||
|
||
const getMergedEvents = (batch) => { | ||
const events = []; | ||
batch.forEach((event) => { | ||
if (!isEmptyObject(event.batchedRequest.body.JSON)) { | ||
events.push(event.batchedRequest.body.JSON); | ||
} | ||
}); | ||
return events; | ||
}; | ||
|
||
const mergeMetadata = (batch) => batch.map((event) => event.metadata[0]); | ||
|
||
const createHashKey = (endpoint, headers, params) => { | ||
const hash = createHash('sha256'); | ||
hash.update(endpoint); | ||
hash.update(JSON.stringify(headers)); | ||
hash.update(JSON.stringify(params)); | ||
return hash.digest('hex'); | ||
}; | ||
|
||
const buildBatchedRequest = (batch) => ({ | ||
batchedRequest: { | ||
body: { | ||
JSON: {}, | ||
JSON_ARRAY: { batch: JSON.stringify(getMergedEvents(batch)) }, | ||
XML: {}, | ||
FORM: {}, | ||
}, | ||
version: '1', | ||
type: 'REST', | ||
method: batch[0].batchedRequest.method, | ||
endpoint: batch[0].batchedRequest.endpoint, | ||
headers: batch[0].batchedRequest.headers, | ||
params: batch[0].batchedRequest.params, | ||
files: {}, | ||
}, | ||
metadata: mergeMetadata(batch), | ||
batched: true, | ||
statusCode: 200, | ||
destination: batch[0].destination, | ||
}); | ||
|
||
const batchSuccessfulEvents = (events, batchSize) => { | ||
const response = []; | ||
// group events by endpoint, headers and query params | ||
const groupedEvents = groupBy(events, (event) => { | ||
const { endpoint, headers, params } = event.batchedRequest; | ||
return createHashKey(endpoint, headers, params); | ||
}); | ||
|
||
// batch the each grouped event | ||
Object.keys(groupedEvents).forEach((groupKey) => { | ||
const batches = BatchUtils.chunkArrayBySizeAndLength(groupedEvents[groupKey], { | ||
maxItems: batchSize, | ||
}).items; | ||
batches.forEach((batch) => { | ||
response.push(buildBatchedRequest(batch)); | ||
}); | ||
}); | ||
return response; | ||
}; | ||
|
||
module.exports = { | ||
getAuthHeaders, | ||
getCustomMappings, | ||
addPathParams, | ||
excludeMappedFields, | ||
getXMLPayload, | ||
batchSuccessfulEvents, | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.