diff --git a/services/bpmn-service/.vscode/settings.json b/services/bpmn-service/.vscode/settings.json index 07313667ec..4d57e61980 100644 --- a/services/bpmn-service/.vscode/settings.json +++ b/services/bpmn-service/.vscode/settings.json @@ -5,8 +5,8 @@ "editor.trimAutoWhitespace": true, "editor.formatOnSave": true, "editor.codeActionsOnSave": { - "source.organizeImports": true, - "source.fixAll.eslint": true + "source.organizeImports": "explicit", + "source.fixAll.eslint": "explicit" }, "files.exclude": { diff --git a/services/bpmn-service/README.md b/services/bpmn-service/README.md index 3e47023473..eac299a95b 100644 --- a/services/bpmn-service/README.md +++ b/services/bpmn-service/README.md @@ -48,11 +48,23 @@ npm i @sourceloop/bpmn-service }); ``` - Implement `WorkflowProvider` (refer [this](#bpmnprovider)) and bind it to `WorkflowServiceBindings.WorkflowManager` key - + ```typescript this.bind(WorkflowServiceBindings.WorkflowManager).toProvider( WorkflowProvider, ); ``` + + OR + + Use an existing workflow provider, like the Camunda 7 provided with this library - + + ```typescript + this.component(CamundaComponent); + ``` + + You also need to install the [camunda-external-task-client-js](https://www.npmjs.com/package/camunda-external-task-client-js) library to use this component. + - Add the `WorkflowServiceComponent` to your Loopback4 Application (in `application.ts`). ```typescript // import WorkflowServiceComponent diff --git a/services/bpmn-service/package.json b/services/bpmn-service/package.json index cbdd96d9b2..51ff73bc5f 100644 --- a/services/bpmn-service/package.json +++ b/services/bpmn-service/package.json @@ -13,12 +13,19 @@ "./sequelize": { "types": "./dist/repositories/sequelize/index.d.ts", "default": "./dist/repositories/sequelize/index.js" + }, + "./camunda": { + "types": "./dist/connectors/camunda/index.d.ts", + "default": "./dist/connectors/camunda/index.js" } }, "typesVersions": { "*": { "sequelize": [ "./dist/repositories/sequelize/index.d.ts" + ], + "camunda": [ + "./dist/connectors/camunda/index.d.ts" ] } }, @@ -75,7 +82,6 @@ "@loopback/rest": "^14.0.2", "@sourceloop/core": "^15.0.0", "ajv": "^8.11.0", - "camunda-external-task-client-js": "^2.3.1", "dotenv": "^16.0.3", "dotenv-extended": "^2.9.0", "jsonwebtoken": "^9.0.0", @@ -90,6 +96,7 @@ "@loopback/build": "^11.0.2", "@loopback/eslint-config": "^15.0.2", "@loopback/testlab": "^7.0.2", + "camunda-external-task-client-js": "^2.3.1", "@types/camunda-external-task-client-js": "^1.3.3", "@types/jsonwebtoken": "^9.0.0", "@types/lodash": "^4.14.182", @@ -102,7 +109,8 @@ "widdershins": "^4.0.1" }, "optionalDependencies": { - "@loopback/sequelize": "^0.6.2" + "@loopback/sequelize": "^0.6.2", + "camunda-external-task-client-js": "^2.3.1" }, "overrides": { "widdershins": { diff --git a/services/bpmn-service/src/__tests__/acceptance/camunda.acceptance.ts b/services/bpmn-service/src/__tests__/acceptance/camunda.acceptance.ts new file mode 100644 index 0000000000..3b7f73e8c1 --- /dev/null +++ b/services/bpmn-service/src/__tests__/acceptance/camunda.acceptance.ts @@ -0,0 +1,340 @@ +// Copyright (c) 2023 Sourcefuse Technologies +// +// This software is released under the MIT License. +// https://opensource.org/licenses/MIT +import {AnyObject} from '@loopback/repository'; +import {Client, expect} from '@loopback/testlab'; +import {configDotenv} from 'dotenv'; +import * as jwt from 'jsonwebtoken'; +import {WorkflowServiceApplication} from '../../application'; +import {CamundaConnectorService} from '../../connectors/camunda'; +import {Workflow, WorkflowVersion} from '../../models'; +import { + WorkflowRepository, + WorkflowVersionRepository, +} from '../../repositories'; +import {firstTestBpmnInput, generateBpmn} from '../const'; +import {setUpApplication} from './helper'; +configDotenv(); +describe('Workflow Controller: With Camunda', () => { + let app: WorkflowServiceApplication; + let client: Client; + let workflowRepo: WorkflowRepository; + let workflowVersionRepo: WorkflowVersionRepository; + let camundaService: CamundaConnectorService; + const basePath = '/workflows'; + const pass = 'test_password'; + const testUser = { + id: 1, + username: 'test_user', + password: pass, + permissions: [ + 'ViewWorkflow', + 'CreateWorkflow', + 'UpdateWorkflow', + 'DeleteWorkflow', + 'ExecuteWorkflow', + ], + }; + + const token = jwt.sign(testUser, 'kdskssdkdfs', { + expiresIn: 180000, + issuer: 'sf', + }); + + before('setupApplication', async function () { + if (!process.env.WORKFLOW_ENGINE_BASE_URL) { + // eslint-disable-next-line @typescript-eslint/no-invalid-this + this.skip(); + } + ({app, client} = await setUpApplication(false)); + }); + before(givenRepositories); + + after(async () => app?.stop()); + + afterEach(deleteMockData); + + describe('/workflows GET', () => { + it('gives status 401 when no token is passed', async () => { + const response = await client.get(basePath).expect(401); + expect(response).to.have.property('error'); + }); + + it('gives status 200 when token is passed, with no existing workflow', async () => { + const response = await client + .get(basePath) + .set('authorization', `Bearer ${token}`) + .expect(200); + + expect(response.body).to.have.length(0); + }); + + it('gives status 200 when token is passed, with an existing workflow', async () => { + const workflow = generateBpmn(); + await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`); + + const response = await client + .get(basePath) + .set('authorization', `Bearer ${token}`) + .expect(200); + + expect(response.body).to.have.length(1); + expect(response.body[0]).to.have.property('name').equal(workflow.name); + }); + }); + + describe('/workflows POST', () => { + it('gives status 200 when creating a workflow using correct payload', async () => { + const workflow = generateBpmn(); + + await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`) + .expect(200); + }); + }); + + describe('/workflow/:id PATCH', () => { + it('gives status 404 when token is passed, with non-existant workflow id', async () => { + const workflow = generateBpmn(); + await client + .patch(`${basePath}/0`) + .send(workflow) + .set('authorization', `Bearer ${token}`) + .expect(404); + }); + + it('gives status 204 when token is passed, with an existing workflow id, and workflow is updated', async () => { + const workflow = generateBpmn(); + + const saved = await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`); + + await client + .patch(`${basePath}/${saved.body.id}`) + .send({ + ...workflow, + bpmnFile: workflow.bpmnFile.replace('topic1', 'topic4'), + }) + .set('authorization', `Bearer ${token}`) + .expect(204); + + const {body: workflows} = await client + .get( + `${basePath}?filter=${JSON.stringify({include: ['workflowVersions']})}`, + ) + .set('authorization', `Bearer ${token}`) + .expect(200); + + expect(workflows).to.have.length(1); + expect(workflows[0].workflowVersions.length).to.equal(2); + const maxVersion = workflows[0].workflowVersions.reduce( + (prev: WorkflowVersion, current: WorkflowVersion) => + prev.version > current.version ? prev : current, + ); + + expect(workflows[0].externalIdentifier).to.equal( + maxVersion.externalWorkflowId, + ); + }); + }); + + describe('/workflows/:id DELETE', () => { + it('gives 204 when token is passed, with an existing workflow id', async () => { + const workflow = generateBpmn(); + + const saved = await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`); + + const deploymentId = await camundaService + .get(saved.body.externalIdentifier) + .then(res => res.deploymentId); + await client + .delete(`${basePath}/${saved.body.id}`) + .set('authorization', `Bearer ${token}`) + .expect(204); + + // due to a bug in current implementation where all the definitions are deleted + // but we have to delete the deployment manually + await camundaService.deleteDeployment(deploymentId, true); + }); + + it('gives 404 when token is passed, with a non-existant workflow id', async () => { + await client + .delete(`${basePath}/0`) + .set('authorization', `Bearer ${token}`) + .expect(404); + }); + }); + + describe('/workflows/:id/version/:version DELETE', () => { + it('gives 204 when token is passed, with an existing non-latest workflow version id', async () => { + const workflow = generateBpmn(); + const saved = await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`); + + await client + .patch(`${basePath}/${saved.body.id}`) + .send({ + ...workflow, + bpmnFile: workflow.bpmnFile.replace('topic1', 'topic4'), + }) + .set('authorization', `Bearer ${token}`); + const deploymentId = await camundaService + .get(saved.body.externalIdentifier) + .then(res => res.deploymentId); + await client + .delete( + `${basePath}/${saved.body.id}/version/${saved.body.workflowVersion}`, + ) + .set('authorization', `Bearer ${token}`) + .expect(204); + + // due to a bug in current implementation where all the definitions are deleted + // but we have to delete the deployment manually + await camundaService.deleteDeployment(deploymentId, true); + }); + + it('gives 404 when token is passed, with a non-existant workflow version id', async () => { + const workflow = generateBpmn(); + const saved = await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`); + await client + .delete( + `${basePath}/${saved.body.id}/version/${ + saved.body.workflowVersion + 1 + }`, + ) + .set('authorization', `Bearer ${token}`) + .expect(404); + }); + }); + + describe('/workflow/:id/execute POST', () => { + it('gives 400 when invalid input is passed', async () => { + const workflow = generateBpmn(); + const saved = await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`); + + const input = { + ...firstTestBpmnInput, + valueB: 123, + }; + + await client + .post(`${basePath}/${saved.body.id}/execute`) + .send({input}) + .set('authorization', `Bearer ${token}`) + .expect(400); + }); + + it('gives 200 when valid input is passed, and workflow is completed in the engine', async () => { + const workflow = generateBpmn(); + const saved = await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`); + + await client + .post(`${basePath}/${saved.body.id}/execute`) + .send({input: firstTestBpmnInput}) + .set('authorization', `Bearer ${token}`) + .expect(200); + }); + + it('gives 404 when invalid workflow id is passed', async () => { + await client + .post(`${basePath}/0/execute`) + .send({input: firstTestBpmnInput}) + .set('authorization', `Bearer ${token}`) + .expect(404); + }); + + it('gives 404 when invalid workflow version is passed', async () => { + const workflow = generateBpmn(); + const saved = await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`); + + await client + .post(`${basePath}/${saved.body.id}/execute`) + .send({ + input: firstTestBpmnInput, + workflowVersion: saved.body.workflowVersion + 1, + }) + .set('authorization', `Bearer ${token}`) + .expect(404); + }); + + it('gives 200 when valid workflow version and input is passed, and workflow is completed in the engine', async () => { + const workflow = generateBpmn(); + const saved = await client + .post(basePath) + .send(workflow) + .set('authorization', `Bearer ${token}`); + + await client + .post(`${basePath}/${saved.body.id}/execute`) + .send({ + input: firstTestBpmnInput, + workflowVersion: saved.body.workflowVersion, + }) + .set('authorization', `Bearer ${token}`) + .expect(200); + }); + }); + + async function deleteMockData() { + await clearData(); + await workflowRepo.deleteAllHard(); + await workflowVersionRepo.deleteAll(); + } + + async function givenRepositories() { + workflowRepo = await app.getRepository(WorkflowRepository); + workflowVersionRepo = await app.getRepository(WorkflowVersionRepository); + camundaService = await app.get('services.CamundaConnectorService'); + } + + async function clearData(workflows?: Workflow[]) { + if (workflowRepo) { + workflows = + workflows ?? + (await workflowRepo.find({ + include: ['workflowVersions'], + })); + const deployments: string[] = await Promise.all( + workflows + .map(workflow => { + return workflow.workflowVersions.map(version => + camundaService.get(version.externalWorkflowId), + ); + }) + .flat() + .map(p => p.then(res => res.deploymentId)), + ); + const ids = new Set(deployments); + for (const id of ids) { + await camundaService.deleteDeployment(id, true).catch(() => { + console.error(`Error deleting deployment ${id}`); //NOSONAR + }); + } + } + } +}); diff --git a/services/bpmn-service/src/__tests__/acceptance/helper.ts b/services/bpmn-service/src/__tests__/acceptance/helper.ts index 54004948dc..dc8c027800 100644 --- a/services/bpmn-service/src/__tests__/acceptance/helper.ts +++ b/services/bpmn-service/src/__tests__/acceptance/helper.ts @@ -2,26 +2,27 @@ // // This software is released under the MIT License. // https://opensource.org/licenses/MIT +import {BindingScope} from '@loopback/context'; import { Client, createRestAppClient, givenHttpServerConfig, } from '@loopback/testlab'; -import {BearerTokenVerifyProvider} from '../provider/bearer-token-verify.provider'; import {Strategies} from 'loopback4-authentication'; import {WorkflowServiceApplication} from '../../application'; +import {CamundaComponent} from '../../connectors/camunda'; +import {WorkflowServiceBindings} from '../../keys'; import {BPMTask, WorkflowCacheSourceName} from '../../types'; +import {TestBpmnCommand} from '../commands/test.command'; +import {MOCK_CAMUNDA, firstTestBpmn} from '../const'; import {WorkflowDbDatasource} from '../datasources/workflowdb.datasource'; -import {WorkflowServiceBindings} from '../../keys'; -import {WorkflowMockProvider} from '../provider/workflow-helper-mock.provider'; -import {firstTestBpmn, MOCK_CAMUNDA} from '../const'; -import {MOCK_BPMN_ENGINE_KEY} from '../types'; import {MockEngine} from '../mock-engine'; -import {BindingScope} from '@loopback/context'; +import {BearerTokenVerifyProvider} from '../provider/bearer-token-verify.provider'; +import {WorkflowMockProvider} from '../provider/workflow-helper-mock.provider'; import {WorkerMockImplementationProvider} from '../provider/workflow-mock-implementation.provider'; -import {TestBpmnCommand} from '../commands/test.command'; +import {MOCK_BPMN_ENGINE_KEY} from '../types'; -export async function setUpApplication(): Promise { +export async function setUpApplication(mock = true): Promise { const app = new WorkflowServiceApplication({ rest: givenHttpServerConfig(), }); @@ -34,20 +35,29 @@ export async function setUpApplication(): Promise { app .bind(Strategies.Passport.BEARER_TOKEN_VERIFIER) .toProvider(BearerTokenVerifyProvider); - app - .bind(WorkflowServiceBindings.WorkflowManager) - .toProvider(WorkflowMockProvider); - app - .bind(WorkflowServiceBindings.WorkerImplementationFunction) - .toProvider(WorkerMockImplementationProvider); - app.bind(WorkflowServiceBindings.Config).to({ - workflowEngineBaseUrl: MOCK_CAMUNDA, - useCustomSequence: false, - }); - app - .bind(MOCK_BPMN_ENGINE_KEY) - .toClass(MockEngine) - .inScope(BindingScope.SINGLETON); + if (mock) { + app + .bind(WorkflowServiceBindings.WorkflowManager) + .toProvider(WorkflowMockProvider); + app + .bind(WorkflowServiceBindings.WorkerImplementationFunction) + .toProvider(WorkerMockImplementationProvider); + app.bind(WorkflowServiceBindings.Config).to({ + workflowEngineBaseUrl: MOCK_CAMUNDA, + useCustomSequence: false, + }); + app + .bind(MOCK_BPMN_ENGINE_KEY) + .toClass(MockEngine) + .inScope(BindingScope.SINGLETON); + } else { + app.component(CamundaComponent); + app.bind(WorkflowServiceBindings.Config).to({ + workflowEngineBaseUrl: process.env.WORKFLOW_ENGINE_BASE_URL, + useCustomSequence: false, + }); + } + const registerFn = await app.getValueOrPromise( WorkflowServiceBindings.RegisterWorkerFunction, ); diff --git a/services/bpmn-service/src/__tests__/const.ts b/services/bpmn-service/src/__tests__/const.ts index 651bfd334c..4761eb23e7 100644 --- a/services/bpmn-service/src/__tests__/const.ts +++ b/services/bpmn-service/src/__tests__/const.ts @@ -21,3 +21,80 @@ export const firstTestBpmnInput = { valueA: 'string', valueB: 'string', }; + +export const MOCK_WORKFLOW = ` + + + + + + + + + Flow_0g3ljqf + + + + + Flow_1le5efs + + + + Flow_0g3ljqf + Flow_1i983ig + + + Flow_1i983ig + Flow_1le5efs + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +`; + +export const firstTestRealBpmn = new WorkflowDto({ + ...firstTestBpmn, + bpmnFile: MOCK_WORKFLOW, +}); + +export function generateBpmn(name?: string, processId?: string) { + processId = processId ?? Math.floor(Math.random() * 1000000).toString(); //NOSONAR + return { + ...firstTestBpmn, + name: name ?? `process-${processId}`, + bpmnFile: MOCK_WORKFLOW.replace('test-workflow', `process-${processId}`), + }; +} diff --git a/services/bpmn-service/src/__tests__/mock-engine.ts b/services/bpmn-service/src/__tests__/mock-engine.ts index a2e5cdfb7f..f9441456cc 100644 --- a/services/bpmn-service/src/__tests__/mock-engine.ts +++ b/services/bpmn-service/src/__tests__/mock-engine.ts @@ -57,7 +57,10 @@ export class MockEngine { return this.workflowList[workflow.name][version]; } - update(workflow: WorkflowDto) { + update(workflow: Partial) { + if (!workflow.name) { + throw new HttpErrors.NotFound('Not Found'); + } if (this.workflowList?.[workflow.name]) { const latest = Math.max( ...Object.keys(this.workflowList[workflow.name]).map(s => Number(s)), @@ -67,9 +70,9 @@ export class MockEngine { externalIdentifier: workflow.name, name: workflow.name, provider: 'bpmn', - inputSchema: workflow.inputSchema, + inputSchema: workflow.inputSchema ?? {}, workflowVersions: [], - file: workflow.bpmnFile, + file: workflow.bpmnFile ?? '', }; return this.workflowList[workflow.name][latest + 1]; } else { diff --git a/services/bpmn-service/src/__tests__/provider/workflow-helper-mock.provider.ts b/services/bpmn-service/src/__tests__/provider/workflow-helper-mock.provider.ts index 7cc8a80109..1721d04b99 100644 --- a/services/bpmn-service/src/__tests__/provider/workflow-helper-mock.provider.ts +++ b/services/bpmn-service/src/__tests__/provider/workflow-helper-mock.provider.ts @@ -2,11 +2,11 @@ // // This software is released under the MIT License. // https://opensource.org/licenses/MIT -import {bind, inject, BindingScope, Provider, Getter} from '@loopback/core'; +import {BindingScope, Getter, Provider, bind, inject} from '@loopback/core'; import {Workflow} from '../../models'; import {WorflowManager} from '../../types'; import {MockEngine} from '../mock-engine'; -import {MockCamundaWorkflow, MOCK_BPMN_ENGINE_KEY} from '../types'; +import {MOCK_BPMN_ENGINE_KEY, MockCamundaWorkflow} from '../types'; @bind({scope: BindingScope.TRANSIENT}) export class WorkflowMockProvider implements Provider { @@ -51,8 +51,8 @@ export class WorkflowMockProvider implements Provider { return { version: mockWorkflow.workflowVersion, provider: 'bpmn', - processId: workflow.name, - externalId: workflow.name, + processId: workflow.name ?? '', + externalId: workflow.name ?? '', }; }, deleteWorkflowById: async workflow => { diff --git a/services/bpmn-service/src/__tests__/provider/workflow-mock-implementation.provider.ts b/services/bpmn-service/src/__tests__/provider/workflow-mock-implementation.provider.ts index 5bd1e74763..0ce55ee045 100644 --- a/services/bpmn-service/src/__tests__/provider/workflow-mock-implementation.provider.ts +++ b/services/bpmn-service/src/__tests__/provider/workflow-mock-implementation.provider.ts @@ -4,10 +4,10 @@ // https://opensource.org/licenses/MIT import {inject, Provider} from '@loopback/context'; import {ILogger, LOGGER} from '@sourceloop/core'; -import {IWorkflowServiceConfig, WorkerImplementationFn} from '../../types'; import {WorkflowServiceBindings} from '../../keys'; -import {MOCK_BPMN_ENGINE_KEY} from '../types'; +import {IWorkflowServiceConfig, WorkerImplementationFn} from '../../types'; import {MockEngine} from '../mock-engine'; +import {MOCK_BPMN_ENGINE_KEY} from '../types'; export class WorkerMockImplementationProvider implements Provider diff --git a/services/bpmn-service/src/component.ts b/services/bpmn-service/src/component.ts index cfe53a217b..878e739d12 100644 --- a/services/bpmn-service/src/component.ts +++ b/services/bpmn-service/src/component.ts @@ -7,8 +7,10 @@ import { Component, ControllerClass, CoreBindings, + createBindingFromClass, inject, ProviderMap, + ServiceOrProviderClass, } from '@loopback/core'; import {Class, Model, Repository} from '@loopback/repository'; import {RestApplication} from '@loopback/rest'; @@ -31,15 +33,14 @@ import { import {WorkflowController} from './controllers'; import {WorkflowServiceBindings} from './keys'; import {Workflow} from './models'; -import {WorkflowProvider} from './providers'; import {ExecutionInputValidationProvider} from './providers/execution-input-validator.provider'; import {WorkerRegisterFnProvider} from './providers/register-worker.service'; -import {WorkerImplementationProvider} from './providers/worker-implementation.provider'; import {WorkflowRepository, WorkflowVersionRepository} from './repositories'; import { WorkflowRepository as WorkflowSequelizeRepository, WorkflowVersionRepository as WorkflowVersionSequelizeRepository, } from './repositories/sequelize'; +import {HttpClientService, WorkflowService} from './services'; import {IWorkflowServiceConfig} from './types'; export class WorkflowServiceComponent implements Component { @@ -49,7 +50,11 @@ export class WorkflowServiceComponent implements Component { @inject(WorkflowServiceBindings.Config, {optional: true}) private readonly workflowSvcConfig?: IWorkflowServiceConfig, ) { - this.bindings = []; + this.bindings = [ + createBindingFromClass(WorkflowService, { + key: WorkflowServiceBindings.WorkflowService, + }), + ]; this.providers = {}; // Mount core component @@ -88,13 +93,10 @@ export class WorkflowServiceComponent implements Component { this.models = [Workflow]; this.providers = { - [WorkflowServiceBindings.WorkflowManager.key]: WorkflowProvider, [WorkflowServiceBindings.ExecutionInputValidatorFn.key]: ExecutionInputValidationProvider, [WorkflowServiceBindings.RegisterWorkerFunction.key]: WorkerRegisterFnProvider, - [WorkflowServiceBindings.WorkerImplementationFunction.key]: - WorkerImplementationProvider, }; this.application @@ -103,13 +105,17 @@ export class WorkflowServiceComponent implements Component { return {}; }); + this.application.bind(WorkflowServiceBindings.COMMANDS).to([]); this.controllers = [WorkflowController]; + this.services = [HttpClientService]; } providers?: ProviderMap = {}; bindings?: Binding[] = []; + services?: ServiceOrProviderClass[]; + /** * An optional list of Repository classes to bind for dependency injection * via `app.repository()` API. diff --git a/services/bpmn-service/src/connectors/camunda/camunda.component.ts b/services/bpmn-service/src/connectors/camunda/camunda.component.ts new file mode 100644 index 0000000000..0cc11dc4fd --- /dev/null +++ b/services/bpmn-service/src/connectors/camunda/camunda.component.ts @@ -0,0 +1,117 @@ +// Copyright (c) 2023 Sourcefuse Technologies +// +// This software is released under the MIT License. +// https://opensource.org/licenses/MIT +import { + Binding, + Component, + ControllerClass, + CoreBindings, + inject, + ProviderMap, + ServiceOrProviderClass, +} from '@loopback/core'; +import {Class, Model, Repository} from '@loopback/repository'; +import {RestApplication} from '@loopback/rest'; +import { + BearerVerifierBindings, + BearerVerifierComponent, + BearerVerifierConfig, + BearerVerifierType, + SECURITY_SCHEME_SPEC, + ServiceSequence, +} from '@sourceloop/core'; +import {AuthenticationComponent} from 'loopback4-authentication'; +import { + AuthorizationBindings, + AuthorizationComponent, +} from 'loopback4-authorization'; +import {WorkflowServiceBindings} from '../../keys'; +import {CamundaConnectorService} from './connector.service'; +import {CamundaImplementationProvider} from './implementation.provider'; +import {CamundaManagerProvider} from './manager.provider'; +import {TaskObserver} from './task.observer'; + +export class CamundaComponent implements Component { + constructor( + @inject(CoreBindings.APPLICATION_INSTANCE) + private readonly application: RestApplication, + ) { + this.providers = {}; + + this.application.api({ + openapi: '3.0.0', + info: { + title: 'Bpmn Service', + version: '1.0.0', + }, + paths: {}, + components: { + securitySchemes: SECURITY_SCHEME_SPEC, + }, + servers: [{url: '/'}], + }); + + this.providers = { + [WorkflowServiceBindings.WorkflowManager.key]: CamundaManagerProvider, + [WorkflowServiceBindings.WorkerImplementationFunction.key]: + CamundaImplementationProvider, + }; + + this.services = [CamundaConnectorService]; + + this.application + .bind(WorkflowServiceBindings.WORKER_MAP) + .toDynamicValue(() => { + return {}; + }); + + application.lifeCycleObserver(TaskObserver); + } + + providers?: ProviderMap = {}; + + bindings?: Binding[] = []; + + services?: ServiceOrProviderClass[]; + + /** + * An optional list of Repository classes to bind for dependency injection + * via `app.repository()` API. + */ + repositories?: Class>[]; + + /** + * An optional list of Model classes to bind for dependency injection + * via `app.model()` API. + */ + models?: Class[]; + + /** + * An array of controller classes + */ + controllers?: ControllerClass[]; + + /** + * Setup ServiceSequence by default if no other sequnce provided + * + */ + setupSequence() { + this.application.sequence(ServiceSequence); + + // Mount authentication component for default sequence + this.application.component(AuthenticationComponent); + // Mount bearer verifier component + this.application.bind(BearerVerifierBindings.Config).to({ + authServiceUrl: '', + type: BearerVerifierType.service, + } as BearerVerifierConfig); + this.application.component(BearerVerifierComponent); + + // Mount authorization component for default sequence + this.application.bind(AuthorizationBindings.CONFIG).to({ + allowAlwaysPaths: ['/explorer'], + }); + this.application.component(AuthorizationComponent); + } +} diff --git a/services/bpmn-service/src/connectors/camunda/connector.service.ts b/services/bpmn-service/src/connectors/camunda/connector.service.ts new file mode 100644 index 0000000000..b5ef6f53d1 --- /dev/null +++ b/services/bpmn-service/src/connectors/camunda/connector.service.ts @@ -0,0 +1,117 @@ +// Copyright (c) 2023 Sourcefuse Technologies +// +// This software is released under the MIT License. +// https://opensource.org/licenses/MIT +import {BindingScope, inject, injectable, service} from '@loopback/core'; +import {AnyObject} from '@loopback/repository'; +import FormData from 'form-data'; +import {WorkflowServiceBindings} from '../../keys'; +import {HttpClientService} from '../../services'; +import {IWorkflowServiceConfig} from '../../types'; +import {CamundaTask} from './types'; + +@injectable({scope: BindingScope.TRANSIENT}) +export class CamundaConnectorService { + baseUrl: string | undefined = ''; + constructor( + @service(HttpClientService) + private readonly http: HttpClientService, + @inject(WorkflowServiceBindings.Config, {optional: true}) + private readonly config: IWorkflowServiceConfig, + ) { + this.baseUrl = config?.workflowEngineBaseUrl; + } + + async completeUserTask(id: string, variables?: AnyObject) { + return this.http.post(`${this.baseUrl}/task/${id}/complete`, { + variables, + }); + } + + async getPendingUserTasks(processDefinitionId: string) { + return this.http.get( + `${this.baseUrl}/task?processInstanceId=${processDefinitionId}`, + ); + } + + async deleteProcessInstances(ids: (string | undefined)[]) { + if (ids) { + return Promise.all( + ids.map(id => + this.http.delete(`${this.baseUrl}/process-instance/${id}`, { + query: { + cascade: true, + skipCustomListeners: true, + }, + }), + ), + ); + } + } + + async create(name: string, file: Buffer) { + const form = new FormData(); + form.append(`${name}.bpmn`, file.toString('utf-8'), { + filename: `${name}.bpmn`, + }); + form.append('deployment-name', name); + form.append('deploy-changed-only', String(true)); + return this.http.postFormData(`${this.baseUrl}/deployment/create`, form); + } + + async delete(ids: string[]) { + return Promise.all( + ids.map(id => + this.http.delete(`${this.baseUrl}/process-definition/${id}`, { + query: { + cascade: true, + }, + }), + ), + ); + } + + async deleteDeployment(id: string, cascade = false) { + return this.http.delete( + `${this.baseUrl}/deployment/${id}?cascade=${cascade}`, + ); + } + + async deleteVersion(id: string) { + return this.http.delete(`${this.baseUrl}/process-definition/${id}`); + } + + async get(id: string) { + return this.http.get(`${this.baseUrl}/process-definition/${id}`); + } + + async execute(id: string, input: AnyObject) { + return this.http.post(`${this.baseUrl}/process-definition/${id}/start`, { + variables: this.formatInput(input), + }); + } + + private formatInput(input: AnyObject) { + input.customHeaders = { + url: process.env.LOGIN_URL, + method: 'GET', + }; + const inputObject: AnyObject = {}; + for (const key in input) { + inputObject[key] = { + value: + typeof input[key] === 'object' + ? JSON.stringify(input[key]) + : input[key], + }; + if (typeof input[key] === 'object') { + inputObject[key].type = 'object'; + inputObject[key].valueInfo = { + objectTypeName: 'java.util.LinkedHashMap', + serializationDataFormat: 'application/json', + }; + } + } + return inputObject; + } +} diff --git a/services/bpmn-service/src/connectors/camunda/implementation.provider.ts b/services/bpmn-service/src/connectors/camunda/implementation.provider.ts new file mode 100644 index 0000000000..4357efc179 --- /dev/null +++ b/services/bpmn-service/src/connectors/camunda/implementation.provider.ts @@ -0,0 +1,47 @@ +// Copyright (c) 2023 Sourcefuse Technologies +// +// This software is released under the MIT License. +// https://opensource.org/licenses/MIT +import {inject, Provider} from '@loopback/context'; +import {AnyObject} from '@loopback/repository'; +import {ILogger, LOGGER} from '@sourceloop/core'; +import {TopicSubscription} from 'camunda-external-task-client-js'; +import {WorkerImplementationFn, WorkerNameCmdPair} from '../../types'; +import {CLIENT} from './keys'; +import {ClientWithSubscriptions} from './types'; + +export class CamundaImplementationProvider + implements Provider +{ + private subscriptions: TopicSubscription[] = []; + constructor( + @inject(LOGGER.LOGGER_INJECT) + private readonly ilogger: ILogger, + @inject(CLIENT) + private readonly camunda: ClientWithSubscriptions, + ) {} + value(): WorkerImplementationFn { + return async worker => { + if (this.camunda.client) { + worker.running = true; + this.camunda.subscriptions.push(this.subscribeToTopic(worker)); + } else { + throw new Error('Workflow client not connected'); + } + }; + } + + private subscribeToTopic(worker: WorkerNameCmdPair) { + worker.running = true; + return this.camunda.client.subscribe( + worker.topic, + ({task, taskService}: {task: AnyObject; taskService: AnyObject}) => { + worker.command.operation({task, taskService}, (result: AnyObject) => { + if (result) { + this.ilogger.info(`Worker task completed - ${worker.topic}`); + } + }); + }, + ); + } +} diff --git a/services/bpmn-service/src/connectors/camunda/index.ts b/services/bpmn-service/src/connectors/camunda/index.ts new file mode 100644 index 0000000000..96b3845ad1 --- /dev/null +++ b/services/bpmn-service/src/connectors/camunda/index.ts @@ -0,0 +1,7 @@ +export * from './camunda.component'; +export * from './connector.service'; +export * from './implementation.provider'; +export * from './keys'; +export * from './manager.provider'; +export * from './task.observer'; +export * from './types'; diff --git a/services/bpmn-service/src/connectors/camunda/keys.ts b/services/bpmn-service/src/connectors/camunda/keys.ts new file mode 100644 index 0000000000..e71b0e65e0 --- /dev/null +++ b/services/bpmn-service/src/connectors/camunda/keys.ts @@ -0,0 +1,7 @@ +import {BindingKey} from '@loopback/context'; +import {BINDING_PREFIX} from '@sourceloop/core'; +import {ClientWithSubscriptions} from './types'; + +export const CLIENT = BindingKey.create( + `${BINDING_PREFIX}.workflow.client`, +); diff --git a/services/bpmn-service/src/connectors/camunda/manager.provider.ts b/services/bpmn-service/src/connectors/camunda/manager.provider.ts new file mode 100644 index 0000000000..000ba357be --- /dev/null +++ b/services/bpmn-service/src/connectors/camunda/manager.provider.ts @@ -0,0 +1,113 @@ +import {bind, BindingScope, Provider, service} from '@loopback/core'; +import {AnyObject} from '@loopback/repository'; +import {HttpErrors} from '@loopback/rest'; +import {Workflow, WorkflowDto, WorkflowVersion} from '../../models'; +import {WorflowManager} from '../../types'; +import {CamundaConnectorService} from './connector.service'; + +@bind({scope: BindingScope.TRANSIENT}) +export class CamundaManagerProvider implements Provider { + constructor( + @service(CamundaConnectorService) + private readonly camunda: CamundaConnectorService, + ) {} + value(): WorflowManager { + return { + getWorkflowById: async workflow => { + const response = await this.camunda.get( + workflow.externalIdentifier, + ); + + return new Workflow({ + provider: 'camunda', + id: workflow.id, + workflowVersion: workflow.workflowVersion, + externalIdentifier: response.id, + name: response.name, + inputSchema: workflow.inputSchema, + }); + }, + startWorkflow: async (input, workflow, version) => { + let response; + if (version) { + response = await this.camunda.execute( + version.externalWorkflowId, + input, + ); + } else { + response = await this.camunda.execute( + workflow.externalIdentifier, + input, + ); + } + return response; + }, + createWorkflow: async (workflowDto: WorkflowDto) => { + const response = await this.camunda.create( + workflowDto.name, + Buffer.from(workflowDto.bpmnFile, 'utf-8'), + ); + let version = 1; + let id; + if (response.deployedProcessDefinitions) { + const definitions = response.deployedProcessDefinitions; + const processDefinition = Object.values( + //NOSONAR + definitions, //NOSONAR + )[0] as AnyObject; //NOSONAR + version = processDefinition.version; + id = processDefinition.id; + } else { + throw new HttpErrors.BadRequest( + 'Workflow with same name and definition already exists', + ); + } + return { + version: version, + provider: 'camunda', + processId: id, + externalId: id, + fileRef: workflowDto.bpmnFile, + }; + }, + updateWorkflow: async (workflowDto: WorkflowDto) => { + const response = await this.camunda.create( + workflowDto.name, + Buffer.from(workflowDto.bpmnFile, 'utf-8'), + ); + let version; + let id; + if (response.deployedProcessDefinitions) { + const processDefinition = Object.values( + //NOSONAR + response.deployedProcessDefinitions, //NOSONAR + )[0] as AnyObject; //NOSONAR + version = processDefinition.version; + id = processDefinition.id; + } else { + throw new Error('Workflow not changed or Worflow does not exist!'); + } + return { + version: version, + provider: 'camunda', + processId: id, + externalId: id, + fileRef: workflowDto.bpmnFile, + }; + }, + deleteWorkflowById: async (workflow: Workflow) => { + await this.camunda.delete( + workflow.workflowVersions.map( + (version: WorkflowVersion) => version.externalWorkflowId, + ), + ); + return workflow; + }, + + deleteWorkflowVersionById: async (version: WorkflowVersion) => { + await this.camunda.deleteVersion(version.externalWorkflowId); + return version; + }, + }; + } +} diff --git a/services/bpmn-service/src/connectors/camunda/task.observer.ts b/services/bpmn-service/src/connectors/camunda/task.observer.ts new file mode 100644 index 0000000000..3ea812453f --- /dev/null +++ b/services/bpmn-service/src/connectors/camunda/task.observer.ts @@ -0,0 +1,119 @@ +import { + Constructor, + Context, + inject, + lifeCycleObserver, + LifeCycleObserver, + Setter, +} from '@loopback/core'; +import {ILogger, LOGGER} from '@sourceloop/core'; +import { + logger as camundaLogger, + Client, + TopicSubscription, +} from 'camunda-external-task-client-js'; +import {CAMUNDA_RETRY_INTERVAL, MAX_CAMUNDA_ATTEMPTS} from '../../constants'; +import {WorkflowServiceBindings} from '../../keys'; +import { + BPMTask, + ICommandWithTopic, + IWorkflowServiceConfig, + WorkerImplementationFn, +} from '../../types'; +import {CLIENT} from './keys'; +import {ClientWithSubscriptions} from './types'; + +@lifeCycleObserver() +export class TaskObserver implements LifeCycleObserver { + client: Client; + private readonly subscriptions: TopicSubscription[] = []; + private attempts = 0; + constructor( + @inject(LOGGER.LOGGER_INJECT) + private readonly logger: ILogger, + @inject(WorkflowServiceBindings.COMMANDS) + private commandsCtors: Constructor[], + @inject.context() + private readonly ctx: Context, + @inject(WorkflowServiceBindings.Config) + private readonly config: IWorkflowServiceConfig, + @inject.setter(CLIENT) + private readonly setClient: Setter, + ) {} + + async start(): Promise { + if (this.config.workflowEngineBaseUrl) { + this.client = new Client({ + baseUrl: this.config.workflowEngineBaseUrl, + use: camundaLogger, + workerId: process.env.CAMUNDA_WORKER_ID ?? 'default', + lockDuration: parseInt(process.env.LOCK_DURATION ?? '500000'), + }); + this.setClient({ + client: this.client, + subscriptions: this.subscriptions, + }); + } else { + throw new Error('Invalid workflowEngine Config'); + } + const workerFn = await this.ctx.get( + WorkflowServiceBindings.WorkerImplementationFunction, + ); + this._handleEvents(this.client, workerFn); + await this._subscribe(workerFn); + this.logger.debug('Commands registered'); + } + + async stop(): Promise { + await this._unsubscribe(); + this.logger.debug('Stopping client'); + this.client.stop(); + this.logger.debug('Client stopped'); + } + + private async _subscribe(workerFn: WorkerImplementationFn) { + for (const cmdCtor of this.commandsCtors) { + const command = new cmdCtor(this.ctx); + this.logger.debug(`Registering command ${command.topic}`); + await workerFn({ + topic: command.topic, + command: new BPMTask(command), + running: false, + }); + } + } + + private async _unsubscribe() { + this.logger.debug('Unsubscribing from topics'); + for (const subscription of this.subscriptions) { + subscription.unsubscribe(); + } + this.logger.debug('Unsubscribed from topics'); + } + + private _handleEvents(client: Client, workerFn: WorkerImplementationFn) { + const maxAttempts = +( + process.env.MAX_CAMUNDA_ATTEMPTS ?? MAX_CAMUNDA_ATTEMPTS + ); + // Default interval is 1 minute + const defaultInterval = +( + process.env.CAMUNDA_RETRY_INTERVAL ?? CAMUNDA_RETRY_INTERVAL + ); + client.on('poll:error', async (error: Error) => { + await this._unsubscribe(); + this.attempts++; + this.logger.error(`Attempts made - ${this.attempts}`); + this.logger.error(`Retry after - ${defaultInterval / 1000} seconds`); + if (this.attempts <= maxAttempts) { + setTimeout(async () => { + await this._subscribe(workerFn); + }, defaultInterval); + } else { + this.logger.error('Max attempts reached. Stopping client'); + if (!process.env.WORKER_FAILURE_CRASH_DISABLE) { + throw new Error('Workflow client not connected'); + } + } + }); + } +} diff --git a/services/bpmn-service/src/types/camunda/case-definition.ts b/services/bpmn-service/src/connectors/camunda/types/case-definition.ts similarity index 100% rename from services/bpmn-service/src/types/camunda/case-definition.ts rename to services/bpmn-service/src/connectors/camunda/types/case-definition.ts diff --git a/services/bpmn-service/src/types/camunda/decision-definition.ts b/services/bpmn-service/src/connectors/camunda/types/decision-definition.ts similarity index 100% rename from services/bpmn-service/src/types/camunda/decision-definition.ts rename to services/bpmn-service/src/connectors/camunda/types/decision-definition.ts diff --git a/services/bpmn-service/src/types/camunda/decision-requirements-definition.ts b/services/bpmn-service/src/connectors/camunda/types/decision-requirements-definition.ts similarity index 100% rename from services/bpmn-service/src/types/camunda/decision-requirements-definition.ts rename to services/bpmn-service/src/connectors/camunda/types/decision-requirements-definition.ts diff --git a/services/bpmn-service/src/types/camunda/deployement-with-definitions.ts b/services/bpmn-service/src/connectors/camunda/types/deployement-with-definitions.ts similarity index 100% rename from services/bpmn-service/src/types/camunda/deployement-with-definitions.ts rename to services/bpmn-service/src/connectors/camunda/types/deployement-with-definitions.ts diff --git a/services/bpmn-service/src/types/camunda/index.ts b/services/bpmn-service/src/connectors/camunda/types/index.ts similarity index 51% rename from services/bpmn-service/src/types/camunda/index.ts rename to services/bpmn-service/src/connectors/camunda/types/index.ts index 976434446e..9b161ffd3f 100644 --- a/services/bpmn-service/src/types/camunda/index.ts +++ b/services/bpmn-service/src/connectors/camunda/types/index.ts @@ -2,11 +2,28 @@ // // This software is released under the MIT License. // https://opensource.org/licenses/MIT -export * from './process'; -export * from './variable'; -export * from './types'; -export * from './process-definition'; +import {Client, TopicSubscription} from 'camunda-external-task-client-js'; + export * from './case-definition'; export * from './decision-definition'; export * from './decision-requirements-definition'; export * from './deployement-with-definitions'; +export * from './process'; +export * from './process-definition'; +export * from './types'; +export * from './variable'; + +export interface CamundaTask { + id: string; + name: string; + created: string; + description: string; + processDefinitionId: string; + processInstanceId: string; + // it also has other properties but they are not relevant +} + +export type ClientWithSubscriptions = { + client: Client; + subscriptions: TopicSubscription[]; +}; diff --git a/services/bpmn-service/src/types/camunda/process-definition.ts b/services/bpmn-service/src/connectors/camunda/types/process-definition.ts similarity index 100% rename from services/bpmn-service/src/types/camunda/process-definition.ts rename to services/bpmn-service/src/connectors/camunda/types/process-definition.ts diff --git a/services/bpmn-service/src/types/camunda/process.ts b/services/bpmn-service/src/connectors/camunda/types/process.ts similarity index 100% rename from services/bpmn-service/src/types/camunda/process.ts rename to services/bpmn-service/src/connectors/camunda/types/process.ts diff --git a/services/bpmn-service/src/types/camunda/types.ts b/services/bpmn-service/src/connectors/camunda/types/types.ts similarity index 100% rename from services/bpmn-service/src/types/camunda/types.ts rename to services/bpmn-service/src/connectors/camunda/types/types.ts diff --git a/services/bpmn-service/src/types/camunda/variable.ts b/services/bpmn-service/src/connectors/camunda/types/variable.ts similarity index 100% rename from services/bpmn-service/src/types/camunda/variable.ts rename to services/bpmn-service/src/connectors/camunda/types/variable.ts diff --git a/services/bpmn-service/src/constants.ts b/services/bpmn-service/src/constants.ts new file mode 100644 index 0000000000..667582f3dc --- /dev/null +++ b/services/bpmn-service/src/constants.ts @@ -0,0 +1,2 @@ +export const MAX_CAMUNDA_ATTEMPTS = 3; +export const CAMUNDA_RETRY_INTERVAL = 60000; diff --git a/services/bpmn-service/src/controllers/workflow.controller.ts b/services/bpmn-service/src/controllers/workflow.controller.ts index d771ef525a..c760c47860 100644 --- a/services/bpmn-service/src/controllers/workflow.controller.ts +++ b/services/bpmn-service/src/controllers/workflow.controller.ts @@ -2,10 +2,9 @@ // // This software is released under the MIT License. // https://opensource.org/licenses/MIT -import {Getter, inject} from '@loopback/core'; -import {AnyObject, Filter, repository} from '@loopback/repository'; +import {inject} from '@loopback/core'; +import {AnyObject, Filter} from '@loopback/repository'; import { - HttpErrors, del, get, getFilterSchemaFor, @@ -22,36 +21,18 @@ import { } from '@sourceloop/core'; import {STRATEGY, authenticate} from 'loopback4-authentication'; import {authorize} from 'loopback4-authorization'; -import {ErrorKeys} from '../enums/error-keys.enum'; import {PermissionKey} from '../enums/permission-key.enum'; import {WorkflowServiceBindings} from '../keys'; -import {Workflow, WorkflowVersion} from '../models'; +import {Workflow} from '../models'; import {ExecuteWorkflowDto} from '../models/execute-workflow-dto'; import {WorkflowDto} from '../models/workflow-dto.model'; -import {WorkflowRepository} from '../repositories'; -import {WorkflowVersionRepository} from '../repositories/workflow-version.repository'; -import { - ExecutionInputValidator, - WorflowManager, - WorkerImplementationFn, - WorkerMap, -} from '../types'; +import {IWorkflowService} from '../types'; const basePath = '/workflows'; export class WorkflowController { constructor( - @repository(WorkflowRepository) - public workflowRepository: WorkflowRepository, - @repository(WorkflowVersionRepository) - public workflowVersionRepository: WorkflowVersionRepository, - @inject(WorkflowServiceBindings.WorkflowManager) - private readonly workflowManagerService: WorflowManager, - @inject(WorkflowServiceBindings.ExecutionInputValidatorFn) - private readonly execInputValidator: ExecutionInputValidator, - @inject.getter(WorkflowServiceBindings.WORKER_MAP) - private readonly workerMapGetter: Getter, - @inject(WorkflowServiceBindings.WorkerImplementationFunction) - private readonly workerFn: WorkerImplementationFn, + @inject(WorkflowServiceBindings.WorkflowService) + private readonly workflowService: IWorkflowService, ) {} @authenticate(STRATEGY.BEARER) @@ -84,34 +65,7 @@ export class WorkflowController { }) workflowDto: WorkflowDto, ): Promise { - try { - const workflowResponse = - await this.workflowManagerService.createWorkflow(workflowDto); - - const entity = new Workflow({ - workflowVersion: workflowResponse.version, - externalIdentifier: workflowResponse.processId, - name: workflowDto.name, - provider: workflowResponse.provider, - inputSchema: workflowDto.inputSchema, - description: workflowDto.description, - }); - - const newWorkflow = await this.workflowRepository.create(entity); - - const version = new WorkflowVersion({ - workflowId: newWorkflow.id, - version: workflowResponse.version, - bpmnDiagram: workflowResponse.fileRef, - externalWorkflowId: workflowResponse.externalId, - inputSchema: workflowDto.inputSchema, - }); - - await this.workflowVersionRepository.create(version); - return newWorkflow; - } catch (e) { - throw new HttpErrors.BadRequest(e); - } + return this.workflowService.create(workflowDto); } @authenticate(STRATEGY.BEARER) @@ -140,29 +94,7 @@ export class WorkflowController { workflowDto: WorkflowDto, @param.path.string('id') id: string, ): Promise { - const workflowResponse = - await this.workflowManagerService.updateWorkflow(workflowDto); - - const entity = new Workflow({ - workflowVersion: workflowResponse.version, - externalIdentifier: workflowResponse.processId, - name: workflowDto.name, - provider: workflowResponse.provider, - inputSchema: workflowDto.inputSchema, - description: workflowDto.description, - }); - - await this.workflowRepository.updateById(id, entity); - - const version = new WorkflowVersion({ - workflowId: id, - version: workflowResponse.version, - bpmnDiagram: workflowResponse.fileRef, - externalWorkflowId: workflowResponse.externalId, - inputSchema: workflowDto.inputSchema, - }); - - await this.workflowVersionRepository.create(version); + return this.workflowService.updateById(id, workflowDto); } @authenticate(STRATEGY.BEARER) @@ -191,37 +123,7 @@ export class WorkflowController { }) instance: ExecuteWorkflowDto, ): Promise { - const workflow = await this.workflowRepository.findOne({ - where: { - id: id, - }, - }); - let version; - if (!workflow) { - throw new HttpErrors.NotFound(ErrorKeys.WorkflowNotFound); - } - let inputSchema = workflow.inputSchema; - if (instance.workflowVersion) { - version = await this.workflowVersionRepository.findOne({ - where: { - version: instance.workflowVersion, - workflowId: workflow.id, - }, - }); - if (version) { - inputSchema = version.inputSchema; - } else { - throw new HttpErrors.NotFound(ErrorKeys.VersionNotFound); - } - } - - await this.execInputValidator(inputSchema, instance.input); - await this.initWorkers(workflow.name); - return this.workflowManagerService.startWorkflow( - instance.input, - workflow, - version, - ); + return this.workflowService.executeById(id, instance); } @authenticate(STRATEGY.BEARER) @@ -245,9 +147,7 @@ export class WorkflowController { @param.query.object('filter', getFilterSchemaFor(Workflow)) filter?: Filter, ): Promise { - return this.workflowRepository.find(filter, { - include: ['workflowVersions'], - }); + return this.workflowService.find(filter); } @authenticate(STRATEGY.BEARER) @@ -263,8 +163,7 @@ export class WorkflowController { }, }) async count(@param.path.string('id') id: string): Promise { - const workflow = await this.workflowRepository.findById(id); - return this.workflowManagerService.getWorkflowById(workflow); + return this.workflowService.findById(id); } @authenticate(STRATEGY.BEARER) @@ -283,14 +182,7 @@ export class WorkflowController { }, }) async deleteById(@param.path.string('id') id: string): Promise { - const workflow = await this.workflowRepository.findById(id, { - include: ['workflowVersions'], - }); - await this.workflowManagerService.deleteWorkflowById(workflow); - await this.workflowVersionRepository.deleteAll({ - workflowId: workflow.id, - }); - await this.workflowRepository.deleteById(workflow.id); + return this.workflowService.deleteById(id); } @authenticate(STRATEGY.BEARER) @@ -312,42 +204,6 @@ export class WorkflowController { @param.path.string('id') id: string, @param.path.number('version') versionNumber: number, ): Promise { - const workflow = await this.workflowRepository.findById(id); - if (workflow.workflowVersion === versionNumber) { - throw new HttpErrors.BadRequest( - 'Can not delete latest version of a workflow', - ); - } - const version = await this.workflowVersionRepository.findOne({ - where: { - workflowId: id, - version: versionNumber, - }, - }); - if (!this.workflowManagerService.deleteWorkflowVersionById) { - throw new HttpErrors.InternalServerError( - 'Version Delete Provider Missing', - ); - } - - if (version) { - await this.workflowManagerService.deleteWorkflowVersionById(version); - await this.workflowVersionRepository.deleteById(version.id); - } else { - throw new HttpErrors.NotFound(ErrorKeys.VersionNotFound); - } - } - - private async initWorkers(workflowName: string) { - const workerMap = await this.workerMapGetter(); - if (workerMap?.[workflowName]) { - const workerList = workerMap[workflowName]; - for (const worker of workerList) { - if (!worker.running) { - await this.workerFn(worker); - worker.running = true; - } - } - } + return this.workflowService.deleteVersionById(id, versionNumber); } } diff --git a/services/bpmn-service/src/keys.ts b/services/bpmn-service/src/keys.ts index 6f5b1867b0..ec8910ae44 100644 --- a/services/bpmn-service/src/keys.ts +++ b/services/bpmn-service/src/keys.ts @@ -2,10 +2,12 @@ // // This software is released under the MIT License. // https://opensource.org/licenses/MIT -import {BindingKey} from '@loopback/core'; +import {BindingKey, Constructor} from '@loopback/core'; import {BINDING_PREFIX} from '@sourceloop/core'; import { ExecutionInputValidator, + ICommand, + IWorkflowService, IWorkflowServiceConfig, WorflowManager, WorkerImplementationFn, @@ -35,4 +37,11 @@ export namespace WorkflowServiceBindings { export const WORKER_MAP = BindingKey.create( `${BINDING_PREFIX}.workflow.worker.map`, ); + export const COMMANDS = BindingKey.create[]>( + `${BINDING_PREFIX}.workflow.commands`, + ); + + export const WorkflowService = BindingKey.create( + `${BINDING_PREFIX}.workflow.service`, + ); } diff --git a/services/bpmn-service/src/providers/worker-implementation.provider.ts b/services/bpmn-service/src/providers/worker-implementation.provider.ts deleted file mode 100644 index 7de809580a..0000000000 --- a/services/bpmn-service/src/providers/worker-implementation.provider.ts +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright (c) 2023 Sourcefuse Technologies -// -// This software is released under the MIT License. -// https://opensource.org/licenses/MIT -import {inject, Provider} from '@loopback/context'; -import {IWorkflowServiceConfig, WorkerImplementationFn} from '../types'; -import {Client, logger} from 'camunda-external-task-client-js'; -import {WorkflowServiceBindings} from '../keys'; -import {AnyObject} from '@loopback/repository'; -import {ILogger, LOGGER} from '@sourceloop/core'; - -export class WorkerImplementationProvider - implements Provider -{ - client: Client; - constructor( - @inject(WorkflowServiceBindings.Config) - config: IWorkflowServiceConfig, - @inject(LOGGER.LOGGER_INJECT) - private readonly ilogger: ILogger, - ) { - if (config.workflowEngineBaseUrl) { - this.client = new Client({ - baseUrl: config.workflowEngineBaseUrl, - use: logger, - }); - } else { - throw new Error('Invalid workflowEngine Config'); - } - } - value(): WorkerImplementationFn { - return async worker => { - if (this.client) { - worker.running = true; - const subscription = this.client.subscribe( - worker.topic, - ({task, taskService}) => { - worker.command.operation( - {task, taskService}, - (result: AnyObject) => { - if (result) { - this.ilogger.info(`Worker task completed - ${worker.topic}`); - } - }, - ); - }, - ); - this.client.on('poll:error', () => { - worker.running = false; - subscription.unsubscribe(); - }); - } else { - throw new Error('Workflow client not connected'); - } - }; - } -} diff --git a/services/bpmn-service/src/services/http.service.ts b/services/bpmn-service/src/services/http.service.ts new file mode 100644 index 0000000000..6480ead965 --- /dev/null +++ b/services/bpmn-service/src/services/http.service.ts @@ -0,0 +1,96 @@ +// Copyright (c) 2023 Sourcefuse Technologies +// +// This software is released under the MIT License. +// https://opensource.org/licenses/MIT +import {BindingScope, injectable} from '@loopback/core'; +import {AnyObject} from '@loopback/repository'; +import {HttpErrors} from '@loopback/rest'; +import {STATUS_CODE} from '@sourceloop/core'; +import FormData from 'form-data'; +import fetch, {Response} from 'node-fetch'; +import {HttpOptions} from '../types'; + +@injectable({scope: BindingScope.SINGLETON}) +export class HttpClientService { + get(url: string, options?: HttpOptions) { + const processed = this.processOptions(url, options); + return fetch(processed.url, { + headers: processed.headers, + }).then(res => this.handleRes(res)); + } + + delete(url: string, options?: HttpOptions) { + const processed = this.processOptions(url, options); + return fetch(processed.url, { + method: 'delete', + headers: processed.headers, + }).then(res => this.handleRes(res)); + } + + post(url: string, body: AnyObject, options?: HttpOptions) { + const processed = this.processOptions(url, options); + const contentTypeHeader = 'content-type'; + return fetch(processed.url, { + method: 'post', + body: JSON.stringify(body), + headers: { + [contentTypeHeader]: 'application/json', + ...processed.headers, + }, + }).then(res => this.handleRes(res)); + } + + postFormData(url: string, body: FormData, options?: HttpOptions) { + const processed = this.processOptions(url, options); + return fetch(processed.url, { + method: 'post', + body: body, + headers: { + ...body.getHeaders(), + ...processed.headers, + }, + }).then(res => this.handleRes(res)); + } + + private processOptions(url: string, options?: HttpOptions) { + let headers = {}; + if (options?.query) { + url = `${url}?${this.serialize(options.query)}`; + } + if (options?.urlParams) { + for (const key in options.urlParams) { + url = url.replace( + new RegExp(`\\{${key}\\}`, 'gi'), + options.urlParams[key], + ); + } + } + if (options?.headers) { + headers = options.headers; + } + + return {url, headers}; + } + + private async handleRes(res: Response): Promise { + if (res.status === STATUS_CODE.OK) { + return (res.json() as Promise).catch( + e => Promise.resolve({}) as Promise, + ); + } else if (res.status === STATUS_CODE.NO_CONTENT) { + return Promise.resolve({}) as Promise; + } else { + throw new HttpErrors.BadRequest(await res.text()); + } + } + + private serialize(obj: AnyObject) { + const str = []; + for (const p in obj) { + if (obj.hasOwnProperty(p)) { + str.push(`${encodeURIComponent(p)}=${encodeURIComponent(obj[p])}`); + } + } + return str.join('&'); + } +} diff --git a/services/bpmn-service/src/services/index.ts b/services/bpmn-service/src/services/index.ts new file mode 100644 index 0000000000..0b3529b87d --- /dev/null +++ b/services/bpmn-service/src/services/index.ts @@ -0,0 +1,2 @@ +export * from './http.service'; +export * from './workflow.service'; diff --git a/services/bpmn-service/src/services/workflow.service.ts b/services/bpmn-service/src/services/workflow.service.ts new file mode 100644 index 0000000000..510d979413 --- /dev/null +++ b/services/bpmn-service/src/services/workflow.service.ts @@ -0,0 +1,202 @@ +import {Getter, inject} from '@loopback/core'; +import { + AnyObject, + Filter, + FilterExcludingWhere, + repository, +} from '@loopback/repository'; +import {HttpErrors} from '@loopback/rest'; +import {ErrorKeys} from '../enums'; +import {WorkflowServiceBindings} from '../keys'; +import { + ExecuteWorkflowDto, + Workflow, + WorkflowDto, + WorkflowVersion, +} from '../models'; +import {WorkflowRepository, WorkflowVersionRepository} from '../repositories'; +import { + ExecutionInputValidator, + IWorkflowService, + WorflowManager, + WorkerImplementationFn, + WorkerMap, +} from '../types'; + +export class WorkflowService implements IWorkflowService { + constructor( + @repository(WorkflowRepository) + private readonly workflowRepository: WorkflowRepository, + @repository(WorkflowVersionRepository) + private readonly workflowVersionRepository: WorkflowVersionRepository, + @inject(WorkflowServiceBindings.WorkflowManager) + private readonly workflowManagerService: WorflowManager, + @inject.getter(WorkflowServiceBindings.WORKER_MAP) + private readonly workerMapGetter: Getter, + @inject(WorkflowServiceBindings.WorkerImplementationFunction) + private readonly workerFn: WorkerImplementationFn, + @inject(WorkflowServiceBindings.ExecutionInputValidatorFn) + private readonly execInputValidator: ExecutionInputValidator, + ) {} + async create(workflowDto: WorkflowDto): Promise { + const workflowResponse = + await this.workflowManagerService.createWorkflow(workflowDto); + + const entity = new Workflow({ + workflowVersion: workflowResponse.version, + externalIdentifier: workflowResponse.processId, + name: workflowDto.name, + provider: workflowResponse.provider, + inputSchema: workflowDto.inputSchema, + description: workflowDto.description, + }); + + const newWorkflow = await this.workflowRepository.create(entity); + + const version = new WorkflowVersion({ + workflowId: newWorkflow.id, + version: workflowResponse.version, + bpmnDiagram: workflowResponse.fileRef, + externalWorkflowId: workflowResponse.externalId, + inputSchema: workflowDto.inputSchema, + }); + + await this.workflowVersionRepository.create(version); + return newWorkflow; + } + + async updateById(id: string, workflow: Partial): Promise { + const exists = await this.workflowRepository.exists(id); + + if (!exists) { + throw new HttpErrors.NotFound(ErrorKeys.WorkflowNotFound); + } + + const workflowResponse = + await this.workflowManagerService.updateWorkflow(workflow); + + const entity = new Workflow({ + workflowVersion: workflowResponse.version, + externalIdentifier: workflowResponse.processId, + name: workflow.name, + provider: workflowResponse.provider, + inputSchema: workflow.inputSchema, + description: workflow.description, + }); + + await this.workflowRepository.updateById(id, entity); + + const version = new WorkflowVersion({ + workflowId: id, + version: workflowResponse.version, + bpmnDiagram: workflowResponse.fileRef, + externalWorkflowId: workflowResponse.externalId, + inputSchema: workflow.inputSchema, + }); + + await this.workflowVersionRepository.create(version); + } + + async deleteById(id: string): Promise { + const workflow = await this.workflowRepository.findById(id, { + include: ['workflowVersions'], + }); + await this.workflowVersionRepository.deleteAll({ + workflowId: workflow.id, + }); + await this.workflowRepository.deleteById(workflow.id); + } + + async deleteVersionById(id: string, versionNumber: number): Promise { + const workflow = await this.workflowRepository.findById(id); + if (workflow.workflowVersion === versionNumber) { + throw new HttpErrors.BadRequest( + 'Can not delete latest version of a workflow', + ); + } + const version = await this.workflowVersionRepository.findOne({ + where: { + workflowId: id, + version: versionNumber, + }, + }); + if (!this.workflowManagerService.deleteWorkflowVersionById) { + throw new HttpErrors.InternalServerError( + 'Version Delete Provider Missing', + ); + } + + if (version) { + await this.workflowManagerService.deleteWorkflowVersionById(version); + await this.workflowVersionRepository.deleteById(version.id); + } else { + throw new HttpErrors.NotFound(ErrorKeys.VersionNotFound); + } + } + + find(filter?: Filter | undefined): Promise { + return this.workflowRepository.find(filter); + } + + findById( + id: string, + filter?: FilterExcludingWhere | undefined, + ): Promise { + return this.workflowRepository.findById(id, filter); + } + + async count(id: string): Promise { + const workflow = await this.workflowRepository.findById(id); + return this.workflowManagerService.getWorkflowById(workflow); + } + + async executeById( + id: string, + instance: ExecuteWorkflowDto, + ): Promise { + const workflow = await this.workflowRepository.findOne({ + where: { + id: id, + }, + }); + let version; + if (!workflow) { + throw new HttpErrors.NotFound(ErrorKeys.WorkflowNotFound); + } + let inputSchema = workflow.inputSchema; + if (instance.workflowVersion) { + version = await this.workflowVersionRepository.findOne({ + where: { + version: instance.workflowVersion, + workflowId: workflow.id, + }, + }); + if (version) { + inputSchema = version.inputSchema; + } else { + throw new HttpErrors.NotFound(ErrorKeys.VersionNotFound); + } + } + + await this.execInputValidator(inputSchema, instance.input); + await this._initWorkers(workflow.name); + return this.workflowManagerService.startWorkflow( + instance.input, + workflow, + version, + ); + } + + private async _initWorkers(workflowName: string) { + const workerMap = await this.workerMapGetter(); + if (workerMap?.[workflowName]) { + const workerList = workerMap[workflowName]; + for (const worker of workerList) { + if (!worker.running) { + await this.workerFn(worker); + worker.running = true; + } + } + } + } +} diff --git a/services/bpmn-service/src/types/index.ts b/services/bpmn-service/src/types/index.ts index 91049b8a8f..26c8fbc56f 100644 --- a/services/bpmn-service/src/types/index.ts +++ b/services/bpmn-service/src/types/index.ts @@ -2,6 +2,6 @@ // // This software is released under the MIT License. // https://opensource.org/licenses/MIT -export * from './camunda'; +export * from '../connectors/camunda/types'; export * from './types'; export * from './bpm-task'; diff --git a/services/bpmn-service/src/types/types.ts b/services/bpmn-service/src/types/types.ts index 402e774140..016306d28a 100644 --- a/services/bpmn-service/src/types/types.ts +++ b/services/bpmn-service/src/types/types.ts @@ -2,10 +2,15 @@ // // This software is released under the MIT License. // https://opensource.org/licenses/MIT -import {AnyObject} from '@loopback/repository'; +import {AnyObject, Filter, FilterExcludingWhere} from '@loopback/repository'; import {IServiceConfig} from '@sourceloop/core'; +import { + ExecuteWorkflowDto, + Workflow, + WorkflowDto, + WorkflowVersion, +} from '../models'; import {BPMTask} from './bpm-task'; -import {Workflow, WorkflowVersion, WorkflowDto} from '../models'; /* eslint-disable @typescript-eslint/no-explicit-any */ export interface ICommand { @@ -13,6 +18,24 @@ export interface ICommand { execute(): Promise; //NOSONAR } +export interface IWorkflowService { + create(workflow: WorkflowDto): Promise; + updateById(id: string, workflow: Partial): Promise; + deleteById(id: string): Promise; + deleteVersionById(id: string, version: number): Promise; + findById( + id: string, + filter?: FilterExcludingWhere, + ): Promise; + find(filter?: Filter): Promise; + count(id: string): Promise; + executeById(id: string, instance: ExecuteWorkflowDto): Promise; +} + +export interface ICommandWithTopic extends ICommand { + topic: string; +} + export interface IBPMTask { operation(data?: T, done?: (data: R) => void): void; command: ICommand; @@ -31,12 +54,17 @@ export interface WorflowManager { version?: WorkflowVersion, ): Promise; createWorkflow(workflowDto: WorkflowDto): Promise; - updateWorkflow(workflowDto: WorkflowDto): Promise; + updateWorkflow(workflowDto: Partial): Promise; deleteWorkflowById(workflow: Workflow): Promise; deleteWorkflowVersionById?( version: WorkflowVersion, ): Promise; } +export type HttpOptions = { + query?: AnyObject; + urlParams?: AnyObject; + headers?: AnyObject; +}; export type ExecutionInputValidator = ( schema: AnyObject,