feat: add kafka importer as option #638
zeebe:
  broker:
    exporters:
      kafka:
        className: io.zeebe.exporters.kafka.KafkaExporter
        # Update this path to the location of the JAR
        # Note that this must be visible to the broker process
        jarPath: lib/zeebe-kafka-exporter.jar
        args:
          # Controls the number of records to buffer in a single record batch before forcing a flush.
          # Note that a flush may occur earlier anyway due to periodic flushing. This setting should
          # help you estimate a soft upper bound on the memory consumption of the exporter. If you
          # assume a worst-case scenario where every record is the size of your
          # zeebe.broker.network.maxMessageSize, then the memory required by the exporter would be
          # at least:
          #   (maxBatchSize * zeebe.broker.network.maxMessageSize * 2)
          #
          # We multiply by 2 as the records are buffered twice - once in the exporter itself, and
          # once in the producer's network buffers (but serialized at that point). There is some
          # additional memory overhead used by the producer as well for compression/encryption/etc.,
          # so you have to add a bit, but that part is not proportional to the number of records and
          # is more or less constant.
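          #
          # As a rough worked example (assuming maxMessageSize is left at its 4 MiB default;
          # this example is illustrative, not part of the original file):
          #   100 records * 4 MiB * 2 = 800 MiB of memory in the worst case.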
          #
          # Once the batch has reached this size, a flush is automatically triggered. Too small a
          # number here would cause many flushes, which is not good for performance, but means you
          # will see your records sooner.
          #
          # Default is 100
          maxBatchSize: 100
          # The maximum time to block when the batch is full. If the batch is full and a new
          # record comes in, the exporter will block until there is space in the batch, or until
          # maxBlockingTimeoutMs milliseconds elapse.
          maxBlockingTimeoutMs: 1000
          # How often pending batches should be flushed to the Kafka broker. Too low a value will
          # cause more load on the broker, but means your records will be visible faster.
          flushIntervalMs: 1000

          # Producer-specific configuration
          producer:
            # The list of initial Kafka broker contact points. The format is the same
            # one the ProducerConfig expects, i.e. "host:port"
            # Maps to ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
            # For example:
            #   servers: "kafka:9092,localhost:29092"
            servers: ${KAFKA_BOOTSTRAP_SERVERS}
            # Controls how long the producer will wait for a request to be acknowledged by
            # the Kafka broker before retrying it
            # Maps to ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
            requestTimeoutMs: 5000
            # Grace period when shutting down the producer, in milliseconds
            closeTimeoutMs: 5000
            # Producer client identifier
            clientId: zeebe

            # Any setting under the following section will be passed verbatim to
            # ProducerConfig; you can use this to configure authentication, compression,
            # etc. Note that you can overwrite some important settings, so avoid changing
            # idempotency, delivery timeout, and retries, unless you know what you're doing.
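            # For example (an illustrative addition, not in the original file), to enable
            # snappy compression you could add the line: compression.type=snappy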
            config: |
              linger.ms=5
              buffer.memory=8388608
              batch.size=32768
              max.block.ms=5000

          # Controls which records are pushed to Kafka and to which topic.
          # Each entry is a sub-map which can contain two entries:
          #   type => string
          #   topic => string
          #
          # Topic is the topic to which records with the given value type should be sent,
          # e.g. for the deployment record below we would send the record to the
          # "zeebe-deployment" topic.
          #
          # Type is a comma-separated string of accepted record types, allowing you to filter if
          # you want nothing (""), commands ("command"), events ("event"), or rejections
          # ("rejection"), or a combination of the three, e.g. "command,event".
          #
          # To omit certain records entirely, set type to an empty string. For example:
          #   records:
          #     deployment: { type: "" }
          records:
            # If a record value type is omitted in your configuration file,
            # it will fall back to whatever is configured in the defaults
            defaults: { type: "event", topic: '${KAFKA_TOPIC}' }
            # For records with a value of type DEPLOYMENT
            # deployment: { topic: zeebe-deployment }
            # For records with a value of type ERROR
            # error: { topic: zeebe-error }
            # For records with a value of type INCIDENT
            # incident: { topic: zeebe-incident }
            # For records with a value of type JOB
            # job: { topic: zeebe-job }
            # For records with a value of type MESSAGE
            # message: { topic: zeebe-message }
            # For records with a value of type MESSAGE_SUBSCRIPTION
            # messageSubscription: { topic: zeebe-message-subscription }
            # For records with a value of type MESSAGE_START_EVENT_SUBSCRIPTION
            # messageStartEventSubscription: { topic: zeebe-message-subscription-start-event }
            # For records with a value of type PROCESS
            # process: { topic: zeebe-process }
            # For records with a value of type PROCESS_INSTANCE
            # processInstance: { topic: zeebe-process-instance }
            # For records with a value of type TIMER
            # timer: { topic: zeebe-timer }
            # For records with a value of type VARIABLE
            # variable: { topic: zeebe-variable }
            # For records with a value of type DEPLOYMENT_DISTRIBUTION
            deploymentDistribution: { topic: "" }
            # For records with a value of type JOB_BATCH
            jobBatch: { type: "" }
            # For records with a value of type PROCESS_EVENT
            processEvent: { type: "" }
            # For records with a value of type PROCESS_INSTANCE_RESULT
            processInstanceResult: { type: "" }
            # For records with a value of type PROCESS_MESSAGE_SUBSCRIPTION
            processMessageSubscription: { type: "" }
Review comments on this file:

"Great that you thought of updating this one."

"Good catch, you're right."
package io.zeebe.monitor.config;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.zeebe.protocol.jackson.ZeebeProtocolModule;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JacksonConfig {

  @Bean
  public ObjectMapper objectMapper() {
    var objectMapper = new ObjectMapper();
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    objectMapper.registerModule(new ZeebeProtocolModule());

    return objectMapper;
  }
}
When I naively tried `docker compose up`, it crashed on me because multiple backends overlap/clash. Also, firing up too many services will stress people who don't have enough hardware resources.

While I like your idea of having these services prepared, may I propose introducing profiles? E.g. there could be: `hazelcast`, `kafka`, `postgres`, `mysql`, and `inmemory`.

We could then also add a small note in the CONTRIBUTION.md file, saying something like: "For testing purposes, use docker profiles: `COMPOSE_PROFILES=inmemory,hazelcast docker compose up`".

See also https://docs.docker.com/compose/profiles/
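A minimal sketch of what such a profile setup could look like in `docker-compose.yml` (service and image names below are illustrative, not taken from this repository):

```yaml
services:
  hazelcast:
    image: hazelcast/hazelcast
    profiles: ["hazelcast"]

  kafka:
    image: bitnami/kafka
    profiles: ["kafka"]

  simple-monitor-inmemory:
    image: ghcr.io/camunda-community-hub/zeebe-simple-monitor
    profiles: ["inmemory"]
```

With that, `COMPOSE_PROFILES=inmemory,hazelcast docker compose up` starts only the services tagged with one of those profiles; services without a `profiles` attribute always start regardless.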
Martin, it's a good idea, but I think there are some problems with multiple profiles. For example:

`COMPOSE_PROFILES=in_memory,hazelcast docker compose up`

will try to start every service that has either the `in_memory` or the `hazelcast` profile, because profiles work as "has any". So several of these services match at once.

The workaround is to use compound names for the profiles and run docker-compose as follows:

`COMPOSE_PROFILES=hazelcast,hazelcast_postgres,postgres`
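A sketch of that compound-name workaround (names again illustrative): each backend keeps its own profile, and each monitor variant gets a compound profile, so the full list in `COMPOSE_PROFILES` selects one consistent stack:

```yaml
services:
  hazelcast:
    image: hazelcast/hazelcast
    profiles: ["hazelcast"]

  postgres:
    image: postgres
    profiles: ["postgres"]

  # monitor variant that imports from hazelcast and stores into postgres
  simple-monitor-hazelcast-postgres:
    image: ghcr.io/camunda-community-hub/zeebe-simple-monitor
    profiles: ["hazelcast_postgres"]
```

Here `COMPOSE_PROFILES=hazelcast,hazelcast_postgres,postgres docker compose up` would start exactly these three services, without also pulling in the other monitor variants.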
Right, I see.
Thanks for pointing this out.