diff --git a/grpc-client-generation-python/README.md b/grpc-client-generation-python/README.md new file mode 100644 index 00000000..4d354787 --- /dev/null +++ b/grpc-client-generation-python/README.md @@ -0,0 +1,50 @@ +## Client Generation with Protocol Buffers + +gRPC is a high-performance, open-source universal RPC (Remote Procedure Call) framework that uses protocol buffers as its interface definition language. One of its powerful features is the automatic generation of client and server code from .proto files, which define the service methods and message types. This process simplifies the development of gRPC services and clients, making it easier to build distributed applications and microservices. Here's how gRPC allows client (and server) code generation based on .proto files: + +## 1. Retrieve Service and Messages in .proto File + +For 8.4 the .proto file can be retrieved here: + +``` +curl -O https://raw.githubusercontent.com/camunda/zeebe/stable/8.4/gateway-protocol/src/main/proto/gateway.proto +``` + +## 2. Install Required Tools +To generate Python code, you need the Protocol Buffer compiler (protoc) and the Python gRPC plugin. If you haven't installed these, you can do so as follows: + +Install protoc from the [official releases](https://grpc.io/docs/protoc-installation/) page or via a package manager for your system. + +``` +apt install -y protobuf-compiler +``` + +Install the Python gRPC tools using pip: + +``` +pip install grpcio-tools +``` + +## 3. Generate Python gRPC Code + +``` +python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. gateway.proto +``` + +After running this command, you should see two new files in your directory: + +- `gateway_pb2.py`: Contains the generated request and response classes. +- `gateway_pb2_grpc.py`: Contains the generated client and server classes. + + +### 4. Implement OAuth Interceptor + +The interceptor is required to seamlessly inject authentication tokens into all outgoing gRPC requests, ensuring secure communication with the Zeebe broker without manually adding tokens to each call. It works by intercepting each call, obtaining a fresh OAuth token if necessary, and appending it to the request's metadata as an Authorization header. + +Example Implementation [here](oauthinterceptor.py). + +### 5. Write Zeebe Client +Example Implementation [here](zeebe_client.py). + + + diff --git a/grpc-client-generation-python/example.bpmn b/grpc-client-generation-python/example.bpmn new file mode 100644 index 00000000..4c55bffe --- /dev/null +++ b/grpc-client-generation-python/example.bpmn @@ -0,0 +1,42 @@ + + + + + Flow_1ch9458 + + + + Flow_0c6v5vz + + + + + + + Flow_1ch9458 + Flow_0c6v5vz + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/grpc-client-generation-python/oauthinterceptor.py b/grpc-client-generation-python/oauthinterceptor.py new file mode 100644 index 00000000..8e5c3898 --- /dev/null +++ b/grpc-client-generation-python/oauthinterceptor.py @@ -0,0 +1,67 @@ +from collections import namedtuple +from grpc import UnaryUnaryClientInterceptor, StreamStreamClientInterceptor, UnaryStreamClientInterceptor, StreamUnaryClientInterceptor +import requests +import time + +# Maintain the namedtuple definition for ClientCallDetails +ClientCallDetails = namedtuple('ClientCallDetails', ('method', 'timeout', 'metadata', 'credentials', 'wait_for_ready', 'compression')) + +class OAuthInterceptor(UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, StreamUnaryClientInterceptor, StreamStreamClientInterceptor): + def __init__(self, token_url, client_id, client_secret, audience): + self.token_url = token_url + self.client_id = client_id + self.client_secret = client_secret + self.audience = audience + self.token = None + self.token_expiry = None + + def get_access_token(self): + """Fetch the access token using client credentials.""" + if self.token and self.token_expiry > time.time(): + return self.token + + print("Retrieving new token...") + payload = { + 'grant_type': 'client_credentials', + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'audience': self.audience + } + response = requests.post(self.token_url, data=payload) + response_data = response.json() + self.token = response_data['access_token'] + self.token_expiry = time.time() + response_data['expires_in'] - 60 # 60 seconds leeway + + return self.token + + def update_metadata(self, client_call_details, token): + metadata = [('authorization', f'Bearer {token}')] + if client_call_details.metadata is not None: + metadata.extend(client_call_details.metadata) + # Return a new ClientCallDetails instance with updated metadata + return ClientCallDetails( + client_call_details.method, + client_call_details.timeout, + metadata, + client_call_details.credentials, + client_call_details.wait_for_ready, + client_call_details.compression + ) + + def intercept_call(self, continuation, client_call_details, request_or_iterator): + token = self.get_access_token() + new_call_details = self.update_metadata(client_call_details, token) + return continuation(new_call_details, request_or_iterator) + + # Implement the intercept method for each call type using intercept_call + def intercept_unary_unary(self, continuation, client_call_details, request): + return self.intercept_call(continuation, client_call_details, request) + + def intercept_unary_stream(self, continuation, client_call_details, request): + return self.intercept_call(continuation, client_call_details, request) + + def intercept_stream_unary(self, continuation, client_call_details, request_iterator): + return self.intercept_call(continuation, client_call_details, request_iterator) + + def intercept_stream_stream(self, continuation, client_call_details, request_iterator): + return self.intercept_call(continuation, client_call_details, request_iterator) diff --git a/grpc-client-generation-python/zeebe_client.py b/grpc-client-generation-python/zeebe_client.py new file mode 100644 index 00000000..9a1f4241 --- /dev/null +++ b/grpc-client-generation-python/zeebe_client.py @@ -0,0 +1,65 @@ +import grpc +import gateway_pb2 +import gateway_pb2_grpc +from oauthinterceptor import OAuthInterceptor + +def run(): + + # OAuth Interceptor Configuration + token_url = "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token" + client_id = "zeebe" + client_secret = "NFp6GKwftJ" + audience = "zeebe-api" + + # Create an instance of the OAuthInterceptor + oauth_interceptor = OAuthInterceptor(token_url, client_id, client_secret, audience) + + # Add interceptor to the channel + intercept_channel = grpc.intercept_channel( + grpc.insecure_channel('localhost:26500'), oauth_interceptor) + + # Now use the intercepted channel to create stubs + stub = gateway_pb2_grpc.GatewayStub(intercept_channel) + + topologyResponse = stub.Topology(gateway_pb2.TopologyRequest()) + print(topologyResponse) + + tenantIds = ['custom'] # tenantIds + fileName = "example.bpmn" + with open(fileName, 'rb') as file: + bpmn_content = file.read() + print(bpmn_content) + # Deploy Diagram + for tenantId in tenantIds: + resource = gateway_pb2.Resource(name=fileName, content=bpmn_content) + deployResult = stub.DeployResource(gateway_pb2.DeployResourceRequest(tenantId=tenantId , resources=[resource])) + print(deployResult) + + # Start Instances + for tenantId in tenantIds: + for x in range(20): + stub.CreateProcessInstance(gateway_pb2.CreateProcessInstanceRequest(tenantId = tenantId, bpmnProcessId="example", version=-1 )) + + # Job worker logic (Activate and complete jobs) + job_type = 'dummy' + worker = 'python-worker' + timeout = 10000 # in milliseconds + maxJobsToActivate=10 + request = gateway_pb2.ActivateJobsRequest(type=job_type, worker=worker, timeout=timeout, maxJobsToActivate=maxJobsToActivate, tenantIds=tenantIds) + + while True: + for activate_response in stub.ActivateJobs(request): + for job in activate_response.jobs: + print(f"Activated job {job.key}") + # Process the job here + + # Complete the job + complete_request = gateway_pb2.CompleteJobRequest(jobKey=job.key, variables='{}') + stub.CompleteJob(complete_request) + print(f"Completed job {job.key}") + print("Looking for jobs again...") + + + +if __name__ == '__main__': + run() \ No newline at end of file