From c28410ca8345cf085d59192e2fb9e18e84813775 Mon Sep 17 00:00:00 2001 From: Josh Wulf Date: Mon, 6 May 2024 15:27:05 +1200 Subject: [PATCH] feat(repo): add CAMUNDA_CUSTOM_ROOT_CERT_STRING parameter accept a custom certificate as a string argument to the configuration fixes #142 --- .gitignore | 3 +- .../GetCustomCertificateBuffer.unit.spec.ts | 124 ++++++++- src/lib/Configuration.ts | 5 + src/lib/GetCustomCertificateBuffer.ts | 5 +- src/zeebe/zb/ZeebeGrpcClient.ts | 246 +++++++++--------- 5 files changed, 248 insertions(+), 135 deletions(-) diff --git a/.gitignore b/.gitignore index ba4a9775..e60be868 100644 --- a/.gitignore +++ b/.gitignore @@ -134,4 +134,5 @@ dist *.env !docker/.env -docs \ No newline at end of file +docs +.nx/cache \ No newline at end of file diff --git a/src/__tests__/lib/GetCustomCertificateBuffer.unit.spec.ts b/src/__tests__/lib/GetCustomCertificateBuffer.unit.spec.ts index 98cd3b07..07507d21 100644 --- a/src/__tests__/lib/GetCustomCertificateBuffer.unit.spec.ts +++ b/src/__tests__/lib/GetCustomCertificateBuffer.unit.spec.ts @@ -14,6 +14,18 @@ import { import { OperateApiClient } from '../../operate' import { ZeebeGrpcClient } from '../../zeebe' +let server +afterEach(() => { + ;(server && server.close && server.close()) || + (server && + server.tryShutdown && + server.tryShutdown((err) => { + if (err) { + throw err + } + })) +}) + test('Can use a custom root certificate to connect to a REST API', async () => { const app = express() @@ -26,7 +38,7 @@ test('Can use a custom root certificate to connect to a REST API', async () => { cert: fs.readFileSync(path.join(__dirname, 'localhost.crt')), } - const server = https.createServer(options, app) + server = https.createServer(options, app) server.listen(3012, () => { // console.log('Server listening on port 3012') @@ -81,7 +93,7 @@ test('gRPC server with self-signed certificate', (done) => { ) as unknown as { gateway_protocol: { Gateway: any } } // Create the server - const server = new Server() + server = new Server() // Add a service to the server server.addService(zeebeProto.gateway_protocol.Gateway.service, { @@ -140,16 +152,104 @@ test('gRPC server with self-signed certificate', (done) => { done() }) }) - // const zbc2 = new ZeebeGrpcClient({ - // config: { - // CAMUNDA_OAUTH_DISABLED: true, - // ZEEBE_ADDRESS: 'localhost:50051', - // }, - // }) - // zbc2.topology().catch((e) => { - // console.log(e) - // zbc2.close() - // }) + } + ) +}) + +test('gRPC server with self-signed certificate provided via string', (done) => { + // Load the protobuf definition + const packageDefinition = loadSync( + path.join(__dirname, '..', '..', 'proto', 'zeebe.proto'), + { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + } + ) + + const zeebeProto = loadPackageDefinition( + packageDefinition + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ) as unknown as { gateway_protocol: { Gateway: any } } + + // Create the server + const server = new Server() + + // Add a service to the server + server.addService(zeebeProto.gateway_protocol.Gateway.service, { + Topology: (_, callback) => { + const t = new TopologyResponse() + const b = new BrokerInfo() + b.setHost('localhost') + const partition = new Partition() + partition.setHealth(0) + partition.setPartitionid(0) + partition.setRole(0) + b.setPartitionsList([partition]) + t.setBrokersList([b]) + callback(null, t) + }, + // Implement your service methods here + }) + + // Read the key and certificate + const key = fs.readFileSync(path.join(__dirname, 'localhost.key')) + const cert = fs.readFileSync(path.join(__dirname, 'localhost.crt')) + + // Start the server + server.bindAsync( + 'localhost:50051', + ServerCredentials.createSsl(null, [ + { + private_key: key, + cert_chain: cert, + }, + ]), + (err) => { + if (err) { + console.error(err) + done() + return + } + const zbc = new ZeebeGrpcClient({ + config: { + CAMUNDA_OAUTH_DISABLED: true, + ZEEBE_ADDRESS: 'localhost:50051', + CAMUNDA_CUSTOM_ROOT_CERT_STRING: `-----BEGIN CERTIFICATE----- +MIIC7TCCAdWgAwIBAgIUXlnuRfR2yE/v5t9A0ZoeVG4BvZowDQYJKoZIhvcNAQEL +BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MB4XDTI0MDUwMTAzMTIyNloXDTI1MDUw +MTAzMTIyNlowFDESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEA57iCfaiUmW74QUb0MtX5MOCOfhKJ8zw0i4Ep+nwST03w +Z/K5Z0W7OPmJDnPsko9DPC3bOeObriTcWgYg4zNaLNSKvDbkbLtDvpUyY8rdJAP3 +6H7uTZWMoUQzdIGdSFZHa8HRO1HUZGFZT55bi7czyMPFnzSOtcaz4pCvlhLRk+QA +lsSG+4owhfDrpyPlFtEFNyeaE2fIsJtHxupkwGOmDNeh6iKV46lD8F0SGf2pl5qF +nbbMn6IZlpH7heQqMdPNs1ikGOuDybJISu07S72RSoClgdFzepzXFHoNWwhucdvN +UMJXWBnP/PoeNViI2+nBMrK/1Bwuhci0t5mjTujQNQIDAQABozcwNTAUBgNVHREE +DTALgglsb2NhbGhvc3QwHQYDVR0OBBYEFBuCDjLQbWjX7D+o9dt4nszcP3OIMA0G +CSqGSIb3DQEBCwUAA4IBAQASgexBeY7Nz9i0qREk1RzpQDIT+jM/QAgmnH3G6Ehx +tAYUMYLeDTmGhYhp3GJAU7/R3mbN6t5qg2d9Fa8b+JpBJRdxMY+CyjESoPvUHIE3 +lkgNGphT+8QPnh7uO5KOUVnk7Jc9MTwBntDouLHfuzJJnHPlRko3IWnwaivZYVRn +VbnUoSMKKPzFaqqaY8uHPjvs4Gt4OGcYV8hHcjeI3fMHckmsXZclxb3pwF+x698o +Htg5+ydbmWkTspvbMuHx/280Ow0JPSSXFnwWGWpyH7kI0EAfq75W3iGMRR6yL7Je +ffZG7W8KARYx824nRlxbIN2rHo9VQwEBkbmoeg5nSkvi +-----END CERTIFICATE-----`, + CAMUNDA_SECURE_CONNECTION: true, + zeebeGrpcSettings: { + ZEEBE_CLIENT_LOG_LEVEL: 'NONE', + }, + }, + }) + zbc.topology().then(() => { + expect(true).toBe(true) + zbc.close() + // Stop the server after the test + server.tryShutdown((err) => { + if (err) console.error(err) + done() + }) + }) } ) }) diff --git a/src/lib/Configuration.ts b/src/lib/Configuration.ts index cc43e2f2..0b8bcce6 100644 --- a/src/lib/Configuration.ts +++ b/src/lib/Configuration.ts @@ -111,6 +111,11 @@ const getMainEnv = () => type: 'string', optional: true, }, + /** When using self-signed certificates, provide the root certificate as a string */ + CAMUNDA_CUSTOM_ROOT_CERT_STRING: { + type: 'string', + optional: true, + }, /** When using custom or self-signed certificates, provide the path to the certificate chain */ CAMUNDA_CUSTOM_CERT_CHAIN_PATH: { type: 'string', diff --git a/src/lib/GetCustomCertificateBuffer.ts b/src/lib/GetCustomCertificateBuffer.ts index 75db14a2..8c6ca584 100644 --- a/src/lib/GetCustomCertificateBuffer.ts +++ b/src/lib/GetCustomCertificateBuffer.ts @@ -9,8 +9,9 @@ export async function GetCustomCertificateBuffer( config: CamundaPlatform8Configuration ): Promise { const customRootCertPath = config.CAMUNDA_CUSTOM_ROOT_CERT_PATH + const customRootCert = config.CAMUNDA_CUSTOM_ROOT_CERT_STRING - if (!customRootCertPath) { + if (!customRootCertPath && !customRootCert) { return undefined } const rootCerts: string[] = [] @@ -21,6 +22,8 @@ export async function GetCustomCertificateBuffer( if (cert) { rootCerts.push(cert) } + } else if (customRootCert) { + rootCerts.push(customRootCert) } // (2) use certificates from OS keychain diff --git a/src/zeebe/zb/ZeebeGrpcClient.ts b/src/zeebe/zb/ZeebeGrpcClient.ts index 1bc39bf9..59bccbd5 100644 --- a/src/zeebe/zb/ZeebeGrpcClient.ts +++ b/src/zeebe/zb/ZeebeGrpcClient.ts @@ -11,6 +11,7 @@ import { CamundaEnvironmentConfigurator, CamundaPlatform8Configuration, DeepPartial, + GetCustomCertificateBuffer, LosslessDto, RequireConfiguration, constructOAuthProvider, @@ -65,18 +66,18 @@ const idColors = [ export class ZeebeGrpcClient extends TypedEmitter< typeof ConnectionStatusEvent > { - public connectionTolerance: MaybeTimeDuration + public connectionTolerance!: MaybeTimeDuration public connected?: boolean = undefined public readied = false public gatewayAddress: string public loglevel: Loglevel public onReady?: () => void public onConnectionError?: (err: Error) => void - private logger: StatefulLogInterceptor + private logger!: StatefulLogInterceptor private closePromise?: Promise private closing = false // A gRPC channel for the ZBClient to execute commands on - private grpc: ZB.ZBGrpc + private grpc: Promise private options: ZBClientOptions private workerCount = 0 private workers: ZBWorker[] = // eslint-disable-line @typescript-eslint/no-explicit-any @@ -136,19 +137,6 @@ export class ZeebeGrpcClient extends TypedEmitter< debug('Gateway address: ', this.gatewayAddress) - this.useTLS = config.CAMUNDA_SECURE_CONNECTION - const certChainPath = config.CAMUNDA_CUSTOM_CERT_CHAIN_PATH - const rootCertsPath = config.CAMUNDA_CUSTOM_ROOT_CERT_PATH - const privateKeyPath = config.CAMUNDA_CUSTOM_PRIVATE_KEY_PATH - const customSSL = { - certChain: certChainPath ? readFileSync(certChainPath) : undefined, - privateKey: privateKeyPath ? readFileSync(privateKeyPath) : undefined, - rootCerts: rootCertsPath ? readFileSync(rootCertsPath) : undefined, - } - - this.customSSL = customSSL - this.options.customSSL = customSSL - this.connectionTolerance = Duration.milliseconds.of( config.zeebeGrpcSettings.ZEEBE_GRPC_CLIENT_CONNECTION_TOLERANCE_MS ) @@ -158,63 +146,77 @@ export class ZeebeGrpcClient extends TypedEmitter< this.oAuthProvider = options?.oAuthProvider ?? constructOAuthProvider(config) - const { grpcClient, log } = this.constructGrpcClient({ - grpcConfig: { - namespace: this.options.logNamespace || 'ZBClient', - }, - logConfig: { - _tag: 'ZBCLIENT', - loglevel: this.loglevel, - longPoll: Duration.milliseconds.from(this.options.longPoll), - namespace: this.options.logNamespace || 'ZBClient', - pollInterval: Duration.milliseconds.from(this.options.pollInterval), - stdout: this.stdout, - }, - }) - - grpcClient.on(ConnectionStatusEvent.connectionError, (err: Error) => { - if (this.connected !== false) { - this.onConnectionError?.(err) - this.emit(ConnectionStatusEvent.connectionError) - } - this.connected = false - this.readied = false - }) - grpcClient.on(ConnectionStatusEvent.ready, () => { - if (!this.readied) { - this.onReady?.() - this.emit(ConnectionStatusEvent.ready) - } - this.connected = true - this.readied = true - }) - this.grpc = grpcClient - this.logger = log - this.maxRetries = config.zeebeGrpcSettings.ZEEBE_GRPC_CLIENT_MAX_RETRIES this.maxRetryTimeout = Duration.seconds.of( config.zeebeGrpcSettings.ZEEBE_GRPC_CLIENT_MAX_RETRY_TIMEOUT_SECONDS ) + this.useTLS = config.CAMUNDA_SECURE_CONNECTION + const certChainPath = config.CAMUNDA_CUSTOM_CERT_CHAIN_PATH + const privateKeyPath = config.CAMUNDA_CUSTOM_PRIVATE_KEY_PATH - // Send command to broker to eagerly fail / prove connection. - // This is useful for, for example: the Node-Red client, which wants to - // display the connection status. - const eagerConnection = - config.zeebeGrpcSettings.ZEEBE_GRPC_CLIENT_EAGER_CONNECT - if (eagerConnection ?? false) { - this.topology() - .then((res) => { - this.logger.logDirect(chalk.blueBright('Zeebe cluster topology:')) - this.logger.logDirect(res.brokers) - }) - .catch((e) => { - // Swallow exception to avoid throwing if retries are off - if (e.thisWillNeverHappenYo) { - this.emit(ConnectionStatusEvent.unknown) - } - }) - } + this.grpc = GetCustomCertificateBuffer(config).then((rootCerts) => { + const customSSL = { + certChain: certChainPath ? readFileSync(certChainPath) : undefined, + privateKey: privateKeyPath ? readFileSync(privateKeyPath) : undefined, + rootCerts, + } + + this.customSSL = customSSL + this.options.customSSL = customSSL + + const { grpcClient, log } = this.constructGrpcClient({ + grpcConfig: { + namespace: this.options.logNamespace || 'ZBClient', + }, + logConfig: { + _tag: 'ZBCLIENT', + loglevel: this.loglevel, + longPoll: Duration.milliseconds.from(this.options.longPoll), + namespace: this.options.logNamespace || 'ZBClient', + pollInterval: Duration.milliseconds.from(this.options.pollInterval), + stdout: this.stdout, + }, + }) + + grpcClient.on(ConnectionStatusEvent.connectionError, (err: Error) => { + if (this.connected !== false) { + this.onConnectionError?.(err) + this.emit(ConnectionStatusEvent.connectionError) + } + this.connected = false + this.readied = false + }) + grpcClient.on(ConnectionStatusEvent.ready, () => { + if (!this.readied) { + this.onReady?.() + this.emit(ConnectionStatusEvent.ready) + } + this.connected = true + this.readied = true + }) + this.logger = log + + // Send command to broker to eagerly fail / prove connection. + // This is useful for, for example: the Node-Red client, which wants to + // display the connection status. + const eagerConnection = + config.zeebeGrpcSettings.ZEEBE_GRPC_CLIENT_EAGER_CONNECT + if (eagerConnection ?? false) { + this.topology() + .then((res) => { + this.logger.logDirect(chalk.blueBright('Zeebe cluster topology:')) + this.logger.logDirect(res.brokers) + }) + .catch((e) => { + // Swallow exception to avoid throwing if retries are off + if (e.thisWillNeverHappenYo) { + this.emit(ConnectionStatusEvent.unknown) + } + }) + } + return grpcClient + }) } /** @@ -268,7 +270,7 @@ export class ZeebeGrpcClient extends TypedEmitter< // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { try { - const stream = await this.grpc.activateJobsStream(req) + const stream = await (await this.grpc).activateJobsStream(req) stream.on('data', (res: Grpc.ActivateJobsResponse) => { const jobs = res.jobs.map((job) => parseVariablesAndCustomHeadersToJSON( @@ -306,8 +308,8 @@ export class ZeebeGrpcClient extends TypedEmitter< variables: JSON.stringify(req.variables ?? {}), tenantId: req.tenantId ?? this.tenantId, } - return this.executeOperation('broadcastSignal', () => - this.grpc.broadcastSignalSync(request) + return this.executeOperation('broadcastSignal', async () => + (await this.grpc).broadcastSignalSync(request) ) } @@ -328,8 +330,8 @@ export class ZeebeGrpcClient extends TypedEmitter< processInstanceKey: string | number ): Promise { Utils.validateNumber(processInstanceKey, 'processInstanceKey') - return this.executeOperation('cancelProcessInstance', () => - this.grpc.cancelProcessInstanceSync({ + return this.executeOperation('cancelProcessInstance', async () => + (await this.grpc).cancelProcessInstanceSync({ processInstanceKey, }) ) @@ -466,10 +468,10 @@ export class ZeebeGrpcClient extends TypedEmitter< // Prevent the creation of more workers this.closing = true Promise.all(this.workers.map((w) => w.close(timeout))) - .then(() => this.grpc.close(timeout)) - .then(() => { + .then(async () => (await this.grpc).close(timeout)) + .then(async () => { this.emit(ConnectionStatusEvent.close) - this.grpc.removeAllListeners() + ;(await this.grpc).removeAllListeners() this.removeAllListeners() resolve(null) }) @@ -505,8 +507,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ): Promise { const withStringifiedVariables = stringifyVariables(completeJobRequest) this.logger.logDebug(withStringifiedVariables) - return this.executeOperation('completeJob', () => - this.grpc.completeJobSync(withStringifiedVariables).catch((e) => { + return this.executeOperation('completeJob', async () => + (await this.grpc).completeJobSync(withStringifiedVariables).catch((e) => { if (e.code === GrpcError.NOT_FOUND) { e.details += '. The process may have been cancelled, the job cancelled by an interrupting event, or the job already completed.' + @@ -559,8 +561,8 @@ export class ZeebeGrpcClient extends TypedEmitter< tenantId: config.tenantId ?? this.tenantId, }) - return this.executeOperation('createProcessInstance', () => - this.grpc.createProcessInstanceSync(createProcessInstanceRequest) + return this.executeOperation('createProcessInstance', async () => + (await this.grpc).createProcessInstanceSync(createProcessInstanceRequest) ) } @@ -604,8 +606,8 @@ export class ZeebeGrpcClient extends TypedEmitter< tenantId: request.tenantId ?? this.tenantId, }) - return this.executeOperation('createProcessInstanceWithResult', () => - this.grpc.createProcessInstanceWithResultSync({ + return this.executeOperation('createProcessInstanceWithResult', async () => + (await this.grpc).createProcessInstanceWithResultSync({ fetchVariables: request.fetchVariables, request: createProcessInstanceRequest, requestTimeout: request.requestTimeout, @@ -628,8 +630,8 @@ export class ZeebeGrpcClient extends TypedEmitter< }: { resourceKey: string }): Promise> { - return this.executeOperation('deleteResourceSync', () => - this.grpc.deleteResourceSync({ resourceKey }) + return this.executeOperation('deleteResourceSync', async () => + (await this.grpc).deleteResourceSync({ resourceKey }) ) } @@ -718,8 +720,8 @@ export class ZeebeGrpcClient extends TypedEmitter< if (isProcessFilename(resource)) { const filename = resource.processFilename const process = readFileSync(filename) - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: filename, @@ -730,8 +732,8 @@ export class ZeebeGrpcClient extends TypedEmitter< }) ) } else if (isProcess(resource)) { - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: resource.name, @@ -744,8 +746,8 @@ export class ZeebeGrpcClient extends TypedEmitter< } else if (isDecisionFilename(resource)) { const filename = resource.decisionFilename const decision = readFileSync(filename) - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: filename, @@ -756,8 +758,8 @@ export class ZeebeGrpcClient extends TypedEmitter< }) ) } else if (isDecision(resource)) { - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: resource.name, @@ -770,8 +772,8 @@ export class ZeebeGrpcClient extends TypedEmitter< } else if (isFormFilename(resource)) { const filename = resource.formFilename const form = readFileSync(filename) - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: filename, @@ -783,8 +785,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ) } /* if (isForm(resource)) */ else { // default fall-through - return this.executeOperation('deployResource', () => - this.grpc.deployResourceSync({ + return this.executeOperation('deployResource', async () => + (await this.grpc).deployResourceSync({ resources: [ { name: resource.name, @@ -815,8 +817,8 @@ export class ZeebeGrpcClient extends TypedEmitter< const variables = losslessStringify( evaluateDecisionRequest.variables ) as unknown as ZB.JSONDoc - return this.executeOperation('evaluateDecision', () => - this.grpc.evaluateDecisionSync({ + return this.executeOperation('evaluateDecision', async () => + (await this.grpc).evaluateDecisionSync({ ...evaluateDecisionRequest, variables, tenantId: evaluateDecisionRequest.tenantId ?? this.tenantId, @@ -841,8 +843,8 @@ export class ZeebeGrpcClient extends TypedEmitter< * ``` */ public failJob(failJobRequest: Grpc.FailJobRequest): Promise { - return this.executeOperation('failJob', () => - this.grpc.failJobSync(failJobRequest) + return this.executeOperation('failJob', async () => + (await this.grpc).failJobSync(failJobRequest) ) } @@ -885,7 +887,7 @@ export class ZeebeGrpcClient extends TypedEmitter< public modifyProcessInstance( modifyProcessInstanceRequest: Grpc.ModifyProcessInstanceRequest ): Promise { - return this.executeOperation('modifyProcessInstance', () => { + return this.executeOperation('modifyProcessInstance', async () => { // We accept JSONDoc for the variableInstructions, but the actual gRPC call needs stringified JSON, so transform it with a mutation const req = Utils.deepClone(modifyProcessInstanceRequest) req?.activateInstructions?.forEach((a) => @@ -893,7 +895,7 @@ export class ZeebeGrpcClient extends TypedEmitter< (v) => (v.variables = losslessStringify(v.variables)) ) ) - return this.grpc.modifyProcessInstanceSync({ + return (await this.grpc).modifyProcessInstanceSync({ ...req, }) }) @@ -906,8 +908,10 @@ export class ZeebeGrpcClient extends TypedEmitter< public migrateProcessInstance( migrateProcessInstanceRequest: Grpc.MigrateProcessInstanceRequest ): Promise { - return this.executeOperation('migrateProcessInstance', () => - this.grpc.migrateProcessInstanceSync(migrateProcessInstanceRequest) + return this.executeOperation('migrateProcessInstance', async () => + (await this.grpc).migrateProcessInstanceSync( + migrateProcessInstanceRequest + ) ) } @@ -934,8 +938,8 @@ export class ZeebeGrpcClient extends TypedEmitter< >( publishMessageRequest: Grpc.PublishMessageRequest ): Promise { - return this.executeOperation('publishMessage', () => - this.grpc.publishMessageSync( + return this.executeOperation('publishMessage', async () => + (await this.grpc).publishMessageSync( stringifyVariables({ ...publishMessageRequest, variables: publishMessageRequest.variables, @@ -997,8 +1001,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ...publishStartMessageRequest, tenantId: publishStartMessageRequest.tenantId ?? this.tenantId, } - return this.executeOperation('publishStartMessage', () => - this.grpc.publishMessageSync( + return this.executeOperation('publishStartMessage', async () => + (await this.grpc).publishMessageSync( stringifyVariables({ ...publishMessageRequest, variables: publishMessageRequest.variables || {}, @@ -1041,8 +1045,8 @@ export class ZeebeGrpcClient extends TypedEmitter< public resolveIncident( resolveIncidentRequest: Grpc.ResolveIncidentRequest ): Promise { - return this.executeOperation('resolveIncident', () => - this.grpc.resolveIncidentSync(resolveIncidentRequest) + return this.executeOperation('resolveIncident', async () => + (await this.grpc).resolveIncidentSync(resolveIncidentRequest) ) } @@ -1092,8 +1096,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ? losslessStringify(request.variables) : request.variables - return this.executeOperation('setVariables', () => - this.grpc.setVariablesSync({ ...request, variables }) + return this.executeOperation('setVariables', async () => + (await this.grpc).setVariablesSync({ ...request, variables }) ) } @@ -1146,8 +1150,8 @@ export class ZeebeGrpcClient extends TypedEmitter< ...throwErrorRequest, variables: throwErrorRequest.variables ?? {}, }) - return this.executeOperation('throwError', () => - this.grpc.throwErrorSync(req) + return this.executeOperation('throwError', async () => + (await this.grpc).throwErrorSync(req) ) } @@ -1160,8 +1164,8 @@ export class ZeebeGrpcClient extends TypedEmitter< * zbc.topology().then(res => console.res(JSON.stringify(res, null, 2))) * ``` */ - public topology(): Promise { - return this.executeOperation('topology', this.grpc.topologySync) + public async topology(): Promise { + return this.executeOperation('topology', (await this.grpc).topologySync) } /** @@ -1201,8 +1205,8 @@ export class ZeebeGrpcClient extends TypedEmitter< public updateJobRetries( updateJobRetriesRequest: Grpc.UpdateJobRetriesRequest ): Promise { - return this.executeOperation('updateJobRetries', () => - this.grpc.updateJobRetriesSync(updateJobRetriesRequest) + return this.executeOperation('updateJobRetries', async () => + (await this.grpc).updateJobRetriesSync(updateJobRetriesRequest) ) } @@ -1306,8 +1310,8 @@ export class ZeebeGrpcClient extends TypedEmitter< let connectionErrorCount = 0 let authFailures = 0 return promiseRetry( - (retry, n) => { - if (this.closing || this.grpc.channelClosed) { + async (retry, n) => { + if (this.closing || (await this.grpc).channelClosed) { /** * Should we reject instead? The idea here is that calling ZBClient.close() will allow the application to cleanly shut down. * If we reject here, any pending calls will throw errors. This is probably not what the user is expecting to see. @@ -1319,7 +1323,7 @@ export class ZeebeGrpcClient extends TypedEmitter< `[${operationName}]: Attempt ${n} (max: ${this.maxRetries}).` ) } - return operation().catch((err) => { + return operation().catch(async (err) => { // This could be DNS resolution, or the gRPC gateway is not reachable yet, or Backpressure const isNetworkError = (err.message.indexOf('14') === 0 || @@ -1351,7 +1355,7 @@ export class ZeebeGrpcClient extends TypedEmitter< } // The gRPC channel will be closed if close has been called - if (this.grpc.channelClosed) { + if ((await this.grpc).channelClosed) { return Promise.resolve(null as unknown as T) } throw err