Skip to content

Commit

Permalink
feat: add support to add custom network policies for specific workspa…
Browse files Browse the repository at this point in the history
…ces in faas pods
  • Loading branch information
Jayachand authored and psrikanth88 committed Nov 2, 2023
1 parent c147906 commit bc1a760
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 14 deletions.
16 changes: 12 additions & 4 deletions src/util/customTransformer-faas.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { v4: uuidv4 } = require('uuid');
const crypto = require('crypto');
const NodeCache = require('node-cache');
const { getMetadata } = require('../v0/util');
const { getMetadata, getTransformationMetadata } = require('../v0/util');
const stats = require('./stats');
const {
setupFaasFunction,
Expand Down Expand Up @@ -82,10 +82,10 @@ async function setOpenFaasUserTransform(
libraryVersionIds,
pregeneratedFnName,
testMode = false,
trMetadata = {},
) {
const tags = {
transformerVersionId: userTransformation.versionId,
language: userTransformation.language,
identifier: 'openfaas',
testMode,
};
Expand All @@ -106,6 +106,7 @@ async function setOpenFaasUserTransform(
testMode,
),
testMode,
trMetadata,
);

stats.timing('creation_time', setupTime, tags);
Expand All @@ -129,16 +130,22 @@ async function runOpenFaasUserTransform(
const metaTags = events[0].metadata ? getMetadata(events[0].metadata) : {};
const tags = {
transformerVersionId: userTransformation.versionId,
language: userTransformation.language,
identifier: 'openfaas',
testMode,
...metaTags,
};
const trMetadata = events[0].metadata ? getTransformationMetadata(events[0].metadata) : {};

// check and deploy faas function if not exists
const functionName = generateFunctionName(userTransformation, libraryVersionIds, testMode);
if (testMode) {
await setOpenFaasUserTransform(userTransformation, libraryVersionIds, functionName, testMode);
await setOpenFaasUserTransform(
userTransformation,
libraryVersionIds,
functionName,
testMode,
trMetadata,
);
}

const invokeTime = new Date();
Expand All @@ -156,6 +163,7 @@ async function runOpenFaasUserTransform(
testMode,
),
testMode,
trMetadata,
);
stats.timing('run_time', invokeTime, tags);
return result;
Expand Down
62 changes: 52 additions & 10 deletions src/util/openfaas/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const CONFIG_BACKEND_URL = process.env.CONFIG_BACKEND_URL || 'https://api.rudder
const GEOLOCATION_URL = process.env.GEOLOCATION_URL || '';
const FAAS_AST_VID = 'ast';
const FAAS_AST_FN_NAME = 'fn-ast';
const CUSTOM_NETWORK_POLICY_WORKSPACE_IDS = process.env.CUSTOM_NETWORK_POLICY_WORKSPACE_IDS || '';
const customNetworkPolicyWorkspaceIds = CUSTOM_NETWORK_POLICY_WORKSPACE_IDS.split(',');

// Initialise node cache
const functionListCache = new NodeCache();
Expand Down Expand Up @@ -111,7 +113,14 @@ const invalidateFnCache = () => {
functionListCache.set(FUNC_LIST_KEY, []);
};

const deployFaasFunction = async (functionName, code, versionId, libraryVersionIDs, testMode) => {
const deployFaasFunction = async (
functionName,
code,
versionId,
libraryVersionIDs,
testMode,
trMetadata = {},
) => {
try {
logger.debug('[Faas] Deploying a faas function');
let envProcess = 'python index.py';
Expand All @@ -132,19 +141,30 @@ const deployFaasFunction = async (functionName, code, versionId, libraryVersionI
if (GEOLOCATION_URL) {
envVars.geolocation_url = GEOLOCATION_URL;
}
// labels
const labels = {
'openfaas-fn': 'true',
'parent-component': 'openfaas',
'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT,
'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT,
transformationId: trMetadata.transformationId,
workspaceId: trMetadata.workspaceId,
};
if (
trMetadata.workspaceId &&
customNetworkPolicyWorkspaceIds.includes(trMetadata.workspaceId)
) {
labels['custom-network-policy'] = 'true';
}

// TODO: investigate and add more required labels and annotations
const payload = {
service: functionName,
name: functionName,
image: FAAS_BASE_IMG,
envProcess,
envVars,
labels: {
'openfaas-fn': 'true',
'parent-component': 'openfaas',
'com.openfaas.scale.max': FAAS_MAX_PODS_IN_TEXT,
'com.openfaas.scale.min': FAAS_MIN_PODS_IN_TEXT,
},
labels,
annotations: {
'prometheus.io.scrape': 'true',
},
Expand Down Expand Up @@ -175,14 +195,28 @@ const deployFaasFunction = async (functionName, code, versionId, libraryVersionI
}
};

async function setupFaasFunction(functionName, code, versionId, libraryVersionIDs, testMode) {
async function setupFaasFunction(
functionName,
code,
versionId,
libraryVersionIDs,
testMode,
trMetadata = {},
) {
try {
if (!testMode && isFunctionDeployed(functionName)) {
logger.debug(`[Faas] Function ${functionName} already deployed`);
return;
}
// deploy faas function
await deployFaasFunction(functionName, code, versionId, libraryVersionIDs, testMode);
await deployFaasFunction(
functionName,
code,
versionId,
libraryVersionIDs,
testMode,
trMetadata,
);

// This api call is only used to check if function is spinned correctly
await awaitFunctionReadiness(functionName);
Expand All @@ -201,6 +235,7 @@ const executeFaasFunction = async (
versionId,
libraryVersionIDs,
testMode,
trMetadata = {},
) => {
try {
logger.debug('[Faas] Invoking faas function');
Expand All @@ -217,7 +252,14 @@ const executeFaasFunction = async (
error.message.includes(`error finding function ${functionName}`)
) {
removeFunctionFromCache(functionName);
await setupFaasFunction(functionName, null, versionId, libraryVersionIDs, testMode);
await setupFaasFunction(
functionName,
null,
versionId,
libraryVersionIDs,
testMode,
trMetadata,
);
throw new RetryRequestError(`${functionName} not found`);
}

Expand Down
7 changes: 7 additions & 0 deletions src/v0/util/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,12 @@ const getMetadata = (metadata) => ({
destinationType: metadata.destinationType,
k8_namespace: metadata.namespace,
});

const getTransformationMetadata = (metadata) => ({
transformationId: metadata.transformationId,
workspaceId: metadata.workspaceId,
});

// checks if array 2 is a subset of array 1
function checkSubsetOfArray(array1, array2) {
const result = array2.every((val) => array1.includes(val));
Expand Down Expand Up @@ -2113,6 +2119,7 @@ module.exports = {
getIntegrationsObj,
getMappingConfig,
getMetadata,
getTransformationMetadata,
getParsedIP,
getStringValueOfJSON,
getSuccessRespEvents,
Expand Down

0 comments on commit bc1a760

Please sign in to comment.