feat: add kafka importer as option #638

Merged: 3 commits, Dec 27, 2023
11 changes: 8 additions & 3 deletions CONTRIBUTING.md
@@ -84,9 +84,14 @@ Available commit types:

### Zeebe cluster backend for local development

There's a file ```docker/docker-compose.yml``` prepared in this repo, which can be used with recent Docker version to
provide a backend. You just need to alter this file and comment out the ```monitor``` service. What remains is
the ```zeebe``` service, which then can be started via ```docker compose up``` command.
There's a file [docker-compose.yml](docker/docker-compose.yml) prepared in this repo, which can be used with a recent Docker version to
provide a backend. Choose the profiles you need and list them in the [.env](docker/.env) file using the pattern `COMPOSE_PROFILES=profile1,profile2`:
* ```hazelcast``` runs Zeebe broker with Hazelcast exporter
* ```kafka``` runs Zeebe broker with Kafka exporter
* ```postgres``` runs PostgreSQL database
* ```mysql``` runs MySQL database

Then run the ```docker compose up``` command.
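
As a rough illustration of how the profiles select services, here is an abridged view of the compose file. Only some services and their `profiles` keys are shown; images, ports and other settings are omitted, so this fragment is a sketch and not a complete file:

```yaml
# Abridged sketch of docker/docker-compose.yml: only profile assignments are shown.
services:
  zeebe:                            # Zeebe broker with Hazelcast exporter
    profiles:
      - hazelcast
  zeebe-kafka:                      # Zeebe broker with Kafka exporter
    profiles:
      - kafka
  simple-monitor-in-memory-kafka:   # the monitor application itself
    profiles:
      - kafka_in_memory
```

A service that declares `profiles` is started only when at least one of its profiles is active. With `COMPOSE_PROFILES=kafka`, for example, the Kafka-based broker starts but the monitor container does not, which leaves the application free to be run from the IDE.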

With such a backend running, you can simply start debugging the ```ZeebeSimpleMonitorApp``` class in your IDE of choice.

83 changes: 77 additions & 6 deletions README.md
@@ -14,7 +14,7 @@ A monitoring application for [Zeebe](https://zeebe.io). It is designed for devel
* test workflows manually
* provide insights on how workflows are executed

The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter). It aggregates the data and stores it into a (in-memory) database. The data is displayed on server-side rendered HTML pages.
The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter) or [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter). It aggregates the data and stores it into a database. The data is displayed on server-side rendered HTML pages.

![how-it-works](docs/how-it-works.png)

@@ -41,23 +41,54 @@ docker pull ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.4.1
* a) in Zeebe broker, set the environment variable `ZEEBE_HAZELCAST_CLUSTER_NAME=dev` (default: `dev`)
* b) in Zeebe Simple Monitor, change the setting `zeebe.client.worker.hazelcast.clusterName` (default: `dev`)

**Switch to the Kafka exporter/importer**

By default, the Zeebe Simple Monitor imports Zeebe events through Hazelcast, but you can switch to Kafka.

* Ensure that a Zeebe broker is running with a [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter) (>= `3.1.1`)
* Configure the environment variables in the Zeebe broker:
* Add spring configuration for the [zeebe-kafka-exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter): `SPRING_CONFIG_ADDITIONAL_LOCATION: /usr/local/zeebe/config/exporter.yml`. [Example](docker/kafka/exporter.yml) and [details](https://github.com/camunda-community-hub/zeebe-kafka-exporter?tab=readme-ov-file#configuration)
* Inject `exporter.yml` and `zeebe-kafka-exporter.jar` into the Docker container, for example, using Docker Compose:
```
volumes:
- ./exporter.yml:/usr/local/zeebe/config/exporter.yml
- ./zeebe-kafka-exporter-3.1.1-jar-with-dependencies.jar:/usr/local/zeebe/lib/zeebe-kafka-exporter.jar
```
* Set the Kafka internal host: `KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"`
* Set the Kafka topic: `KAFKA_TOPIC: zeebe`
* To import events efficiently, the Zeebe broker partitions and the Kafka topic partitions should be aligned as described in the [exporter docs on partitioning](https://github.com/camunda-community-hub/zeebe-kafka-exporter?tab=readme-ov-file#partitioning)
* Configure the environment variables in the Zeebe Simple Monitor as described in the "[Change the default Zeebe importer to Kafka](#change-the-default-zeebe-importer-to-kafka)" section
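
The broker-side steps above can be combined into a single Docker Compose service. The following is a sketch assembled from the settings listed above; the service name, the image tag and the relative file paths are assumptions to adapt to your setup:

```yaml
services:
  zeebe:                              # service name is an assumption
    image: camunda/zeebe:8.3.4        # image tag is an assumption; use your broker version
    environment:
      # Load the exporter configuration in addition to the default broker config
      SPRING_CONFIG_ADDITIONAL_LOCATION: /usr/local/zeebe/config/exporter.yml
      # Referenced from exporter.yml as ${KAFKA_BOOTSTRAP_SERVERS} and ${KAFKA_TOPIC}
      KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
      KAFKA_TOPIC: zeebe
    volumes:
      # Exporter configuration and exporter JAR mounted into the container
      - ./exporter.yml:/usr/local/zeebe/config/exporter.yml
      - ./zeebe-kafka-exporter-3.1.1-jar-with-dependencies.jar:/usr/local/zeebe/lib/zeebe-kafka-exporter.jar
```

The host name `kafka` assumes a Kafka service with that name on the same Docker network.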


If the Zeebe broker runs on your local machine with the default configs then start the container with the following command:

```
docker run --network="host" ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.4.1
```

For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast exporter and the application.
For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast/Kafka exporter and the application.
There are several Docker Compose profiles. Select them in the [.env](docker/.env) file, by passing multiple `--profile` flags, or by setting a comma-separated list in the `COMPOSE_PROFILES` environment variable:
* ```docker compose --profile hazelcast --profile hazelcast_in_memory up```
* ```COMPOSE_PROFILES=hazelcast,hazelcast_in_memory docker compose up```

Existing presets:
* ```COMPOSE_PROFILES=hazelcast,hazelcast_in_memory``` (by default)
* ```COMPOSE_PROFILES=kafka,kafka_in_memory```
* ```COMPOSE_PROFILES=hazelcast,hazelcast_postgres,postgres```
* ```COMPOSE_PROFILES=hazelcast,hazelcast_mysql,mysql```

The commands to build and run:
```
mvn clean install -DskipTests
cd docker
docker-compose --profile in-memory up
docker-compose up
```

Go to http://localhost:8082

To use PostgreSQL instead of the in-memory database, use the profile `postgres`.
To change the database, see "[Change the Database](#change-the-database)".

To change the Zeebe importer, see "[Change the default Zeebe importer to Kafka](#change-the-default-zeebe-importer-to-kafka)".

```
docker-compose --profile postgres up
@@ -94,6 +125,12 @@ zeebe:
clusterName: dev
connectionTimeout: PT30S

# Options: hazelcast | kafka
# Switches between the provided importers
# The Zeebe broker must be configured with the matching exporter: hazelcast-exporter or kafka-exporter, respectively
# See the examples in docker/docker-compose.yml in services.zeebe and services.zeebe-kafka
zeebe-importer: hazelcast

spring:

datasource:
@@ -107,6 +144,29 @@ spring:
hibernate:
ddl-auto: update

kafka:
template:
default-topic: zeebe
bootstrap-servers: localhost:9093
properties:
request.timeout.ms: 20000
retry.backoff.ms: 500
group-id: zeebe-simple-monitor
consumer:
auto-offset-reset: earliest
properties:
# 1Mb (1*1024*1024), max size of batch
max.partition.fetch.bytes: 1048576
# Number of messages in batch received by kafka listener.
# Works only if their size is less than 'max.partition.fetch.bytes'
max.poll.records: 1000
custom:
# Set equal to number of topic partitions to handle them in parallel
concurrency: 3
retry:
intervalMs: 30000
max-attempts: 3

server:
port: 8082
servlet:
@@ -166,7 +226,7 @@ For example, using PostgreSQL:

* the PostgreSQL database driver is already bundled

See the [docker-compose file](docker/docker-compose.yml) (profile: `postgres`) for a sample configuration with PostgreSQL.
See the [docker-compose file](docker/docker-compose.yml) for a sample configuration with PostgreSQL. Profile preset: `hazelcast,hazelcast_postgres,postgres`

The configuration for using MySql is similar but with an additional setting for the Hibernate naming strategy:

@@ -181,7 +241,18 @@ The configuration for using MySql is similar but with an additional setting for

* the MySql database driver is already bundled

See the [docker-compose file](docker/docker-compose.yml) (profile: `mysql`) for a sample configuration with MySql.
See the [docker-compose file](docker/docker-compose.yml) for a sample configuration with MySql. Profile preset: `hazelcast,hazelcast_mysql,mysql`

#### Change the default Zeebe importer to Kafka

* set the `zeebe-importer` (default: `hazelcast`) configuration property to `kafka`
* configure the connection to Kafka by setting `spring.kafka.bootstrap-servers` (default: `localhost:9093`)
* configure the Kafka topic by setting `spring.kafka.template.default-topic` (default: `zeebe`)
* configure custom Kafka properties if necessary:
* `spring.kafka.custom.concurrency` (default: `3`) is the number of threads for the Kafka listener that will import events from Zeebe
* `spring.kafka.custom.retry.intervalMs` (default: `30000`) and `spring.kafka.custom.retry.max-attempts` (default: `3`) are the retry configurations for a retryable exception in the listener

Refer to the [docker-compose file](docker/docker-compose.yml) for a sample configuration with the Kafka importer. Profile preset: `kafka,kafka_in_memory`
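
Put together, the settings above might look like this in an `application.yaml` override. This is a sketch that uses the defaults listed above, with only `zeebe-importer` changed; adjust the address and topic to your environment:

```yaml
# Import Zeebe records from Kafka instead of Hazelcast
zeebe-importer: kafka

spring:
  kafka:
    # Address of the Kafka broker as seen from the monitor
    bootstrap-servers: localhost:9093
    template:
      # Topic the Kafka exporter writes Zeebe records to
      default-topic: zeebe
    custom:
      # Number of listener threads; set equal to the number of topic partitions
      concurrency: 3
      retry:
        intervalMs: 30000
        max-attempts: 3
```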

## Code of Conduct

1 change: 1 addition & 0 deletions docker/.env
@@ -0,0 +1 @@
COMPOSE_PROFILES=hazelcast,hazelcast_in_memory
80 changes: 76 additions & 4 deletions docker/docker-compose.yml
@@ -6,7 +6,7 @@ networks:

services:
zeebe:
container_name: zeebe_broker
container_name: zeebe-broker-hazelcast
image: ghcr.io/camunda-community-hub/zeebe-with-hazelcast-exporter:8.2.0
environment:
- ZEEBE_LOG_LEVEL=debug
@@ -17,6 +17,61 @@ services:
- "5701:5701"
networks:
- zeebe_network
volumes:
- ./hazelcast/application.yaml:/usr/local/zeebe/config/application.yaml
profiles:
- hazelcast

zeebe-kafka:
container_name: zeebe-broker-kafka
image: camunda/zeebe:8.3.4
hostname: zeebe
environment:
ZEEBE_BROKER_CLUSTER_PARTITIONSCOUNT: 5
SPRING_CONFIG_ADDITIONAL_LOCATION: /usr/local/zeebe/config/exporter.yml
KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
KAFKA_TOPIC: zeebe
depends_on:
- kafka
ports:
- "26500:26500"
networks:
- zeebe_network
volumes:
- ./kafka/exporter.yml:/usr/local/zeebe/config/exporter.yml
- ./kafka/exporter/zeebe-kafka-exporter-3.1.1-jar-with-dependencies.jar:/usr/local/zeebe/lib/zeebe-kafka-exporter.jar
profiles:
- kafka

zookeeper:
image: docker.io/bitnami/zookeeper:3.8
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- zeebe_network
profiles:
- kafka

kafka:
image: docker.io/bitnami/kafka:3.4
ports:
- "9093:9093"
environment:
- KAFKA_ENABLE_KRAFT=no
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
networks:
- zeebe_network
profiles:
- kafka

simple-monitor-in-memory:
container_name: zeebe-simple-monitor-in-memory
@@ -32,7 +87,24 @@ services:
networks:
- zeebe_network
profiles:
- in-memory
- hazelcast_in_memory

simple-monitor-in-memory-kafka:
container_name: zeebe-simple-monitor-in-memory-kafka
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.5.2
environment:
- zeebe.client.broker.gateway-address=zeebe:26500
- zeebe-importer=kafka
- spring.kafka.bootstrap-servers=kafka:9092
- spring.kafka.template.default-topic=zeebe
ports:
- "8082:8082"
depends_on:
- zeebe-kafka
networks:
- zeebe_network
profiles:
- kafka_in_memory

simple-monitor-postgres:
container_name: zeebe-simple-monitor-postgres
@@ -54,7 +126,7 @@ services:
networks:
- zeebe_network
profiles:
- postgres
- hazelcast_postgres

postgres-zeebe-simple-monitor:
image: postgres:16.1
@@ -89,7 +161,7 @@ services:
networks:
- zeebe_network
profiles:
- mysql
- hazelcast_mysql

mysql-zeebe-simple-monitor:
image: mysql:8
@@ -3,4 +3,4 @@ zeebe:
exporters:
hazelcast:
className: io.zeebe.hazelcast.exporter.HazelcastExporter
jarPath: exporters/zeebe-hazelcast-exporter.jar
jarPath: exporters/zeebe-hazelcast-exporter-jar-with-dependencies.jar
115 changes: 115 additions & 0 deletions docker/kafka/exporter.yml
@@ -0,0 +1,115 @@
zeebe:
broker:
exporters:
kafka:
className: io.zeebe.exporters.kafka.KafkaExporter
# Update this path to the location of the JAR
# Note that this must be visible to the broker process
jarPath: lib/zeebe-kafka-exporter.jar
args:
# Controls the number of records to buffer in a single record batch before forcing a flush. Note
# that a flush may occur before anyway due to periodic flushing. This setting should help you
# estimate a soft upper bound to the memory consumption of the exporter. If you assume a worst
# case scenario where every record is the size of your zeebe.broker.network.maxMessageSize, then
# the memory required by the exporter would be at least:
# (maxBatchSize * zeebe.broker.network.maxMessageSize * 2)
#
# We multiply by 2 as the records are buffered twice - once in the exporter itself, and once
# in the producer's network buffers (but serialized at that point). There's some additional
# memory overhead used by the producer as well for compression/encryption/etc., so you have to
# add a bit, but that one is not proportional to the number of records and is more or less
# constant.
#
# Once the batch has reached this size, a flush is automatically triggered. Too small a number
# here would cause many flushes, which is not good for performance, but would mean you will see
# your records faster/sooner.
#
# Default is 100
maxBatchSize: 100
# The maximum time to block when the batch is full. If the batch is full, and a new
# record comes in, the exporter will block until there is space in the batch, or until
# maxBlockingTimeoutMs milliseconds elapse.
maxBlockingTimeoutMs: 1000
# How often should pending batches be flushed to the Kafka broker. Too low a value will
# cause more load on the broker, but means your records will be visible faster.
flushIntervalMs: 1000

# Producer specific configuration
producer:
# The list of initial Kafka broker contact points. The format should be the same
# one as the ProducerConfig expects, i.e. "host:port"
# Maps to ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
# For example:
# servers: "kafka:9092,localhost:29092"
servers: ${KAFKA_BOOTSTRAP_SERVERS}
# Controls how long the producer will wait for a request to be acknowledged by
# the Kafka broker before retrying it
# Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
requestTimeoutMs: 5000
# Grace period when shutting down the producer in milliseconds
closeTimeoutMs: 5000
# Producer client identifier
clientId: zeebe

# Any setting under the following section will be passed verbatim to
# ProducerConfig; you can use this to configure authentication, compression,
# etc. Note that you can overwrite some important settings, so avoid changing
# idempotency, delivery timeout, and retries, unless you know what you're doing
config: |
linger.ms=5
buffer.memory=8388608
batch.size=32768
max.block.ms=5000

# Controls which records are pushed to Kafka and to which topic
# Each entry is a sub-map which can contain two entries:
# type => string
# topic => string
#
# Topic is the topic to which the record with the given value type
# should be sent to, e.g. for a deployment record below we would
# send the record to "zeebe-deployment" topic.
#
# Type is a comma separated string of accepted record types, allowing you to filter if you
# want nothing (""), commands ("command"), events ("event"), or rejections ("rejection"),
# or a combination of the three, e.g. "command,event".
#
# To omit certain records entirely, set type to an empty string. For example,
# records:
# deployment: { type: "" }
records:
# If a record value type is omitted in your configuration file,
# it will fall back to whatever is configured in the defaults
defaults: { type: "event", topic: '${KAFKA_TOPIC}'}
# For records with a value of type DEPLOYMENT
# deployment: { topic: zeebe-deployment }
# For records with a value of type ERROR
# error: { topic: zeebe-error }
# For records with a value of type INCIDENT
# incident: { topic: zeebe-incident }
# For records with a value of type JOB
# job: { topic: zeebe-job }
# For records with a value of type MESSAGE
# message: { topic: zeebe-message }
# For records with a value of type MESSAGE_SUBSCRIPTION
# messageSubscription: { topic: zeebe-message-subscription }
# For records with a value of type MESSAGE_START_EVENT_SUBSCRIPTION
# messageStartEventSubscription: { topic: zeebe-message-subscription-start-event }
# For records with a value of type PROCESS
# process: { topic: zeebe-process }
# For records with a value of type PROCESS_INSTANCE
# processInstance: { topic: zeebe-process-instance }
# For records with a value of type TIMER
# timer: { topic: zeebe-timer }
# For records with a value of type VARIABLE
# variable: { topic: zeebe-variable }
# For records with a value of type DEPLOYMENT_DISTRIBUTION
deploymentDistribution: { topic: "" }
# For records with a value of type JOB_BATCH
jobBatch: { type: "" }
# For records with a value of type PROCESS_EVENT
processEvent: { type: "" }
# For records with a value of type PROCESS_INSTANCE_RESULT
processInstanceResult: { type: "" }
# For records with a value of type PROCESS_MESSAGE_SUBSCRIPTION
processMessageSubscription: { type: "" }
Binary file not shown.
Binary file modified docs/how-it-works.png