feat: add kafka importer as option #638

Merged: 3 commits merged on Dec 27, 2023
Changes from 2 commits

63 changes: 61 additions & 2 deletions README.md
@@ -14,7 +14,7 @@ A monitoring application for [Zeebe](https://zeebe.io). It is designed for devel
* test workflows manually
* provide insights into how workflows are executed

The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter). It aggregates the data and stores it into a (in-memory) database. The data is displayed on server-side rendered HTML pages.
The application imports the data from Zeebe using the [Hazelcast exporter](https://github.com/camunda-community-hub/zeebe-hazelcast-exporter) or the [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter). It aggregates the data and stores it in a database. The data is displayed on server-side rendered HTML pages.

![how-it-works](docs/how-it-works.png)

@@ -41,13 +41,32 @@ docker pull ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.4.1
* a) in Zeebe broker, set the environment variable `ZEEBE_HAZELCAST_CLUSTER_NAME=dev` (default: `dev`)
* b) in Zeebe Simple Monitor, change the setting `zeebe.client.worker.hazelcast.clusterName` (default: `dev`)

**Switch to the Kafka exporter/importer**

By default, the Zeebe Simple Monitor imports Zeebe events through Hazelcast, but you can switch to Kafka.

* Ensure that a Zeebe broker is running with a [Kafka exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter) (>= `3.1.1`)
* Configure the environment variables in the Zeebe broker (a combined Docker Compose sketch follows this list):
* Add the Spring configuration for the [zeebe-kafka-exporter](https://github.com/camunda-community-hub/zeebe-kafka-exporter): `SPRING_CONFIG_ADDITIONAL_LOCATION: /usr/local/zeebe/config/exporter.yml`. [Example](docker/kafka/exporter.yml) and [details](https://github.com/camunda-community-hub/zeebe-kafka-exporter?tab=readme-ov-file#configuration)
* Inject `exporter.yml` and `zeebe-kafka-exporter.jar` into the Docker container, for example, using Docker Compose:
```
volumes:
- ./exporter.yml:/usr/local/zeebe/config/exporter.yml
- ./zeebe-kafka-exporter-3.1.1-jar-with-dependencies.jar:/usr/local/zeebe/lib/zeebe-kafka-exporter.jar
```
* Set the Kafka internal host: `KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"`
* Set the Kafka topic: `KAFKA_TOPIC: zeebe`
* To import events efficiently and quickly, the Zeebe broker partitions and the Kafka topic partitions should be correlated as described in the [exporter docs](https://github.com/camunda-community-hub/zeebe-kafka-exporter?tab=readme-ov-file#partitioning)
* Configure the environment variables in the Zeebe Simple Monitor as described in the "[Change the default Zeebe importer to Kafka](#change-the-default-zeebe-importer-to-kafka)" section
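
Putting the broker-side pieces together, a Docker Compose service might look like the following sketch (image tag, Kafka host, and exporter JAR path are taken from this repository's example setup and should be adjusted to your environment):

```
zeebe-kafka:
  image: camunda/zeebe:8.3.4
  environment:
    SPRING_CONFIG_ADDITIONAL_LOCATION: /usr/local/zeebe/config/exporter.yml
    KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
    KAFKA_TOPIC: zeebe
  ports:
    - "26500:26500"
  volumes:
    - ./exporter.yml:/usr/local/zeebe/config/exporter.yml
    - ./zeebe-kafka-exporter-3.1.1-jar-with-dependencies.jar:/usr/local/zeebe/lib/zeebe-kafka-exporter.jar
```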


If the Zeebe broker runs on your local machine with the default configuration, start the container with the following command:

```
docker run --network="host" ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.4.1
```

For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast exporter and the application.
For a local setup, the repository contains a [docker-compose file](docker/docker-compose.yml). It starts a Zeebe broker with the Hazelcast/Kafka exporter and the application.

```
mvn clean install -DskipTests
@@ -94,6 +113,12 @@ zeebe:
clusterName: dev
connectionTimeout: PT30S

# Options: hazelcast | kafka
# This setting selects which of the provided importers is used
# To use one of them, Zeebe must be configured with the hazelcast-exporter or kafka-exporter, respectively
# See the examples in docker/docker-compose.yml in services.zeebe and services.zeebe-kafka
zeebe-importer: hazelcast

spring:

datasource:
@@ -107,6 +132,29 @@ spring:
hibernate:
ddl-auto: update

kafka:
template:
default-topic: zeebe
bootstrap-servers: localhost:9093
properties:
request.timeout.ms: 20000
retry.backoff.ms: 500
group-id: zeebe-simple-monitor
consumer:
auto-offset-reset: earliest
properties:
# 1 MB (1*1024*1024), the maximum size of a fetched batch
max.partition.fetch.bytes: 1048576
# Maximum number of messages per batch received by the Kafka listener.
# Applies only while the total batch size stays below 'max.partition.fetch.bytes'
max.poll.records: 1000
custom:
# Set equal to the number of topic partitions to handle them in parallel
concurrency: 3
retry:
intervalMs: 30000
max-attempts: 3

server:
port: 8082
servlet:
Expand Down Expand Up @@ -183,6 +231,17 @@ The configuration for using MySql is similar but with an additional setting for

See the [docker-compose file](docker/docker-compose.yml) (profile: `mysql`) for a sample configuration with MySql.

#### Change the default Zeebe importer to Kafka

* set the `zeebe-importer` (default: `hazelcast`) configuration property to `kafka`
* configure the connection to Kafka by setting `spring.kafka.bootstrap-servers` (default: `localhost:9093`)
* configure the Kafka topic by setting `spring.kafka.template.default-topic` (default: `zeebe`)
* configure custom Kafka properties if necessary:
* `spring.kafka.custom.concurrency` (default: `3`) is the number of threads for the Kafka listener that will import events from Zeebe
* `spring.kafka.custom.retry.intervalMs` (default: `30000`) and `spring.kafka.custom.retry.max-attempts` (default: `3`) configure retries for retryable exceptions in the listener

Refer to the [docker-compose file](docker/docker-compose.yml) (service: `simple-monitor-in-memory-kafka`) for a sample configuration with the Kafka importer.
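
Putting these settings together, a minimal Spring configuration override might look like the following sketch (host and topic values are placeholders; adjust them to your Kafka setup):

```
zeebe-importer: kafka

spring:
  kafka:
    bootstrap-servers: localhost:9093
    template:
      default-topic: zeebe
    custom:
      concurrency: 3
      retry:
        intervalMs: 30000
        max-attempts: 3
```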

## Code of Conduct

This project adheres to the Contributor Covenant [Code of
66 changes: 65 additions & 1 deletion docker/docker-compose.yml
Collaborator:

When I naively tried `docker compose up`, it crashed on me because multiple backends overlap/clash.
Also, firing up too many services will stress people who don't have enough hardware resources.
While I like your idea of these prepared service definitions, may I propose introducing profiles?
E.g. there could be: hazelcast, kafka, postgres, mysql, and inmemory.

That means we could also add a small note in the CONTRIBUTION.md file,
saying something like: "For testing purposes, use docker profiles ... COMPOSE_PROFILES=inmemory,hazelcast docker compose up"

See also https://docs.docker.com/compose/profiles/

Contributor Author (@gasymovrv), Dec 21, 2023:

Martin, it's a good idea, but I think there are some issues with multiple profiles. For example:
COMPOSE_PROFILES=in_memory,hazelcast docker compose up
will try to start all services that have the "in_memory" OR the "hazelcast" profile, because profiles work as "has any".
So each of these services matches:

  simple-monitor-in-memory:
    ...
    profiles: ["in_memory", "hazelcast"]

  simple-monitor-in-memory-kafka:
    ...
    profiles: ["in_memory", "kafka"]

  simple-monitor-postgres:
    ...
    profiles: ["postgres", "hazelcast"]

  simple-monitor-mysql:
    ...
    profiles: ["mysql", "hazelcast"]

The workaround is to use compound names for profiles and run docker-compose as follows: COMPOSE_PROFILES=hazelcast,hazelcast_postgres,postgres
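
For illustration, such a compound-profile scheme could look roughly like this (profile names are only examples):

```
  # compound profile names avoid the "has any" overlap
  simple-monitor-postgres:
    ...
    profiles: ["hazelcast_postgres"]

  simple-monitor-in-memory:
    ...
    profiles: ["hazelcast_in_memory"]
```

and the stack would then be selected explicitly, e.g. `COMPOSE_PROFILES=hazelcast,hazelcast_postgres,postgres docker compose up`.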

Collaborator:

Right, I see.
Thanks for pointing this out.

@@ -6,7 +6,7 @@ networks:

services:
zeebe:
container_name: zeebe_broker
container_name: zeebe-broker-hazelcast
image: ghcr.io/camunda-community-hub/zeebe-with-hazelcast-exporter:8.2.0
environment:
- ZEEBE_LOG_LEVEL=debug
@@ -17,6 +17,53 @@ services:
- "5701:5701"
networks:
- zeebe_network
volumes:
- ./hazelcast/application.yaml:/usr/local/zeebe/config/application.yaml

zeebe-kafka:
container_name: zeebe-broker-kafka
image: camunda/zeebe:8.3.4
hostname: zeebe
environment:
ZEEBE_BROKER_CLUSTER_PARTITIONSCOUNT: 5
SPRING_CONFIG_ADDITIONAL_LOCATION: /usr/local/zeebe/config/exporter.yml
KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
KAFKA_TOPIC: zeebe
depends_on:
- kafka
ports:
- "26500:26500"
networks:
- zeebe_network
volumes:
- ./kafka/exporter.yml:/usr/local/zeebe/config/exporter.yml
- ./kafka/exporter/zeebe-kafka-exporter-3.1.1-jar-with-dependencies.jar:/usr/local/zeebe/lib/zeebe-kafka-exporter.jar

zookeeper:
image: docker.io/bitnami/zookeeper:3.8
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- zeebe_network

kafka:
image: docker.io/bitnami/kafka:3.4
ports:
- "9093:9093"
environment:
- KAFKA_ENABLE_KRAFT=no
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
networks:
- zeebe_network

simple-monitor-in-memory:
container_name: zeebe-simple-monitor-in-memory
@@ -34,6 +81,23 @@ services:
profiles:
- in-memory

simple-monitor-in-memory-kafka:
container_name: zeebe-simple-monitor-in-memory-kafka
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.5.2
environment:
- zeebe.client.broker.gateway-address=zeebe:26500
- zeebe-importer=kafka
- spring.kafka.bootstrap-servers=kafka:9092
- spring.kafka.template.default-topic=zeebe
ports:
- "8082:8082"
depends_on:
- zeebe-kafka
networks:
- zeebe_network
profiles:
- in-memory

simple-monitor-postgres:
container_name: zeebe-simple-monitor-postgres
image: ghcr.io/camunda-community-hub/zeebe-simple-monitor:2.5.2
docker/hazelcast/application.yaml
@@ -3,4 +3,4 @@ zeebe:
exporters:
hazelcast:
className: io.zeebe.hazelcast.exporter.HazelcastExporter
jarPath: exporters/zeebe-hazelcast-exporter.jar
jarPath: exporters/zeebe-hazelcast-exporter-jar-with-dependencies.jar
115 changes: 115 additions & 0 deletions docker/kafka/exporter.yml
@@ -0,0 +1,115 @@
zeebe:
broker:
exporters:
kafka:
className: io.zeebe.exporters.kafka.KafkaExporter
# Update this path to the location of the JAR
# Note that this must be visible to the broker process
jarPath: lib/zeebe-kafka-exporter.jar
args:
# Controls the number of records to buffer in a single record batch before forcing a flush. Note
# that a flush may occur before anyway due to periodic flushing. This setting should help you
# estimate a soft upper bound to the memory consumption of the exporter. If you assume a worst
# case scenario where every record is the size of your zeebe.broker.network.maxMessageSize, then
# the memory required by the exporter would be at least:
# (maxBatchSize * zeebe.broker.network.maxMessageSize * 2)
#
# We multiply by 2 as the records are buffered twice - once in the exporter itself, and once
# in the producer's network buffers (but serialized at that point). There's some additional
# memory overhead used by the producer as well for compression/encryption/etc., so you have to
# add a bit, but that one is not proportional to the number of records and is more or less
# constant.
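#
# For example, with the default maxBatchSize of 100 and an assumed
# zeebe.broker.network.maxMessageSize of 4 MB, the worst case would be
# roughly 100 * 4 MB * 2 = 800 MB.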
#
# Once the batch has reached this size, a flush is automatically triggered. Too small a number
# here would cause many flushes, which is not good for performance, but would mean you will see
# your records faster/sooner.
#
# Default is 100
maxBatchSize: 100
# The maximum time to block when the batch is full. If the batch is full, and a new
# record comes in, the exporter will block until there is space in the batch, or until
# maxBlockingTimeoutMs milliseconds elapse.
maxBlockingTimeoutMs: 1000
# How often should pending batches be flushed to the Kafka broker. Too low a value will
# cause more load on the broker, but means your records will be visible faster.
flushIntervalMs: 1000

# Producer specific configuration
producer:
# The list of initial Kafka broker contact points. The format should be the same
# one as the ProducerConfig expects, i.e. "host:port"
# Maps to ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
# For example:
# servers: "kafka:9092,localhost:29092"
servers: ${KAFKA_BOOTSTRAP_SERVERS}
# Controls how long the producer will wait for a request to be acknowledged by
# the Kafka broker before retrying it
# Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
requestTimeoutMs: 5000
# Grace period when shutting down the producer in milliseconds
closeTimeoutMs: 5000
# Producer client identifier
clientId: zeebe

# Any setting under the following section will be passed verbatim to
# ProducerConfig; you can use this to configure authentication, compression,
# etc. Note that you can overwrite some important settings, so avoid changing
# idempotency, delivery timeout, and retries, unless you know what you're doing
config: |
linger.ms=5
buffer.memory=8388608
batch.size=32768
max.block.ms=5000

# Controls which records are pushed to Kafka and to which topic
# Each entry is a sub-map which can contain two entries:
# type => string
# topic => string
#
# Topic is the topic to which the record with the given value type
# should be sent to, e.g. for a deployment record below we would
# send the record to "zeebe-deployment" topic.
#
# Type is a comma separated string of accepted record types, allowing you to filter if you
# want nothing (""), commands ("command"), events ("events"), or rejections ("rejection"),
# or a combination of the three, e.g. "command,event".
#
# To omit certain records entirely, set type to an empty string. For example,
# records:
# deployment: { type: "" }
records:
# If a record value type is omitted in your configuration file,
# it will fall back to whatever is configured in the defaults
defaults: { type: "event", topic: '${KAFKA_TOPIC}'}
# For records with a value of type DEPLOYMENT
# deployment: { topic: zeebe-deployment }
# For records with a value of type ERROR
# error: { topic: zeebe-error }
# For records with a value of type INCIDENT
# incident: { topic: zeebe-incident }
# For records with a value of type JOB
# job: { topic: zeebe-job }
# For records with a value of type MESSAGE
# message: { topic: zeebe-message }
# For records with a value of type MESSAGE_SUBSCRIPTION
# messageSubscription: { topic: zeebe-message-subscription }
# For records with a value of type MESSAGE_START_EVENT_SUBSCRIPTION
# messageStartEventSubscription: { topic: zeebe-message-subscription-start-event }
# For records with a value of type PROCESS
# process: { topic: zeebe-process }
# For records with a value of type PROCESS_INSTANCE
# processInstance: { topic: zeebe-process-instance }
# For records with a value of type TIMER
# timer: { topic: zeebe-timer }
# For records with a value of type VARIABLE
# variable: { topic: zeebe-variable }
# For records with a value of type DEPLOYMENT_DISTRIBUTION
deploymentDistribution: { type: "" }
# For records with a value of type JOB_BATCH
jobBatch: { type: "" }
# For records with a value of type PROCESS_EVENT
processEvent: { type: "" }
# For records with a value of type PROCESS_INSTANCE_RESULT
processInstanceResult: { type: "" }
# For records with a value of type PROCESS_MESSAGE_SUBSCRIPTION
processMessageSubscription: { type: "" }
Binary file not shown.
Binary file modified docs/how-it-works.png
Collaborator:

Great that you thought of updating this one.
Just being nitty-gritty here: would you please adjust the arrows to follow a consistent meaning (right now it's mixed)? I propose 'technical dependency'.
Also, please adjust the text so each box is on the same semantic level, which makes reading easier.

See my proposed sketch expressing what I have in mind...
[Screenshot: proposed sketch]

Contributor Author:

Could you take a look to see if I understood you correctly (I didn't fully understand the DB dependency; you probably meant the arrow from ZSM to the DB)?
[Updated how-it-works diagram]

Collaborator (@nitram509), Dec 20, 2023:

good catch, you're right.
My proposal was wrong at this point 🙈

11 changes: 11 additions & 0 deletions pom.xml
@@ -127,6 +127,11 @@
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
@@ -137,6 +142,12 @@
<artifactId>zeebe-hazelcast-connector</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-protocol-jackson</artifactId>
<version>${zeebe.version}</version>
</dependency>

<dependency>
<groupId>com.querydsl</groupId>
<artifactId>querydsl-apt</artifactId>
19 changes: 19 additions & 0 deletions src/main/java/io/zeebe/monitor/config/JacksonConfig.java
@@ -0,0 +1,19 @@
package io.zeebe.monitor.config;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JacksonConfig {

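// ObjectMapper configured to ignore unknown fields and to deserialize Zeebe protocol
// records from JSON (e.g. records exported to Kafka), via the ZeebeProtocolModule.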
@Bean
public ObjectMapper objectMapper() {
var objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.registerModule(new ZeebeProtocolModule());
return objectMapper;
}
}
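
As a rough usage sketch (not part of this change), a Kafka listener could use this mapper to turn an exported JSON record back into a typed Zeebe record; the listener class and topic placeholder below are illustrative assumptions, not the importer implemented in this PR:

```
package io.zeebe.monitor.kafka;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordValue;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Illustrative sketch only; names and wiring are assumptions.
@Component
public class ZeebeRecordListenerSketch {

  private final ObjectMapper objectMapper;

  public ZeebeRecordListenerSketch(ObjectMapper objectMapper) {
    this.objectMapper = objectMapper;
  }

  @KafkaListener(topics = "${spring.kafka.template.default-topic}")
  public void onRecord(String json) throws Exception {
    // The ZeebeProtocolModule resolves the concrete record value type during deserialization.
    Record<RecordValue> record = objectMapper.readValue(json, new TypeReference<Record<RecordValue>>() {});
    // ... hand the record over to the aggregation/persistence logic
  }
}
```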