PipelineConfig whether it has an env attribute defined. The script is currently unable to visit the classes of fields like topic_name_config, hence any environment variables defined there would remain unknown to it.
+
config_env_vars.env -- Almost all pipeline config environment variables in a dotenv file. The script checks for each field in KpopsConfig whether it has an env attribute defined. The script is currently unable to visit the classes of fields like topic_name_config, hence any environment variables defined there would remain unknown to it.
config_env_vars.env -- Almost all pipeline config environment variables in a table.
variable_substitution.yaml -- A copy of ./tests/pipeline/resources/component-type-substitution/pipeline.yaml used as an example of substitution.
Below are listed existing Ruff plugins/extensions for some of the most popular python IDEs. If you cannot find your Editor of choices or you want something more custom, ruff-lsp enables Ruff to be used in any editor that supports the LSP
# Base component for Kafka-based components.
-# Producer or streaming apps should inherit from this class.
--type:kafka-app# required
-name:kafka-app# required
-# Pipeline prefix that will prefix every component name. If you wish to not
-# have any prefix you can specify an empty string.
-prefix:${pipeline_name}-
-from:# Must not be null
-topics:# read from topic
-${pipeline_name}-input-topic:
-type:input# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra
-${pipeline_name}-input-pattern-topic:
-type:pattern# Implied to be an input pattern if `role` is undefined
-${pipeline_name}-extra-pattern-topic:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-components:# read from specific component
-account-producer:
-type:output# Implied when role is NOT specified
-other-producer:
-role:some-role# Implies `type` to be extra
-component-as-input-pattern:
-type:pattern# Implied to be an input pattern if `role` is undefined
-component-as-extra-pattern:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-# Topic(s) into which the component will write output
-to:
-topics:
-${pipeline_name}-output-topic:
-type:output# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
-${pipeline_name}-error-topic:
-type:error
-# Currently KPOps supports Avro and JSON schemas.
-key_schema:key-schema# must implement SchemaProvider to use
-value_schema:value-schema
-partitions_count:1
-replication_factor:1
-configs:# https://kafka.apache.org/documentation/#topicconfigs
-cleanup.policy:compact
-models:# SchemaProvider is initiated with the values given here
-model:model
-namespace:namespace# required
-# `app` can contain application-specific settings, hence the user is free to
-# add the key-value pairs they need.
-app:# required
-streams:# required
-brokers:${brokers}# required
-schemaRegistryUrl:${schema_registry_url}
-nameOverride:override-with-this-name# kafka-app-specific
-imageTag:"1.0.0"# Example values that are shared between streams-app and producer-app
-# Helm repository configuration (optional)
-# If not set the helm repo add will not be called. Useful when using local Helm charts
-repo_config:
-repository_name:bakdata-streams-bootstrap# required
-url:https://bakdata.github.io/streams-bootstrap/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"2.12.0"# Helm chart version
-# Kafka sink connector
--type:kafka-sink-connector
-name:kafka-sink-connector# required
-# Pipeline prefix that will prefix every component name. If you wish to not
-# have any prefix you can specify an empty string.
-prefix:${pipeline_name}-
-from:# Must not be null
-topics:# read from topic
-${pipeline_name}-input-topic:
-type:input# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra
-${pipeline_name}-input-pattern-topic:
-type:pattern# Implied to be an input pattern if `role` is undefined
-${pipeline_name}-extra-pattern-topic:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-components:# read from specific component
-account-producer:
-type:output# Implied when role is NOT specified
-other-producer:
-role:some-role# Implies `type` to be extra
-component-as-input-pattern:
-type:pattern# Implied to be an input pattern if `role` is undefined
-component-as-extra-pattern:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-# Topic(s) into which the component will write output
-to:
-topics:
-${pipeline_name}-output-topic:
-type:output# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
-${pipeline_name}-error-topic:
-type:error
-# Currently KPOps supports Avro and JSON schemas.
-key_schema:key-schema# must implement SchemaProvider to use
-value_schema:value-schema
-partitions_count:1
-replication_factor:1
-configs:# https://kafka.apache.org/documentation/#topicconfigs
-cleanup.policy:compact
-models:# SchemaProvider is initiated with the values given here
-model:model
-namespace:namespace# required
-# `app` contains application-specific settings, hence it does not have a rigid
-# structure. The fields below are just an example. Extensive documentation on
-# connectors: https://kafka.apache.org/documentation/#connectconfigs
-app:# required
-tasks.max:1
-# Helm repository configuration for resetter
-repo_config:
-repository_name:my-repo# required
-url:https://bakdata.github.io/kafka-connect-resetter/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"1.0.6"# Helm chart version
-# Overriding Kafka Connect Resetter Helm values. E.g. to override the
-# Image Tag etc.
-resetter_values:
-imageTag:"1.2.3"
-# Kafka source connector
--type:kafka-source-connector# required
-name:kafka-source-connector# required
-# Pipeline prefix that will prefix every component name. If you wish to not
-# have any prefix you can specify an empty string.
-prefix:${pipeline_name}-
-# The source connector has no `from` section
-# from:
-# Topic(s) into which the component will write output
-to:
-topics:
-${pipeline_name}-output-topic:
-type:output# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
-${pipeline_name}-error-topic:
-type:error
-# Currently KPOps supports Avro and JSON schemas.
-key_schema:key-schema# must implement SchemaProvider to use
-value_schema:value-schema
-partitions_count:1
-replication_factor:1
-configs:# https://kafka.apache.org/documentation/#topicconfigs
-cleanup.policy:compact
-models:# SchemaProvider is initiated with the values given here
-model:model
-namespace:namespace# required
-# `app` contains application-specific settings, hence it does not have a rigid
-# structure. The fields below are just an example. Extensive documentation on
-# connectors: https://kafka.apache.org/documentation/#connectconfigs
-app:# required
-tasks.max:1
-# Helm repository configuration for resetter
-repo_config:
-repository_name:my-repo# required
-url:https://bakdata.github.io/kafka-connect-resetter/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"1.0.6"# Helm chart version
-# Overriding Kafka Connect Resetter Helm values. E.g. to override the
-# Image Tag etc.
-resetter_values:
-imageTag:"1.2.3"
-# offset.storage.topic
-# https://kafka.apache.org/documentation/#connect_running
-offset_topic:offset_topic
-# Base Kubernetes App
--type:kubernetes-app
-name:kubernetes-app# required
-# Pipeline prefix that will prefix every component name. If you wish to not
-# have any prefix you can specify an empty string.
-prefix:${pipeline_name}-
-from:# Must not be null
-topics:# read from topic
-${pipeline_name}-input-topic:
-type:input# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra
-${pipeline_name}-input-pattern-topic:
-type:pattern# Implied to be an input pattern if `role` is undefined
-${pipeline_name}-extra-pattern-topic:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-components:# read from specific component
-account-producer:
-type:output# Implied when role is NOT specified
-other-producer:
-role:some-role# Implies `type` to be extra
-component-as-input-pattern:
-type:pattern# Implied to be an input pattern if `role` is undefined
-component-as-extra-pattern:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-# Topic(s) into which the component will write output
-to:
-topics:
-${pipeline_name}-output-topic:
-type:output# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
-${pipeline_name}-error-topic:
-type:error
-# Currently KPOps supports Avro and JSON schemas.
-key_schema:key-schema# must implement SchemaProvider to use
-value_schema:value-schema
-partitions_count:1
-replication_factor:1
-configs:# https://kafka.apache.org/documentation/#topicconfigs
-cleanup.policy:compact
-models:# SchemaProvider is initiated with the values given here
-model:model
-namespace:namespace# required
-# `app` contains application-specific settings, hence it does not have a rigid
-# structure. The fields below are just an example.
-app:# required
-image:exampleImage# Example
-debug:false# Example
-commandLine:{}# Example
-# Helm repository configuration (optional)
-# If not set the helm repo add will not be called. Useful when using local Helm charts
-repo_config:
-repository_name:bakdata-streams-bootstrap# required
-url:https://bakdata.github.io/streams-bootstrap/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"1.0.0"# Helm chart version
-# Holds configuration to use as values for the streams bootstrap producer-app Helm
-# chart.
-# More documentation on ProducerApp:
-# https://github.com/bakdata/streams-bootstrap
--type:producer-app
-name:producer-app# required
-# Pipeline prefix that will prefix every component name. If you wish to not
-# have any prefix you can specify an empty string.
-prefix:${pipeline_name}-
-# from: # While the producer-app does inherit from kafka-app, it does not need a
-# `from` section, hence it does not support it.
-# Topic(s) into which the component will write output
-to:
-topics:
-${pipeline_name}-output-topic:
-type:output# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
-${pipeline_name}-error-topic:
-type:error
-# Currently KPOps supports Avro and JSON schemas.
-key_schema:key-schema# must implement SchemaProvider to use
-value_schema:value-schema
-partitions_count:1
-replication_factor:1
-configs:# https://kafka.apache.org/documentation/#topicconfigs
-cleanup.policy:compact
-models:# SchemaProvider is initiated with the values given here
-model:model
-namespace:namespace# required
-# Allowed configs:
-# https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app
-app:# required
-streams:# required, producer-app-specific
-brokers:${brokers}# required
-schemaRegistryUrl:${schema_registry_url}
-outputTopic:output_topic
-extraOutputTopics:
-output_role1:output_topic1
-output_role2:output_topic2
-nameOverride:override-with-this-name# kafka-app-specific
-# Helm repository configuration (optional)
-# If not set the helm repo add will not be called. Useful when using local Helm charts
-repo_config:
-repository_name:bakdata-streams-bootstrap# required
-url:https://bakdata.github.io/streams-bootstrap/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"2.12.0"# Helm chart version
-# StreamsApp component that configures a streams bootstrap app.
-# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap
--type:streams-app# required
-name:streams-app# required
-# Pipeline prefix that will prefix every component name. If you wish to not
-# have any prefix you can specify an empty string.
-prefix:${pipeline_name}-
-from:# Must not be null
-topics:# read from topic
-${pipeline_name}-input-topic:
-type:input# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra
-${pipeline_name}-input-pattern-topic:
-type:pattern# Implied to be an input pattern if `role` is undefined
-${pipeline_name}-extra-pattern-topic:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-components:# read from specific component
-account-producer:
-type:output# Implied when role is NOT specified
-other-producer:
-role:some-role# Implies `type` to be extra
-component-as-input-pattern:
-type:pattern# Implied to be an input pattern if `role` is undefined
-component-as-extra-pattern:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-# Topic(s) into which the component will write output
-to:
-topics:
-${pipeline_name}-output-topic:
-type:output# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
-${pipeline_name}-error-topic:
-type:error
-# Currently KPOps supports Avro and JSON schemas.
-key_schema:key-schema# must implement SchemaProvider to use
-value_schema:value-schema
-partitions_count:1
-replication_factor:1
-configs:# https://kafka.apache.org/documentation/#topicconfigs
-cleanup.policy:compact
-models:# SchemaProvider is initiated with the values given here
-model:model
-namespace:namespace# required
-# No arbitrary keys are allowed under `app`here
-# Allowed configs:
-# https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app
-app:# required
-# Streams Bootstrap streams section
-streams:# required, streams-app-specific
-brokers:${brokers}# required
-schemaRegistryUrl:${schema_registry_url}
-inputTopics:
--topic1
--topic2
-outputTopic:output-topic
-inputPattern:input-pattern
-extraInputTopics:
-input_role1:
--input_topic1
--input_topic2
-input_role2:
--input_topic3
--input_topic4
-extraInputPatterns:
-pattern_role1:input_pattern1
-extraOutputTopics:
-output_role1:output_topic1
-output_role2:output_topic2
-errorTopic:error-topic
-config:
-my.streams.config:my.value
-nameOverride:override-with-this-name# streams-app-specific
-autoscaling:# streams-app-specific
-consumerGroup:consumer-group# required
-lagThreshold:0# Average target value to trigger scaling actions.
-enabled:false# Whether to enable auto-scaling using KEDA.
-# This is the interval to check each trigger on.
-# https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
-pollingInterval:30
-# The period to wait after the last trigger reported active before scaling
-# the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
-cooldownPeriod:300
-# The offset reset policy for the consumer if the the consumer group is
-# not yet subscribed to a partition.
-offsetResetPolicy:earliest
-# This setting is passed to the HPA definition that KEDA will create for a
-# given resource and holds the maximum number of replicas of the target resouce.
-# https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
-maxReplicas:1
-# Minimum number of replicas KEDA will scale the resource down to.
-# https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
-minReplicas:0
-# If this property is set, KEDA will scale the resource down to this
-# number of replicas.
-# https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
-idleReplicas:0
-topics:# List of auto-generated Kafka Streams topics used by the streams app.
--topic1
--topic2
-# Helm repository configuration (optional)
-# If not set the helm repo add will not be called. Useful when using local Helm charts
-repo_config:
-repository_name:bakdata-streams-bootstrap# required
-url:https://bakdata.github.io/streams-bootstrap/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"2.12.0"# Helm chart version
+409
+410
+411
+412
+413
+414
+415
+416
+417
+418
+419
+420
+421
+422
+423
+424
+425
+426
+427
+428
+429
+430
+431
+432
+433
+434
+435
+436
+437
+438
+439
+440
+441
+442
+443
+444
+445
+446
+447
+448
+449
+450
+451
+452
+453
+454
+455
+456
+457
+458
+459
+460
+461
# Kubernetes app managed through Helm with an associated Helm chart
+-type:helm-app
+name:helm-app# required
+# Pipeline prefix that will prefix every component name. If you wish to not
+# have any prefix you can specify an empty string.
+prefix:${pipeline_name}-
+from:# Must not be null
+topics:# read from topic
+${pipeline_name}-input-topic:
+type:input# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra
+${pipeline_name}-input-pattern-topic:
+type:pattern# Implied to be an input pattern if `role` is undefined
+${pipeline_name}-extra-pattern-topic:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+components:# read from specific component
+account-producer:
+type:output# Implied when role is NOT specified
+other-producer:
+role:some-role# Implies `type` to be extra
+component-as-input-pattern:
+type:pattern# Implied to be an input pattern if `role` is undefined
+component-as-extra-pattern:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+# Topic(s) into which the component will write output
+to:
+topics:
+${pipeline_name}-output-topic:
+type:output# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
+${pipeline_name}-error-topic:
+type:error
+# Currently KPOps supports Avro and JSON schemas.
+key_schema:key-schema# must implement SchemaProvider to use
+value_schema:value-schema
+partitions_count:1
+replication_factor:1
+configs:# https://kafka.apache.org/documentation/#topicconfigs
+cleanup.policy:compact
+models:# SchemaProvider is initiated with the values given here
+model:model
+namespace:namespace# required
+# `app` contains application-specific settings, hence it does not have a rigid
+# structure. The fields below are just an example.
+app:# required
+image:exampleImage# Example
+debug:false# Example
+commandLine:{}# Example
+# Helm repository configuration (optional)
+# If not set the helm repo add will not be called. Useful when using local Helm charts
+repo_config:
+repository_name:bakdata-streams-bootstrap# required
+url:https://bakdata.github.io/streams-bootstrap/# required
+repo_auth_flags:
+username:user
+password:pass
+ca_file:/home/user/path/to/ca-file
+insecure_skip_tls_verify:false
+version:"1.0.0"# Helm chart version
+# Base component for Kafka-based components.
+# Producer or streaming apps should inherit from this class.
+-type:kafka-app# required
+name:kafka-app# required
+# Pipeline prefix that will prefix every component name. If you wish to not
+# have any prefix you can specify an empty string.
+prefix:${pipeline_name}-
+from:# Must not be null
+topics:# read from topic
+${pipeline_name}-input-topic:
+type:input# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra
+${pipeline_name}-input-pattern-topic:
+type:pattern# Implied to be an input pattern if `role` is undefined
+${pipeline_name}-extra-pattern-topic:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+components:# read from specific component
+account-producer:
+type:output# Implied when role is NOT specified
+other-producer:
+role:some-role# Implies `type` to be extra
+component-as-input-pattern:
+type:pattern# Implied to be an input pattern if `role` is undefined
+component-as-extra-pattern:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+# Topic(s) into which the component will write output
+to:
+topics:
+${pipeline_name}-output-topic:
+type:output# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
+${pipeline_name}-error-topic:
+type:error
+# Currently KPOps supports Avro and JSON schemas.
+key_schema:key-schema# must implement SchemaProvider to use
+value_schema:value-schema
+partitions_count:1
+replication_factor:1
+configs:# https://kafka.apache.org/documentation/#topicconfigs
+cleanup.policy:compact
+models:# SchemaProvider is initiated with the values given here
+model:model
+namespace:namespace# required
+# `app` can contain application-specific settings, hence the user is free to
+# add the key-value pairs they need.
+app:# required
+streams:# required
+brokers:${kafka_brokers}# required
+schemaRegistryUrl:${schema_registry_url}
+nameOverride:override-with-this-name# kafka-app-specific
+imageTag:"1.0.0"# Example values that are shared between streams-app and producer-app
+# Helm repository configuration (optional)
+# If not set the helm repo add will not be called. Useful when using local Helm charts
+repo_config:
+repository_name:bakdata-streams-bootstrap# required
+url:https://bakdata.github.io/streams-bootstrap/# required
+repo_auth_flags:
+username:user
+password:pass
+ca_file:/home/user/path/to/ca-file
+insecure_skip_tls_verify:false
+version:"2.12.0"# Helm chart version
+# Kafka sink connector
+-type:kafka-sink-connector
+name:kafka-sink-connector# required
+# Pipeline prefix that will prefix every component name. If you wish to not
+# have any prefix you can specify an empty string.
+prefix:${pipeline_name}-
+from:# Must not be null
+topics:# read from topic
+${pipeline_name}-input-topic:
+type:input# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra
+${pipeline_name}-input-pattern-topic:
+type:pattern# Implied to be an input pattern if `role` is undefined
+${pipeline_name}-extra-pattern-topic:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+components:# read from specific component
+account-producer:
+type:output# Implied when role is NOT specified
+other-producer:
+role:some-role# Implies `type` to be extra
+component-as-input-pattern:
+type:pattern# Implied to be an input pattern if `role` is undefined
+component-as-extra-pattern:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+# Topic(s) into which the component will write output
+to:
+topics:
+${pipeline_name}-output-topic:
+type:output# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
+${pipeline_name}-error-topic:
+type:error
+# Currently KPOps supports Avro and JSON schemas.
+key_schema:key-schema# must implement SchemaProvider to use
+value_schema:value-schema
+partitions_count:1
+replication_factor:1
+configs:# https://kafka.apache.org/documentation/#topicconfigs
+cleanup.policy:compact
+models:# SchemaProvider is initiated with the values given here
+model:model
+namespace:namespace# required
+# `app` contains application-specific settings, hence it does not have a rigid
+# structure. The fields below are just an example. Extensive documentation on
+# connectors: https://kafka.apache.org/documentation/#connectconfigs
+app:# required
+tasks.max:1
+# Helm repository configuration for resetter
+repo_config:
+repository_name:my-repo# required
+url:https://bakdata.github.io/kafka-connect-resetter/# required
+repo_auth_flags:
+username:user
+password:pass
+ca_file:/home/user/path/to/ca-file
+insecure_skip_tls_verify:false
+version:"1.0.6"# Helm chart version
+# Overriding Kafka Connect Resetter Helm values. E.g. to override the
+# Image Tag etc.
+resetter_values:
+imageTag:"1.2.3"
+# Kafka source connector
+-type:kafka-source-connector# required
+name:kafka-source-connector# required
+# Pipeline prefix that will prefix every component name. If you wish to not
+# have any prefix you can specify an empty string.
+prefix:${pipeline_name}-
+# The source connector has no `from` section
+# from:
+# Topic(s) into which the component will write output
+to:
+topics:
+${pipeline_name}-output-topic:
+type:output# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
+${pipeline_name}-error-topic:
+type:error
+# Currently KPOps supports Avro and JSON schemas.
+key_schema:key-schema# must implement SchemaProvider to use
+value_schema:value-schema
+partitions_count:1
+replication_factor:1
+configs:# https://kafka.apache.org/documentation/#topicconfigs
+cleanup.policy:compact
+models:# SchemaProvider is initiated with the values given here
+model:model
+namespace:namespace# required
+# `app` contains application-specific settings, hence it does not have a rigid
+# structure. The fields below are just an example. Extensive documentation on
+# connectors: https://kafka.apache.org/documentation/#connectconfigs
+app:# required
+tasks.max:1
+# Helm repository configuration for resetter
+repo_config:
+repository_name:my-repo# required
+url:https://bakdata.github.io/kafka-connect-resetter/# required
+repo_auth_flags:
+username:user
+password:pass
+ca_file:/home/user/path/to/ca-file
+insecure_skip_tls_verify:false
+version:"1.0.6"# Helm chart version
+# Overriding Kafka Connect Resetter Helm values. E.g. to override the
+# Image Tag etc.
+resetter_values:
+imageTag:"1.2.3"
+# offset.storage.topic
+# https://kafka.apache.org/documentation/#connect_running
+offset_topic:offset_topic
+# Base Kubernetes App
+-type:kubernetes-app
+name:kubernetes-app# required
+# Pipeline prefix that will prefix every component name. If you wish to not
+# have any prefix you can specify an empty string.
+prefix:${pipeline_name}-
+from:# Must not be null
+topics:# read from topic
+${pipeline_name}-input-topic:
+type:input# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra
+${pipeline_name}-input-pattern-topic:
+type:pattern# Implied to be an input pattern if `role` is undefined
+${pipeline_name}-extra-pattern-topic:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+components:# read from specific component
+account-producer:
+type:output# Implied when role is NOT specified
+other-producer:
+role:some-role# Implies `type` to be extra
+component-as-input-pattern:
+type:pattern# Implied to be an input pattern if `role` is undefined
+component-as-extra-pattern:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+# Topic(s) into which the component will write output
+to:
+topics:
+${pipeline_name}-output-topic:
+type:output# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
+${pipeline_name}-error-topic:
+type:error
+# Currently KPOps supports Avro and JSON schemas.
+key_schema:key-schema# must implement SchemaProvider to use
+value_schema:value-schema
+partitions_count:1
+replication_factor:1
+configs:# https://kafka.apache.org/documentation/#topicconfigs
+cleanup.policy:compact
+models:# SchemaProvider is initiated with the values given here
+model:model
+namespace:namespace# required
+# `app` contains application-specific settings, hence it does not have a rigid
+# structure. The fields below are just an example.
+app:# required
+image:exampleImage# Example
+debug:false# Example
+commandLine:{}# Example
+# Holds configuration to use as values for the streams bootstrap producer-app Helm
+# chart.
+# More documentation on ProducerApp:
+# https://github.com/bakdata/streams-bootstrap
+-type:producer-app
+name:producer-app# required
+# Pipeline prefix that will prefix every component name. If you wish to not
+# have any prefix you can specify an empty string.
+prefix:${pipeline_name}-
+# from: # While the producer-app does inherit from kafka-app, it does not need a
+# `from` section, hence it does not support it.
+# Topic(s) into which the component will write output
+to:
+topics:
+${pipeline_name}-output-topic:
+type:output# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
+${pipeline_name}-error-topic:
+type:error
+# Currently KPOps supports Avro and JSON schemas.
+key_schema:key-schema# must implement SchemaProvider to use
+value_schema:value-schema
+partitions_count:1
+replication_factor:1
+configs:# https://kafka.apache.org/documentation/#topicconfigs
+cleanup.policy:compact
+models:# SchemaProvider is initiated with the values given here
+model:model
+namespace:namespace# required
+# Allowed configs:
+# https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app
+app:# required
+streams:# required, producer-app-specific
+brokers:${kafka_brokers}# required
+schemaRegistryUrl:${schema_registry_url}
+outputTopic:output_topic
+extraOutputTopics:
+output_role1:output_topic1
+output_role2:output_topic2
+nameOverride:override-with-this-name# kafka-app-specific
+# Helm repository configuration (optional)
+# If not set the helm repo add will not be called. Useful when using local Helm charts
+repo_config:
+repository_name:bakdata-streams-bootstrap# required
+url:https://bakdata.github.io/streams-bootstrap/# required
+repo_auth_flags:
+username:user
+password:pass
+ca_file:/home/user/path/to/ca-file
+insecure_skip_tls_verify:false
+version:"2.12.0"# Helm chart version
+# StreamsApp component that configures a streams bootstrap app.
+# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap
+-type:streams-app# required
+name:streams-app# required
+# Pipeline prefix that will prefix every component name. If you wish to not
+# have any prefix you can specify an empty string.
+prefix:${pipeline_name}-
+from:# Must not be null
+topics:# read from topic
+${pipeline_name}-input-topic:
+type:input# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra
+${pipeline_name}-input-pattern-topic:
+type:pattern# Implied to be an input pattern if `role` is undefined
+${pipeline_name}-extra-pattern-topic:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+components:# read from specific component
+account-producer:
+type:output# Implied when role is NOT specified
+other-producer:
+role:some-role# Implies `type` to be extra
+component-as-input-pattern:
+type:pattern# Implied to be an input pattern if `role` is undefined
+component-as-extra-pattern:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+# Topic(s) into which the component will write output
+to:
+topics:
+${pipeline_name}-output-topic:
+type:output# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
+${pipeline_name}-error-topic:
+type:error
+# Currently KPOps supports Avro and JSON schemas.
+key_schema:key-schema# must implement SchemaProvider to use
+value_schema:value-schema
+partitions_count:1
+replication_factor:1
+configs:# https://kafka.apache.org/documentation/#topicconfigs
+cleanup.policy:compact
+models:# SchemaProvider is initiated with the values given here
+model:model
+namespace:namespace# required
+# No arbitrary keys are allowed under `app`here
+# Allowed configs:
+# https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app
+app:# required
+# Streams Bootstrap streams section
+streams:# required, streams-app-specific
+brokers:${kafka_brokers}# required
+schemaRegistryUrl:${schema_registry_url}
+inputTopics:
+-topic1
+-topic2
+outputTopic:output-topic
+inputPattern:input-pattern
+extraInputTopics:
+input_role1:
+-input_topic1
+-input_topic2
+input_role2:
+-input_topic3
+-input_topic4
+extraInputPatterns:
+pattern_role1:input_pattern1
+extraOutputTopics:
+output_role1:output_topic1
+output_role2:output_topic2
+errorTopic:error-topic
+config:
+my.streams.config:my.value
+nameOverride:override-with-this-name# streams-app-specific
+autoscaling:# streams-app-specific
+consumerGroup:consumer-group# required
+lagThreshold:0# Average target value to trigger scaling actions.
+enabled:false# Whether to enable auto-scaling using KEDA.
+# This is the interval to check each trigger on.
+# https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
+pollingInterval:30
+# The period to wait after the last trigger reported active before scaling
+# the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
+cooldownPeriod:300
+# The offset reset policy for the consumer if the the consumer group is
+# not yet subscribed to a partition.
+offsetResetPolicy:earliest
+# This setting is passed to the HPA definition that KEDA will create for a
+# given resource and holds the maximum number of replicas of the target resouce.
+# https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
+maxReplicas:1
+# Minimum number of replicas KEDA will scale the resource down to.
+# https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
+minReplicas:0
+# If this property is set, KEDA will scale the resource down to this
+# number of replicas.
+# https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
+idleReplicas:0
+topics:# List of auto-generated Kafka Streams topics used by the streams app.
+-topic1
+-topic2
+# Helm repository configuration (optional)
+# If not set the helm repo add will not be called. Useful when using local Helm charts
+repo_config:
+repository_name:bakdata-streams-bootstrap# required
+url:https://bakdata.github.io/streams-bootstrap/# required
+repo_auth_flags:
+username:user
+password:pass
+ca_file:/home/user/path/to/ca-file
+insecure_skip_tls_verify:false
+version:"2.12.0"# Helm chart version
diff --git a/dev/resources/pipeline-components/producer-app.yaml b/dev/resources/pipeline-components/producer-app.yaml
index 7a01ad24b..5be3551d8 100644
--- a/dev/resources/pipeline-components/producer-app.yaml
+++ b/dev/resources/pipeline-components/producer-app.yaml
@@ -32,7 +32,7 @@
# https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app
app: # required
streams: # required, producer-app-specific
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
outputTopic: output_topic
extraOutputTopics:
diff --git a/dev/resources/pipeline-components/sections/app-helm-app.yaml b/dev/resources/pipeline-components/sections/app-helm-app.yaml
new file mode 100644
index 000000000..e2b6cbae0
--- /dev/null
+++ b/dev/resources/pipeline-components/sections/app-helm-app.yaml
@@ -0,0 +1,6 @@
+ # `app` contains application-specific settings, hence it does not have a rigid
+ # structure. The fields below are just an example.
+ app: # required
+ image: exampleImage # Example
+ debug: false # Example
+ commandLine: {} # Example
diff --git a/dev/resources/pipeline-components/sections/app-kafka-app.yaml b/dev/resources/pipeline-components/sections/app-kafka-app.yaml
index 991e862e0..73b70c59e 100644
--- a/dev/resources/pipeline-components/sections/app-kafka-app.yaml
+++ b/dev/resources/pipeline-components/sections/app-kafka-app.yaml
@@ -2,7 +2,7 @@
# add the key-value pairs they need.
app: # required
streams: # required
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
nameOverride: override-with-this-name # kafka-app-specific
imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app
diff --git a/dev/resources/pipeline-components/sections/app-producer-app.yaml b/dev/resources/pipeline-components/sections/app-producer-app.yaml
index 5cd9b000b..0cbe04ded 100644
--- a/dev/resources/pipeline-components/sections/app-producer-app.yaml
+++ b/dev/resources/pipeline-components/sections/app-producer-app.yaml
@@ -2,7 +2,7 @@
# https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app
app: # required
streams: # required, producer-app-specific
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
outputTopic: output_topic
extraOutputTopics:
diff --git a/dev/resources/pipeline-components/sections/app-streams-app.yaml b/dev/resources/pipeline-components/sections/app-streams-app.yaml
index 44f6604aa..1c5f0849f 100644
--- a/dev/resources/pipeline-components/sections/app-streams-app.yaml
+++ b/dev/resources/pipeline-components/sections/app-streams-app.yaml
@@ -4,7 +4,7 @@
app: # required
# Streams Bootstrap streams section
streams: # required, streams-app-specific
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
inputTopics:
- topic1
diff --git a/dev/resources/pipeline-components/sections/repo_config-kubernetes-app.yaml b/dev/resources/pipeline-components/sections/repo_config-helm-app.yaml
similarity index 100%
rename from dev/resources/pipeline-components/sections/repo_config-kubernetes-app.yaml
rename to dev/resources/pipeline-components/sections/repo_config-helm-app.yaml
diff --git a/dev/resources/pipeline-components/streams-app.yaml b/dev/resources/pipeline-components/streams-app.yaml
index 0dde5be5c..f77edf80c 100644
--- a/dev/resources/pipeline-components/streams-app.yaml
+++ b/dev/resources/pipeline-components/streams-app.yaml
@@ -51,7 +51,7 @@
app: # required
# Streams Bootstrap streams section
streams: # required, streams-app-specific
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
inputTopics:
- topic1
diff --git a/dev/resources/pipeline-defaults/defaults-helm-app.yaml b/dev/resources/pipeline-defaults/defaults-helm-app.yaml
new file mode 100644
index 000000000..d08200203
--- /dev/null
+++ b/dev/resources/pipeline-defaults/defaults-helm-app.yaml
@@ -0,0 +1,21 @@
+# Kubernetes app managed through Helm with an associated Helm chart
+#
+# Parent of: KafkaApp
+# Child of: KubernetesApp
+helm-app:
+ # `app` contains application-specific settings, hence it does not have a rigid
+ # structure. The fields below are just an example.
+ app: # required
+ image: exampleImage # Example
+ debug: false # Example
+ commandLine: {} # Example
+ # Helm repository configuration (optional)
+ # If not set the helm repo add will not be called. Useful when using local Helm charts
+ repo_config:
+ repository_name: bakdata-streams-bootstrap # required
+ url: https://bakdata.github.io/streams-bootstrap/ # required
+ repo_auth_flags:
+ username: user
+ password: pass
+ ca_file: /home/user/path/to/ca-file
+ insecure_skip_tls_verify: false
diff --git a/dev/resources/pipeline-defaults/defaults-kafka-app.yaml b/dev/resources/pipeline-defaults/defaults-kafka-app.yaml
index e0af3b7a7..bd6c9e2d9 100644
--- a/dev/resources/pipeline-defaults/defaults-kafka-app.yaml
+++ b/dev/resources/pipeline-defaults/defaults-kafka-app.yaml
@@ -7,7 +7,7 @@ kafka-app:
# add the key-value pairs they need.
app: # required
streams: # required
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
nameOverride: override-with-this-name # kafka-app-specific
imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app
diff --git a/dev/resources/pipeline-defaults/defaults-kubernetes-app.yaml b/dev/resources/pipeline-defaults/defaults-kubernetes-app.yaml
index d49764b8f..5dd85e9ce 100644
--- a/dev/resources/pipeline-defaults/defaults-kubernetes-app.yaml
+++ b/dev/resources/pipeline-defaults/defaults-kubernetes-app.yaml
@@ -1,6 +1,6 @@
# Base Kubernetes App
#
-# Parent of: KafkaApp
+# Parent of: HelmApp
# Child of: PipelineComponent
kubernetes-app:
# Pipeline prefix that will prefix every component name. If you wish to not
@@ -52,14 +52,3 @@ kubernetes-app:
image: exampleImage # Example
debug: false # Example
commandLine: {} # Example
- # Helm repository configuration (optional)
- # If not set the helm repo add will not be called. Useful when using local Helm charts
- repo_config:
- repository_name: bakdata-streams-bootstrap # required
- url: https://bakdata.github.io/streams-bootstrap/ # required
- repo_auth_flags:
- username: user
- password: pass
- ca_file: /home/user/path/to/ca-file
- insecure_skip_tls_verify: false
- version: "1.0.0" # Helm chart version
diff --git a/dev/resources/pipeline-defaults/defaults-producer-app.yaml b/dev/resources/pipeline-defaults/defaults-producer-app.yaml
index 1d81f5ced..bfa5521c4 100644
--- a/dev/resources/pipeline-defaults/defaults-producer-app.yaml
+++ b/dev/resources/pipeline-defaults/defaults-producer-app.yaml
@@ -10,7 +10,7 @@ producer-app:
# https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app
app: # required
streams: # required, producer-app-specific
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
outputTopic: output_topic
extraOutputTopics:
diff --git a/dev/resources/pipeline-defaults/defaults-streams-app.yaml b/dev/resources/pipeline-defaults/defaults-streams-app.yaml
index 83ff13f14..ae1adab98 100644
--- a/dev/resources/pipeline-defaults/defaults-streams-app.yaml
+++ b/dev/resources/pipeline-defaults/defaults-streams-app.yaml
@@ -9,7 +9,7 @@ streams-app:
app: # required
# Streams Bootstrap streams section
streams: # required, streams-app-specific
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
inputTopics:
- topic1
diff --git a/dev/resources/pipeline-defaults/defaults.yaml b/dev/resources/pipeline-defaults/defaults.yaml
index e74272bdc..58b22d3f3 100644
--- a/dev/resources/pipeline-defaults/defaults.yaml
+++ b/dev/resources/pipeline-defaults/defaults.yaml
@@ -1,3 +1,24 @@
+# Kubernetes app managed through Helm with an associated Helm chart
+#
+# Parent of: KafkaApp
+# Child of: KubernetesApp
+helm-app:
+ # `app` contains application-specific settings, hence it does not have a rigid
+ # structure. The fields below are just an example.
+ app: # required
+ image: exampleImage # Example
+ debug: false # Example
+ commandLine: {} # Example
+ # Helm repository configuration (optional)
+ # If not set the helm repo add will not be called. Useful when using local Helm charts
+ repo_config:
+ repository_name: bakdata-streams-bootstrap # required
+ url: https://bakdata.github.io/streams-bootstrap/ # required
+ repo_auth_flags:
+ username: user
+ password: pass
+ ca_file: /home/user/path/to/ca-file
+ insecure_skip_tls_verify: false
# Base component for Kafka-based components.
#
# Parent of: ProducerApp, StreamsApp
@@ -7,7 +28,7 @@ kafka-app:
# add the key-value pairs they need.
app: # required
streams: # required
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
nameOverride: override-with-this-name # kafka-app-specific
imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app
@@ -95,7 +116,7 @@ kafka-source-connector:
offset_topic: offset_topic
# Base Kubernetes App
#
-# Parent of: KafkaApp
+# Parent of: HelmApp
# Child of: PipelineComponent
kubernetes-app:
# Pipeline prefix that will prefix every component name. If you wish to not
@@ -147,17 +168,6 @@ kubernetes-app:
image: exampleImage # Example
debug: false # Example
commandLine: {} # Example
- # Helm repository configuration (optional)
- # If not set the helm repo add will not be called. Useful when using local Helm charts
- repo_config:
- repository_name: bakdata-streams-bootstrap # required
- url: https://bakdata.github.io/streams-bootstrap/ # required
- repo_auth_flags:
- username: user
- password: pass
- ca_file: /home/user/path/to/ca-file
- insecure_skip_tls_verify: false
- version: "1.0.0" # Helm chart version
# Holds configuration to use as values for the streams bootstrap producer-app Helm
# chart.
#
@@ -170,7 +180,7 @@ producer-app:
# https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app
app: # required
streams: # required, producer-app-specific
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
outputTopic: output_topic
extraOutputTopics:
@@ -188,7 +198,7 @@ streams-app:
app: # required
# Streams Bootstrap streams section
streams: # required, streams-app-specific
- brokers: ${brokers} # required
+ brokers: ${kafka_brokers} # required
schemaRegistryUrl: ${schema_registry_url}
inputTopics:
- topic1
diff --git a/dev/resources/pipeline-defaults/defaults/index.html b/dev/resources/pipeline-defaults/defaults/index.html
index 1565b9921..4eb3f5e7c 100644
--- a/dev/resources/pipeline-defaults/defaults/index.html
+++ b/dev/resources/pipeline-defaults/defaults/index.html
@@ -175,7 +175,7 @@
# Kubernetes app managed through Helm with an associated Helm chart#
-# Parent of: ProducerApp, StreamsApp
+# Parent of: KafkaApp# Child of: KubernetesApp
-kafka-app:
-# `app` can contain application-specific settings, hence the user is free to
-# add the key-value pairs they need.
+helm-app:
+# `app` contains application-specific settings, hence it does not have a rigid
+# structure. The fields below are just an example.app:# required
-streams:# required
-brokers:${brokers}# required
-schemaRegistryUrl:${schema_registry_url}
-nameOverride:override-with-this-name# kafka-app-specific
-imageTag:"1.0.0"# Example values that are shared between streams-app and producer-app
-version:"2.12.0"# Helm chart version
-# Kafka connector
-#
-# Parent of: KafkaSinkConnector, KafkaSourceConnector
-# Child of: PipelineComponent
-kafka-connector:
-# Pipeline prefix that will prefix every component name. If you wish to not
-# have any prefix you can specify an empty string.
-prefix:${pipeline_name}-
-from:# Must not be null
-topics:# read from topic
-${pipeline_name}-input-topic:
-type:input# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra
-${pipeline_name}-input-pattern-topic:
-type:pattern# Implied to be an input pattern if `role` is undefined
-${pipeline_name}-extra-pattern-topic:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-components:# read from specific component
-account-producer:
-type:output# Implied when role is NOT specified
-other-producer:
-role:some-role# Implies `type` to be extra
-component-as-input-pattern:
-type:pattern# Implied to be an input pattern if `role` is undefined
-component-as-extra-pattern:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-# Topic(s) into which the component will write output
-to:
-topics:
-${pipeline_name}-output-topic:
-type:output# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
-${pipeline_name}-error-topic:
-type:error
-# Currently KPOps supports Avro and JSON schemas.
-key_schema:key-schema# must implement SchemaProvider to use
-value_schema:value-schema
-partitions_count:1
-replication_factor:1
-configs:# https://kafka.apache.org/documentation/#topicconfigs
-cleanup.policy:compact
-models:# SchemaProvider is initiated with the values given here
-model:model
-namespace:namespace# required
-# `app` contains application-specific settings, hence it does not have a rigid
-# structure. The fields below are just an example. Extensive documentation on
-# connectors: https://kafka.apache.org/documentation/#connectconfigs
-app:# required
-tasks.max:1
-# Helm repository configuration for resetter
-repo_config:
-repository_name:my-repo# required
-url:https://bakdata.github.io/kafka-connect-resetter/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"1.0.6"# Helm chart version
-# Overriding Kafka Connect Resetter Helm values. E.g. to override the
-# Image Tag etc.
-resetter_values:
-imageTag:"1.2.3"
-# Kafka sink connector
-#
-# Child of: KafkaConnector
-kafka-sink-connector:
-# No settings differ from `kafka-connector`
-# Kafka source connector
-#
-# Child of: KafkaConnector
-kafka-source-connector:
-# The source connector has no `from` section
-# from:
-# offset.storage.topic
-# https://kafka.apache.org/documentation/#connect_running
-offset_topic:offset_topic
-# Base Kubernetes App
-#
-# Parent of: KafkaApp
-# Child of: PipelineComponent
-kubernetes-app:
-# Pipeline prefix that will prefix every component name. If you wish to not
-# have any prefix you can specify an empty string.
-prefix:${pipeline_name}-
-from:# Must not be null
-topics:# read from topic
-${pipeline_name}-input-topic:
-type:input# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra
-${pipeline_name}-input-pattern-topic:
-type:pattern# Implied to be an input pattern if `role` is undefined
-${pipeline_name}-extra-pattern-topic:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-components:# read from specific component
-account-producer:
-type:output# Implied when role is NOT specified
-other-producer:
-role:some-role# Implies `type` to be extra
-component-as-input-pattern:
-type:pattern# Implied to be an input pattern if `role` is undefined
-component-as-extra-pattern:
-type:pattern# Implied to be an extra pattern if `role` is defined
-role:some-role
-# Topic(s) into which the component will write output
-to:
-topics:
-${pipeline_name}-output-topic:
-type:output# Implied when role is NOT specified
-${pipeline_name}-extra-topic:
-role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
-${pipeline_name}-error-topic:
-type:error
-# Currently KPOps supports Avro and JSON schemas.
-key_schema:key-schema# must implement SchemaProvider to use
-value_schema:value-schema
-partitions_count:1
-replication_factor:1
-configs:# https://kafka.apache.org/documentation/#topicconfigs
-cleanup.policy:compact
-models:# SchemaProvider is initiated with the values given here
-model:model
-namespace:namespace# required
-# `app` contains application-specific settings, hence it does not have a rigid
-# structure. The fields below are just an example.
-app:# required
-image:exampleImage# Example
-debug:false# Example
-commandLine:{}# Example
-# Helm repository configuration (optional)
-# If not set the helm repo add will not be called. Useful when using local Helm charts
-repo_config:
-repository_name:bakdata-streams-bootstrap# required
-url:https://bakdata.github.io/streams-bootstrap/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"1.0.0"# Helm chart version
-# Holds configuration to use as values for the streams bootstrap producer-app Helm
-# chart.
-#
-# Child of: KafkaApp
-# More documentation on ProducerApp: https://github.com/bakdata/streams-bootstrap
-producer-app:
-# from: # While the producer-app does inherit from kafka-app, it does not need a
-# `from` section, hence it does not support it.
-# Allowed configs:
-# https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app
-app:# required
-streams:# required, producer-app-specific
-brokers:${brokers}# required
-schemaRegistryUrl:${schema_registry_url}
-outputTopic:output_topic
-extraOutputTopics:
-output_role1:output_topic1
-output_role2:output_topic2
-nameOverride:override-with-this-name# kafka-app-specific
-# StreamsApp component that configures a streams bootstrap app.
-#
-# Child of: KafkaApp
-# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap
-streams-app:
-# No arbitrary keys are allowed under `app`here
-# Allowed configs:
-# https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app
-app:# required
-# Streams Bootstrap streams section
-streams:# required, streams-app-specific
-brokers:${brokers}# required
-schemaRegistryUrl:${schema_registry_url}
-inputTopics:
--topic1
--topic2
-outputTopic:output-topic
-inputPattern:input-pattern
-extraInputTopics:
-input_role1:
--input_topic1
--input_topic2
-input_role2:
--input_topic3
--input_topic4
-extraInputPatterns:
-pattern_role1:input_pattern1
-extraOutputTopics:
-output_role1:output_topic1
-output_role2:output_topic2
-errorTopic:error-topic
-config:
-my.streams.config:my.value
-nameOverride:override-with-this-name# streams-app-specific
-autoscaling:# streams-app-specific
-consumerGroup:consumer-group# required
-lagThreshold:0# Average target value to trigger scaling actions.
-enabled:false# Whether to enable auto-scaling using KEDA.
-# This is the interval to check each trigger on.
-# https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
-pollingInterval:30
-# The period to wait after the last trigger reported active before scaling
-# the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
-cooldownPeriod:300
-# The offset reset policy for the consumer if the the consumer group is
-# not yet subscribed to a partition.
-offsetResetPolicy:earliest
-# This setting is passed to the HPA definition that KEDA will create for a
-# given resource and holds the maximum number of replicas of the target resouce.
-# https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
-maxReplicas:1
-# Minimum number of replicas KEDA will scale the resource down to.
-# https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
-minReplicas:0
-# If this property is set, KEDA will scale the resource down to this
-# number of replicas.
-# https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
-idleReplicas:0
-topics:# List of auto-generated Kafka Streams topics used by the streams app.
--topic1
--topic2
+image:exampleImage# Example
+debug:false# Example
+commandLine:{}# Example
+# Helm repository configuration (optional)
+# If not set the helm repo add will not be called. Useful when using local Helm charts
+repo_config:
+repository_name:bakdata-streams-bootstrap# required
+url:https://bakdata.github.io/streams-bootstrap/# required
+repo_auth_flags:
+username:user
+password:pass
+ca_file:/home/user/path/to/ca-file
+insecure_skip_tls_verify:false
+# Base component for Kafka-based components.
+#
+# Parent of: ProducerApp, StreamsApp
+# Child of: KubernetesApp
+kafka-app:
+# `app` can contain application-specific settings, hence the user is free to
+# add the key-value pairs they need.
+app:# required
+streams:# required
+brokers:${kafka_brokers}# required
+schemaRegistryUrl:${schema_registry_url}
+nameOverride:override-with-this-name# kafka-app-specific
+imageTag:"1.0.0"# Example values that are shared between streams-app and producer-app
+version:"2.12.0"# Helm chart version
+# Kafka connector
+#
+# Parent of: KafkaSinkConnector, KafkaSourceConnector
+# Child of: PipelineComponent
+kafka-connector:
+# Pipeline prefix that will prefix every component name. If you wish to not
+# have any prefix you can specify an empty string.
+prefix:${pipeline_name}-
+from:# Must not be null
+topics:# read from topic
+${pipeline_name}-input-topic:
+type:input# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra
+${pipeline_name}-input-pattern-topic:
+type:pattern# Implied to be an input pattern if `role` is undefined
+${pipeline_name}-extra-pattern-topic:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+components:# read from specific component
+account-producer:
+type:output# Implied when role is NOT specified
+other-producer:
+role:some-role# Implies `type` to be extra
+component-as-input-pattern:
+type:pattern# Implied to be an input pattern if `role` is undefined
+component-as-extra-pattern:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+# Topic(s) into which the component will write output
+to:
+topics:
+${pipeline_name}-output-topic:
+type:output# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
+${pipeline_name}-error-topic:
+type:error
+# Currently KPOps supports Avro and JSON schemas.
+key_schema:key-schema# must implement SchemaProvider to use
+value_schema:value-schema
+partitions_count:1
+replication_factor:1
+configs:# https://kafka.apache.org/documentation/#topicconfigs
+cleanup.policy:compact
+models:# SchemaProvider is initiated with the values given here
+model:model
+namespace:namespace# required
+# `app` contains application-specific settings, hence it does not have a rigid
+# structure. The fields below are just an example. Extensive documentation on
+# connectors: https://kafka.apache.org/documentation/#connectconfigs
+app:# required
+tasks.max:1
+# Helm repository configuration for resetter
+repo_config:
+repository_name:my-repo# required
+url:https://bakdata.github.io/kafka-connect-resetter/# required
+repo_auth_flags:
+username:user
+password:pass
+ca_file:/home/user/path/to/ca-file
+insecure_skip_tls_verify:false
+version:"1.0.6"# Helm chart version
+# Overriding Kafka Connect Resetter Helm values. E.g. to override the
+# Image Tag etc.
+resetter_values:
+imageTag:"1.2.3"
+# Kafka sink connector
+#
+# Child of: KafkaConnector
+kafka-sink-connector:
+# No settings differ from `kafka-connector`
+# Kafka source connector
+#
+# Child of: KafkaConnector
+kafka-source-connector:
+# The source connector has no `from` section
+# from:
+# offset.storage.topic
+# https://kafka.apache.org/documentation/#connect_running
+offset_topic:offset_topic
+# Base Kubernetes App
+#
+# Parent of: HelmApp
+# Child of: PipelineComponent
+kubernetes-app:
+# Pipeline prefix that will prefix every component name. If you wish to not
+# have any prefix you can specify an empty string.
+prefix:${pipeline_name}-
+from:# Must not be null
+topics:# read from topic
+${pipeline_name}-input-topic:
+type:input# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra
+${pipeline_name}-input-pattern-topic:
+type:pattern# Implied to be an input pattern if `role` is undefined
+${pipeline_name}-extra-pattern-topic:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+components:# read from specific component
+account-producer:
+type:output# Implied when role is NOT specified
+other-producer:
+role:some-role# Implies `type` to be extra
+component-as-input-pattern:
+type:pattern# Implied to be an input pattern if `role` is undefined
+component-as-extra-pattern:
+type:pattern# Implied to be an extra pattern if `role` is defined
+role:some-role
+# Topic(s) into which the component will write output
+to:
+topics:
+${pipeline_name}-output-topic:
+type:output# Implied when role is NOT specified
+${pipeline_name}-extra-topic:
+role:topic-role# Implies `type` to be extra; Will throw an error if `type` is defined
+${pipeline_name}-error-topic:
+type:error
+# Currently KPOps supports Avro and JSON schemas.
+key_schema:key-schema# must implement SchemaProvider to use
+value_schema:value-schema
+partitions_count:1
+replication_factor:1
+configs:# https://kafka.apache.org/documentation/#topicconfigs
+cleanup.policy:compact
+models:# SchemaProvider is initiated with the values given here
+model:model
+namespace:namespace# required
+# `app` contains application-specific settings, hence it does not have a rigid
+# structure. The fields below are just an example.
+app:# required
+image:exampleImage# Example
+debug:false# Example
+commandLine:{}# Example
+# Holds configuration to use as values for the streams bootstrap producer-app Helm
+# chart.
+#
+# Child of: KafkaApp
+# More documentation on ProducerApp: https://github.com/bakdata/streams-bootstrap
+producer-app:
+# from: # While the producer-app does inherit from kafka-app, it does not need a
+# `from` section, hence it does not support it.
+# Allowed configs:
+# https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app
+app:# required
+streams:# required, producer-app-specific
+brokers:${kafka_brokers}# required
+schemaRegistryUrl:${schema_registry_url}
+outputTopic:output_topic
+extraOutputTopics:
+output_role1:output_topic1
+output_role2:output_topic2
+nameOverride:override-with-this-name# kafka-app-specific
+# StreamsApp component that configures a streams bootstrap app.
+#
+# Child of: KafkaApp
+# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap
+streams-app:
+# No arbitrary keys are allowed under `app`here
+# Allowed configs:
+# https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app
+app:# required
+# Streams Bootstrap streams section
+streams:# required, streams-app-specific
+brokers:${kafka_brokers}# required
+schemaRegistryUrl:${schema_registry_url}
+inputTopics:
+-topic1
+-topic2
+outputTopic:output-topic
+inputPattern:input-pattern
+extraInputTopics:
+input_role1:
+-input_topic1
+-input_topic2
+input_role2:
+-input_topic3
+-input_topic4
+extraInputPatterns:
+pattern_role1:input_pattern1
+extraOutputTopics:
+output_role1:output_topic1
+output_role2:output_topic2
+errorTopic:error-topic
+config:
+my.streams.config:my.value
+nameOverride:override-with-this-name# streams-app-specific
+autoscaling:# streams-app-specific
+consumerGroup:consumer-group# required
+lagThreshold:0# Average target value to trigger scaling actions.
+enabled:false# Whether to enable auto-scaling using KEDA.
+# This is the interval to check each trigger on.
+# https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval
+pollingInterval:30
+# The period to wait after the last trigger reported active before scaling
+# the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod
+cooldownPeriod:300
+# The offset reset policy for the consumer if the the consumer group is
+# not yet subscribed to a partition.
+offsetResetPolicy:earliest
+# This setting is passed to the HPA definition that KEDA will create for a
+# given resource and holds the maximum number of replicas of the target resouce.
+# https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount
+maxReplicas:1
+# Minimum number of replicas KEDA will scale the resource down to.
+# https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount
+minReplicas:0
+# If this property is set, KEDA will scale the resource down to this
+# number of replicas.
+# https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount
+idleReplicas:0
+topics:# List of auto-generated Kafka Streams topics used by the streams app.
+-topic1
+-topic2
Auto generation happens mostly with pre-commit hooks. You can find the pre-commit configuration here. These pre-commit hooks call different Python scripts to auto generate code for the documentation.
cli_env_vars.env -- All CLI environment variables in a dotenv file.
cli_env_vars.md -- All CLI environment variables in a table.
config_env_vars.env -- Almost all pipeline config environment variables in a dotenv file. The script checks for each field in PipelineConfig whether it has an env attribute defined. The script is currently unable to visit the classes of fields like topic_name_config, hence any environment variables defined there would remain unknown to it.
config_env_vars.env -- Almost all pipeline config environment variables in a table.
variable_substitution.yaml -- A copy of ./tests/pipeline/resources/component-type-substitution/pipeline.yaml used as an example of substitution.
Generated by typer-cli from the code in main.py. It is called with Python's subprocess module.
"}, {"location": "developer/auto-generation/#pipeline-and-defaults-example-definitions", "title": "Pipeline and defaults example definitions", "text": "
Generates example pipeline.yaml and defaults.yaml for each individual component, stores them and also concatenates them into 1 big pipeline definition and 1 big pipeline defaults definition.
User input
headers/*\\.yaml -- The top of each example. Includes a description comment, type and name. The headers for pipeline.yaml reside in the pipeline-components dir and the defaults.yaml headers reside in the pipeline-defaults dir. The names of the files must be equal to the respective component type.
sections/*\\.yaml -- Each YAML file contains a single section (component attribute) definition. The intention is to keep the minimal set of definitions there from which any component definition can be built. The names of the files must be equal to the respective component type and the attribute name. The sections are used for both defaults.yaml and pipeline.yaml generation and reside in the pipeline-components dir.
Generated
pipeline-components/dependencies/* Cached information about KPOps components
pipeline_component_dependencies.yaml -- Specifies per component which files in the sections dir should be used for the pipeline.yaml generation.
defaults_pipeline_component_dependencies.yaml -- Specifies per component which files in the sections dir should be used for the defaults.yaml generation.
kpops_structure.yaml -- Specifies the inheritance hierarchy of the components and what sections exist in each component.
pipeline-components/*\\.yaml -- All single-component pipeline definitions and one big (complete) pipeline.yaml that contains all of them.
pipeline-defaults/*\\.yaml -- All single-component defaults definitions and one big (complete) defaults.yaml that contains all of them.
Below are listed existing Ruff plugins/extensions for some of the most popular python IDEs. If you cannot find your Editor of choices or you want something more custom, ruff-lsp enables Ruff to be used in any editor that supports the LSP
To ensure a consistent markdown style, we use dprint to check and reformat.
dprint fmt\n
Use the official documentation to set up dprint. The configuration can be found here.
"}, {"location": "user/what-is-kpops/", "title": "What is KPOps?", "text": "
With a couple of easy commands in the shell and a pipeline.yaml of under 30 lines, KPOps can not only deploy a Kafka pipeline1 to a Kubernetes cluster, but also reset, clean or destroy it!
Deploy Kafka apps to Kubernetes: KPOps allows to deploy consecutive Kafka Streams applications and producers using an easy-to-read and -write pipeline definition.
Manage Kafka Connectors: KPOps connects with your Kafka Connect cluster and deploys, validates, and deletes your connectors.
Configure multiple pipelines and steps: KPOps has various abstractions that simplify configuring multiple pipelines and steps within pipelines by sharing common configuration between different components, such as producers or streaming applications.
Handle your topics and schemas: KPOps not only creates and deletes your topics but also registers and deletes your schemas.
Clean termination of Kafka components: KPOps removes your pipeline components (i.e., Kafka Streams applications) from the Kubernetes cluster and cleans up the component-related states (i.e., removing/resetting offset of Kafka consumer groups).
Preview your pipeline changes: With the KPOps dry-run, you can ensure your pipeline definition is set up correctly. This helps to minimize downtime and prevent potential errors or issues that could impact your production environment.
"}, {"location": "user/what-is-kpops/#example", "title": "Example", "text": "An overview of Word-count pipeline shown in Streams Explorer Word-count pipeline.yaml
KPOps reads its global configuration that is unrelated to a pipeline's components from config.yaml.
Consider enabling KPOps' editor integration feature to enjoy the benefits of autocompletion and validation when configuring your pipeline.
To learn about any of the available settings, take a look at the example below.
config.yaml
# CONFIGURATION\n#\n# The path to the folder containing the defaults.yaml file and the environment\n# defaults files.\ndefaults_path: .\n# The environment you want to generate and deploy the pipeline to. Suffix your\n# environment files with this value (e.g. defaults_development.yaml and\n# pipeline_development.yaml for environment=development).\n# REQUIRED\nenvironment: development\n# The Kafka brokers address.\n# REQUIRED\nbrokers: \"http://broker1:9092,http://broker2:9092\"\n# The name of the defaults file and the prefix of the defaults environment file.\ndefaults_filename_prefix: defaults\n# Configures topic names.\ntopic_name_config: \n # Configures the value for the variable ${output_topic_name}\n default_output_topic_name: ${pipeline_name}-${component_name}\n # Configures the value for the variable ${error_topic_name}\n default_error_topic_name: ${pipeline_name}-${component_name}-error\n# Address of the Schema Registry\nschema_registry_url: \"http://localhost:8081\"\n# Address of the Kafka REST Proxy.\nkafka_rest_host: \"http://localhost:8082\"\n# Address of Kafka Connect.\nkafka_connect_host: \"http://localhost:8083\"\n# The timeout in seconds that specifies when actions like deletion or deploy\n# timeout.\ntimeout: 300\n# Flag for `helm upgrade --install`.\n# Create the release namespace if not present.\ncreate_namespace: false\n# Global flags for Helm.\nhelm_config:\n # Set the name of the kubeconfig context. (--kube-context)\n context: name\n # Run Helm in Debug mode.\n debug: false\n# Configure Helm Diff.\nhelm_diff_config: \n # Set of keys that should not be checked.\n ignore: \n - name\n - imageTag\n# Whether to retain clean up jobs in the cluster or uninstall the, after\n# completion.\nretain_clean_jobs: false\n
Environment-specific pipeline definitions
Similarly to defaults, it is possible to have an unlimited amount of additional environment-specific pipeline definitions. The naming convention is the same: add a suffix of the form _{environment} to the filename.
KPOps has a very efficient way of dealing with repeating settings which manifests as defaults.yaml. This file provides the user with the power to set defaults for any and all components, thus omitting the need to repeat the same settings in pipeline.yaml.
An important mechanic of KPOps is that defaults set for a component apply to all components that inherit from it.
It is possible, although not recommended, to add settings that are specific to a component's subclass. An example would be configuring offset_topic under kafka-connector instead of kafka-source-connector.
It is possible to set specific defaults for each environment by adding files called defaults_{environment}.yaml to the defaults folder at defaults_path. The defaults are loaded based on the currently set environment.
It is important to note that defaults_{environment}.yaml overrides only the settings that are explicitly set to be different from the ones in the base defaults file.
Tip
defaults is the default value of defaults_filename_prefix. Together with defaults_path and environment it can be changed in config.yaml
The defaults codeblocks in this section contain the full set of settings that are specific to the component. If a setting already exists in a parent config, it will not be included in the child's.
# Base Kubernetes App\n#\n# Parent of: KafkaApp\n# Child of: PipelineComponent\nkubernetes-app:\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example.\n app: # required\n image: exampleImage # Example\n debug: false # Example\n commandLine: {} # Example\n # Helm repository configuration (optional)\n # If not set the helm repo add will not be called. Useful when using local Helm charts\n repo_config:\n repository_name: bakdata-streams-bootstrap # required\n url: https://bakdata.github.io/streams-bootstrap/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"1.0.0\" # Helm chart version\n
# StreamsApp component that configures a streams bootstrap app.\n#\n# Child of: KafkaApp\n# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap\nstreams-app:\n # No arbitrary keys are allowed under `app`here\n # Allowed configs:\n # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app\n app: # required\n # Streams Bootstrap streams section\n streams: # required, streams-app-specific\n brokers: ${brokers} # required\n schemaRegistryUrl: ${schema_registry_url}\n inputTopics:\n - topic1\n - topic2\n outputTopic: output-topic\n inputPattern: input-pattern\n extraInputTopics:\n input_role1:\n - input_topic1\n - input_topic2\n input_role2:\n - input_topic3\n - input_topic4\n extraInputPatterns:\n pattern_role1: input_pattern1\n extraOutputTopics:\n output_role1: output_topic1\n output_role2: output_topic2\n errorTopic: error-topic\n config:\n my.streams.config: my.value\n nameOverride: override-with-this-name # streams-app-specific\n autoscaling: # streams-app-specific\n consumerGroup: consumer-group # required\n lagThreshold: 0 # Average target value to trigger scaling actions.\n enabled: false # Whether to enable auto-scaling using KEDA.\n # This is the interval to check each trigger on.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval\n pollingInterval: 30\n # The period to wait after the last trigger reported active before scaling\n # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod\n cooldownPeriod: 300\n # The offset reset policy for the consumer if the the consumer group is\n # not yet subscribed to a partition.\n offsetResetPolicy: earliest\n # This setting is passed to the HPA definition that KEDA will create for a\n # given resource and holds the maximum number of replicas of the target resouce.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount\n maxReplicas: 1\n # Minimum number of replicas KEDA will scale the resource down to.\n # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount\n minReplicas: 0\n # If this property is set, KEDA will scale the resource down to this\n # number of replicas.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount\n idleReplicas: 0\n topics: # List of auto-generated Kafka Streams topics used by the streams app.\n - topic1\n - topic2\n
# Kafka connector\n#\n# Parent of: KafkaSinkConnector, KafkaSourceConnector\n# Child of: PipelineComponent\nkafka-connector:\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example. Extensive documentation on\n # connectors: https://kafka.apache.org/documentation/#connectconfigs\n app: # required\n tasks.max: 1\n # Helm repository configuration for resetter\n repo_config:\n repository_name: my-repo # required\n url: https://bakdata.github.io/kafka-connect-resetter/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"1.0.6\" # Helm chart version\n # Overriding Kafka Connect Resetter Helm values. E.g. to override the\n # Image Tag etc.\n resetter_values:\n imageTag: \"1.2.3\"\n
# Base component for Kafka-based components.\n# Producer or streaming apps should inherit from this class.\n- type: kafka-app # required\n name: kafka-app # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` can contain application-specific settings, hence the user is free to\n # add the key-value pairs they need.\n app: # required\n streams: # required\n brokers: ${brokers} # required\n schemaRegistryUrl: ${schema_registry_url}\n nameOverride: override-with-this-name # kafka-app-specific\n imageTag: \"1.0.0\" # Example values that are shared between streams-app and producer-app\n # Helm repository configuration (optional)\n # If not set the helm repo add will not be called. Useful when using local Helm charts\n repo_config:\n repository_name: bakdata-streams-bootstrap # required\n url: https://bakdata.github.io/streams-bootstrap/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"2.12.0\" # Helm chart version\n
KafkaConnector is a component that deploys Kafka Connectors. Since a connector cannot be different from sink or source it is not recommended to use KafkaConnector for deployment in pipeline.yaml. Instead, KafkaConnector should be used in defaults.yaml to set defaults for all connectors in the pipeline as they can share some common settings.
# Kafka sink connector\n- type: kafka-sink-connector\n name: kafka-sink-connector # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example. Extensive documentation on\n # connectors: https://kafka.apache.org/documentation/#connectconfigs\n app: # required\n tasks.max: 1\n # Helm repository configuration for resetter\n repo_config:\n repository_name: my-repo # required\n url: https://bakdata.github.io/kafka-connect-resetter/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"1.0.6\" # Helm chart version\n # Overriding Kafka Connect Resetter Helm values. E.g. to override the\n # Image Tag etc.\n resetter_values:\n imageTag: \"1.2.3\"\n
# Kafka source connector\n- type: kafka-source-connector # required\n name: kafka-source-connector # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n # The source connector has no `from` section\n # from:\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example. Extensive documentation on\n # connectors: https://kafka.apache.org/documentation/#connectconfigs\n app: # required\n tasks.max: 1\n # Helm repository configuration for resetter\n repo_config:\n repository_name: my-repo # required\n url: https://bakdata.github.io/kafka-connect-resetter/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"1.0.6\" # Helm chart version\n # Overriding Kafka Connect Resetter Helm values. E.g. to override the\n # Image Tag etc.\n resetter_values:\n imageTag: \"1.2.3\"\n # offset.storage.topic\n # https://kafka.apache.org/documentation/#connect_running\n offset_topic: offset_topic\n
# Base Kubernetes App\n- type: kubernetes-app\n name: kubernetes-app # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example.\n app: # required\n image: exampleImage # Example\n debug: false # Example\n commandLine: {} # Example\n # Helm repository configuration (optional)\n # If not set the helm repo add will not be called. Useful when using local Helm charts\n repo_config:\n repository_name: bakdata-streams-bootstrap # required\n url: https://bakdata.github.io/streams-bootstrap/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"1.0.0\" # Helm chart version\n
# Holds configuration to use as values for the streams bootstrap producer-app Helm\n# chart.\n# More documentation on ProducerApp:\n# https://github.com/bakdata/streams-bootstrap\n- type: producer-app\n name: producer-app # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n # from: # While the producer-app does inherit from kafka-app, it does not need a\n # `from` section, hence it does not support it.\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # Allowed configs:\n # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app\n app: # required\n streams: # required, producer-app-specific\n brokers: ${brokers} # required\n schemaRegistryUrl: ${schema_registry_url}\n outputTopic: output_topic\n extraOutputTopics:\n output_role1: output_topic1\n output_role2: output_topic2\n nameOverride: override-with-this-name # kafka-app-specific\n # Helm repository configuration (optional)\n # If not set the helm repo add will not be called. Useful when using local Helm charts\n repo_config:\n repository_name: bakdata-streams-bootstrap # required\n url: https://bakdata.github.io/streams-bootstrap/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"2.12.0\" # Helm chart version\n
# StreamsApp component that configures a streams bootstrap app.\n# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap\n- type: streams-app # required\n name: streams-app # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # No arbitrary keys are allowed under `app`here\n # Allowed configs:\n # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app\n app: # required\n # Streams Bootstrap streams section\n streams: # required, streams-app-specific\n brokers: ${brokers} # required\n schemaRegistryUrl: ${schema_registry_url}\n inputTopics:\n - topic1\n - topic2\n outputTopic: output-topic\n inputPattern: input-pattern\n extraInputTopics:\n input_role1:\n - input_topic1\n - input_topic2\n input_role2:\n - input_topic3\n - input_topic4\n extraInputPatterns:\n pattern_role1: input_pattern1\n extraOutputTopics:\n output_role1: output_topic1\n output_role2: output_topic2\n errorTopic: error-topic\n config:\n my.streams.config: my.value\n nameOverride: override-with-this-name # streams-app-specific\n autoscaling: # streams-app-specific\n consumerGroup: consumer-group # required\n lagThreshold: 0 # Average target value to trigger scaling actions.\n enabled: false # Whether to enable auto-scaling using KEDA.\n # This is the interval to check each trigger on.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval\n pollingInterval: 30\n # The period to wait after the last trigger reported active before scaling\n # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod\n cooldownPeriod: 300\n # The offset reset policy for the consumer if the the consumer group is\n # not yet subscribed to a partition.\n offsetResetPolicy: earliest\n # This setting is passed to the HPA definition that KEDA will create for a\n # given resource and holds the maximum number of replicas of the target resouce.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount\n maxReplicas: 1\n # Minimum number of replicas KEDA will scale the resource down to.\n # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount\n minReplicas: 0\n # If this property is set, KEDA will scale the resource down to this\n # number of replicas.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount\n idleReplicas: 0\n topics: # List of auto-generated Kafka Streams topics used by the streams app.\n - topic1\n - topic2\n # Helm repository configuration (optional)\n # If not set the helm repo add will not be called. Useful when using local Helm charts\n repo_config:\n repository_name: bakdata-streams-bootstrap # required\n url: https://bakdata.github.io/streams-bootstrap/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"2.12.0\" # Helm chart version\n
Environment variables can be set by using the export command in Linux or the set command in Windows.
dotenv files
Support for .env files is on the roadmap, but not implemented in KPOps yet. One of the possible ways to still use one and export the contents manually is with the following command: export $(xargs < .env). This would work in bash suppose there are no spaces inside the values.
These variables are a lower priority alternative to the settings in config.yaml. Variables marked as required can instead be set in the pipeline config.
Name Default Value Required Description Setting name KPOPS_ENVIRONMENT True The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). environment KPOPS_KAFKA_BROKERS True The comma separated Kafka brokers address. brokers KPOPS_SCHEMA_REGISTRY_URL False Address of the Schema Registry. schema_registry_url KPOPS_REST_PROXY_HOST False Address of the Kafka REST Proxy. kafka_rest_host KPOPS_CONNECT_HOST False Address of Kafka Connect. kafka_connect_host KPOPS_TIMEOUT 300 False The timeout in seconds that specifies when actions like deletion or deploy timeout. timeout KPOPS_RETAIN_CLEAN_JOBS False False Whether to retain clean up jobs in the cluster or uninstall the, after completion. retain_clean_jobs config_env_vars.env Exhaustive list of all config-related environment variables
# Pipeline config environment variables\n#\n# The default setup is shown. These variables are a lower priority\n# alternative to the settings in `config.yaml`. Variables marked as\n# required can instead be set in the pipeline config.\n#\n# environment\n# The environment you want to generate and deploy the pipeline to.\n# Suffix your environment files with this value (e.g.\n# defaults_development.yaml for environment=development).\nKPOPS_ENVIRONMENT # No default value, required\n# brokers\n# The comma separated Kafka brokers address.\nKPOPS_KAFKA_BROKERS # No default value, required\n# schema_registry_url\n# Address of the Schema Registry.\nKPOPS_SCHEMA_REGISTRY_URL # No default value, not required\n# kafka_rest_host\n# Address of the Kafka REST Proxy.\nKPOPS_REST_PROXY_HOST # No default value, not required\n# kafka_connect_host\n# Address of Kafka Connect.\nKPOPS_CONNECT_HOST # No default value, not required\n# timeout\n# The timeout in seconds that specifies when actions like deletion or\n# deploy timeout.\nKPOPS_TIMEOUT=300\n# retain_clean_jobs\n# Whether to retain clean up jobs in the cluster or uninstall the,\n# after completion.\nKPOPS_RETAIN_CLEAN_JOBS=False\n
These variables are a lower priority alternative to the commands' flags. If a variable is set, the corresponding flag does not have to be specified in commands. Variables marked as required can instead be set as flags.
Name Default Value Required Description KPOPS_PIPELINE_BASE_DIR . False Base directory to the pipelines (default is current working directory) KPOPS_CONFIG_PATH config.yaml False Path to the config.yaml file KPOPS_DEFAULT_PATH False Path to defaults folder KPOPS_PIPELINE_PATH True Path to YAML with pipeline definition KPOPS_PIPELINE_STEPS False Comma separated list of steps to apply the command on cli_env_vars.env Exhaustive list of all cli-related environment variables
# CLI Environment variables\n#\n# The default setup is shown. These variables are a lower priority\n# alternative to the commands' flags. If a variable is set, the\n# corresponding flag does not have to be specified in commands.\n# Variables marked as required can instead be set as flags.\n#\n# Base directory to the pipelines (default is current working\n# directory)\nKPOPS_PIPELINE_BASE_DIR=.\n# Path to the config.yaml file\nKPOPS_CONFIG_PATH=config.yaml\n# Path to defaults folder\nKPOPS_DEFAULT_PATH # No default value, not required\n# Path to YAML with pipeline definition\nKPOPS_PIPELINE_PATH # No default value, required\n# Comma separated list of steps to apply the command on\nKPOPS_PIPELINE_STEPS # No default value, not required\n
These variables can be used in a component's definition to refer to any of its attributes, including ones that the user has defined in the defaults.
All of them are prefixed with component_ and follow the following form: component_{attribute_name}. If the attribute itself contains attributes, they can be referred to like this: component_{attribute_name}_{subattribute_name}.
These variables include all fields in the config and refer to the pipeline configuration that is independent of the components.
Info
error_topic_name is an alias for topic_name_config_default_error_topic_nameoutput_topic_name is an alias for topic_name_config_default_output_topic_name
Environment variables such as $PATH can be used in the pipeline definition and defaults without any transformation following the form ${ENV_VAR_NAME}. This, of course, includes variables like the ones relevant to the KPOps cli that are exported by the user.
See all KPOps environment variables
"}, {"location": "user/core-concepts/variables/substitution/#pipeline-name-variables", "title": "Pipeline name variables", "text": "
These are special variables that refer to the name and path of a pipeline.
${pipeline_name}: Concatenated path of the parent directory where pipeline.yaml is defined in. For instance, ./data/pipelines/v1/pipeline.yaml, here the value for the variable would be data-pipelines-v1.
${pipeline_name_<level>}: Similar to the previous variable, each <level> contains a part of the path to the pipeline.yaml file. Consider the previous example, ${pipeline_name_0} would be data, ${pipeline_name_1} would be pipelines, and ${pipeline_name_2} equals to v1.
"}, {"location": "user/core-concepts/variables/substitution/#advanced-use-cases", "title": "Advanced use cases", "text": "
Refer to default component field values: As long as a value is assigned to a component attribute, it is possible to refer to it with a placeholder. To see all component fields, take a look at the pipeline schema.
Chaining variables: It is possible to chain any number of variables, see the example above.
Cross-component substitution: YAML is quite an intricate language and with some of its magic one could write cross-component references.
ATM fraud is a demo pipeline for ATM fraud detection. The original by Confluent is written in KSQL and outlined in this blogpost. The one used in this example is re-built from scratch using bakdata's streams-bootstrap library.
"}, {"location": "user/examples/atm-fraud-pipeline/#what-this-will-demonstrate", "title": "What this will demonstrate", "text": "
Before we deploy the pipeline, we need to forward the ports of kafka-rest-proxy and kafka-connect. Run the following commands in two different terminals.
poetry run kpops deploy ./examples/bakdata/atm-fraud-detection/pipeline.yaml \\\n--pipeline-base-dir ./examples \\\n--config ./examples/bakdata/atm-fraud-detection/config.yaml \\\n--execute\n
Note
You can use the --dry-run flag instead of the --execute flag and check the logs if your pipeline will be deployed correctly.
"}, {"location": "user/examples/atm-fraud-pipeline/#check-if-the-deployment-is-successful", "title": "Check if the deployment is successful", "text": "
You can use the Streams Explorer to see the deployed pipeline. To do so, port-forward the service in a separate terminal session using the command below:
After that open http://localhost:8080 in your browser. You should be able to see pipeline shown in the image below:
An overview of ATM fraud pipeline shown in Streams Explorer
Attention
Kafka Connect needs some time to set up the connector. Moreover, Streams Explorer needs a while to scrape the information from Kafka connect. Therefore, it might take a bit until you see the whole graph.
Word-count is a demo pipeline which consists of a producer producing words to Kafka, a Kafka streams app counting the number of times each word occurs and finally a Redis database into which the words are exported.
"}, {"location": "user/getting-started/quick-start/#what-this-will-demonstrate", "title": "What this will demonstrate", "text": "
Before we deploy the pipeline, we need to forward the ports of kafka-rest-proxy and kafka-connect. Run the following commands in two different terminals.
You can use the --dry-run flag instead of the --execute flag and check the logs if your pipeline will be deployed correctly.
"}, {"location": "user/getting-started/quick-start/#check-if-the-deployment-is-successful", "title": "Check if the deployment is successful", "text": "
You can use the Streams Explorer to inspect the deployed pipeline. To do so, port-forward the service in a separate terminal session using the command below:
After that open http://localhost:8080 in your browser.
You should be able to see pipeline shown in the image below:
An overview of Word-count pipeline shown in Streams Explorer
Attention
Kafka Connect needs some time to set up the connector. Moreover, Streams Explorer needs a while to scrape the information from Kafka Connect. Therefore, it might take a bit until you see the whole graph.
k3d (Version 5.4.6+) and Docker (Version >= v20.10.5) or an existing Kubernetes cluster (>= 1.21.0)
kubectl (Compatible with server version 1.21.0)
Helm (Version 3.8.0+)
"}, {"location": "user/getting-started/setup/#setup-kubernetes-with-k3d", "title": "Setup Kubernetes with k3d", "text": "
If you don't have access to an existing Kubernetes cluster, this section will guide you through creating a local cluster. We recommend the lightweight Kubernetes distribution k3s for this. k3d is a wrapper around k3s in Docker that lets you get started fast.
For other ways of installing k3d, you can have a look at their installation guide.
The Kafka deployment needs a modified Docker image. In that case the image is built and pushed to a Docker registry that holds it. If you do not have access to an existing Docker registry, you can use k3d's Docker registry:
Creating a new k3d cluster automatically configures kubectl to connect to the local cluster by modifying your ~/.kube/config. In case you manually set the KUBECONFIG variable or don't want k3d to modify your config, k3d offers many other options.
You can check the cluster status with kubectl get pods -n kube-system. If all returned elements have a STATUS of Running or Completed, then the cluster is up and running.
Kafka is an open-source data streaming platform. More information about Kafka can be found in the documentation. To deploy Kafka, this guide uses Confluent's Helm chart.
To allow connectivity to other systems Kafka Connect needs to be extended with drivers. You can install a JDBC driver for Kafka Connect by creating a new Docker image:
Create a Dockerfile with the following content:
FROM confluentinc/cp-kafka-connect:7.1.3\n\nRUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.6.0\n
Build and push the modified image to your private Docker registry:
Install Kafka, Zookeeper, Confluent's Schema Registry, Kafka Rest Proxy, and Kafka Connect. A single Helm chart installs all five components. Below you can find an example for the --values ./kafka.yaml file configuring the deployment accordingly. Deploy the services:
An example value configuration for Confluent's Helm chart. This configuration deploys a single Kafka Broker, a Schema Registry, Zookeeper, Kafka Rest Proxy, and Kafka Connect with minimal resources.
Streams Explorer allows examining Apache Kafka data pipelines in a Kubernetes cluster including the inspection of schemas and monitoring of metrics. First, add the Helm repository:
The kpops CLI can be used to destroy a pipeline that was previously deployed with KPOps. In case that doesn't work, the pipeline can always be taken down manually with helm (see section Infrastructure).
Navigate to the examples folder. Replace the <name-of-the-example-directory> with the example you want to tear down. For example the atm-fraud-detection.
Remove the pipeline
# Uncomment 1 line to either destroy, reset or clean.\n\n# poetry run kpops destroy <name-of-the-example-directory>/pipeline.yaml \\\n# poetry run kpops reset <name-of-the-example-directory>/pipeline.yaml \\\n# poetry run kpops clean <name-of-the-example-directory>/pipeline.yaml \\\n--config <name-of-the-example-directory>/config.yaml \\\n--execute\n
In case kpops destroy is not working one can uninstall the pipeline services one by one. This is equivalent to running kpops destroy. In case a clean uninstall (like the one kpops clean does) is needed, one needs to also delete the topics and schemas created by deployment of the pipeline.
"}, {"location": "user/migration-guide/v1-v2/", "title": "Migrate from V1 to V2", "text": ""}, {"location": "user/migration-guide/v1-v2/#derive-component-type-automatically-from-class-name", "title": "Derive component type automatically from class name", "text": "
KPOps automatically infers the component type from the class name. Therefore, the type and schema_type attributes can be removed from your custom components. By convention the type would be the lower, and kebab cased name of the class.
class MyCoolStreamApp(StreamsApp):\n- type = \"my-cool-stream-app\"\n+ ...\n
Because of this new convention producer has been renamed to producer-app. This must be addressed in your pipeline.yaml and defaults.yaml.
"}, {"location": "user/migration-guide/v1-v2/#remove-camel-case-conversion-of-internal-models", "title": "Remove camel case conversion of internal models", "text": "
All the internal KPOps models are now snake_case, and only Helm/Kubernetes values require camel casing. You can find an example of a pipeline.yaml in the following. Notice that the app section here remains untouched.
If you are using the KubernetesApp class to define your own Kubernetes resource to deploy, the abstract function get_helm_chart that returns the chart for deploying the app using Helm is now a Python property and renamed to helm_chart.
SCOPE:{pipeline|config}: Scope of the generated schema
pipeline: Schema of PipelineComponents. Includes the built-in kpops components by default. To include custom components, provide [COMPONENTS_MODULES].\n\nconfig: Schema of PipelineConfig. [required]\n
[COMPONENTS_MODULE]: Custom Python module containing your project-specific components
Options:
--include-stock-components / --no-include-stock-components: Include the built-in KPOps components. [default: include-stock-components]
We provided a GitHub composite action called kpops-runner that installs all the necessary dependencies and runs KPOps commands with the given parameters.
"}, {"location": "user/references/ci-integration/github-actions/#input-parameters", "title": "Input Parameters", "text": "Name Required Default Value Type Description command \u2705 - string KPOps command to run. generate, deploy, destroy, reset, clean are possible values. Flags such as --dry-run and --execute need to be specified pipeline \u2705 - string Pipeline to run by KPOps working-directory \u274c . string root directory used by KPOps to run pipelines pipeline-base-dir \u274c - string directory where relative pipeline variables are initialized from defaults \u274c - string defaults folder path config \u274c - string config.yaml file path components \u274c - string components package path filter-type \u274c - string Whether to include/exclude the steps defined in KPOPS_PIPELINE_STEPS python-version \u274c \"3.11.x\" string Python version to install (Defaults to the latest stable version of Python 3.11) kpops-version \u274c latest string KPOps version to install helm-version \u274c latest string Helm version to install token \u274c latest string secrets.GITHUB_TOKEN, needed for setup-helm action if helm-version is set to latest"}, {"location": "user/references/ci-integration/github-actions/#usage", "title": "Usage", "text": "
"}]}
\ No newline at end of file
+{"config": {"lang": ["en"], "separator": "[\\s\\-]+", "pipeline": ["stopWordFilter"]}, "docs": [{"location": "developer/auto-generation/", "title": "Auto generation", "text": "
Auto generation happens mostly with pre-commit hooks. You can find the pre-commit configuration here. These pre-commit hooks call different Python scripts to auto generate code for the documentation.
cli_env_vars.env -- All CLI environment variables in a dotenv file.
cli_env_vars.md -- All CLI environment variables in a table.
config_env_vars.env -- Almost all pipeline config environment variables in a dotenv file. The script checks for each field in KpopsConfig whether it has an env attribute defined. The script is currently unable to visit the classes of fields like topic_name_config, hence any environment variables defined there would remain unknown to it.
config_env_vars.env -- Almost all pipeline config environment variables in a table.
variable_substitution.yaml -- A copy of ./tests/pipeline/resources/component-type-substitution/pipeline.yaml used as an example of substitution.
Generated by typer-cli from the code in main.py. It is called with Python's subprocess module.
"}, {"location": "developer/auto-generation/#pipeline-and-defaults-example-definitions", "title": "Pipeline and defaults example definitions", "text": "
Generates example pipeline.yaml and defaults.yaml for each individual component, stores them and also concatenates them into 1 big pipeline definition and 1 big pipeline defaults definition.
User input
headers/*\\.yaml -- The top of each example. Includes a description comment, type and name. The headers for pipeline.yaml reside in the pipeline-components dir and the defaults.yaml headers reside in the pipeline-defaults dir. The names of the files must be equal to the respective component type.
sections/*\\.yaml -- Each YAML file contains a single section (component attribute) definition. The intention is to keep the minimal set of definitions there from which any component definition can be built. The names of the files must be equal to the respective component type and the attribute name. The sections are used for both defaults.yaml and pipeline.yaml generation and reside in the pipeline-components dir.
Generated
pipeline-components/dependencies/* Cached information about KPOps components
pipeline_component_dependencies.yaml -- Specifies per component which files in the sections dir should be used for the pipeline.yaml generation.
defaults_pipeline_component_dependencies.yaml -- Specifies per component which files in the sections dir should be used for the defaults.yaml generation.
kpops_structure.yaml -- Specifies the inheritance hierarchy of the components and what sections exist in each component.
pipeline-components/*\\.yaml -- All single-component pipeline definitions and one big (complete) pipeline.yaml that contains all of them.
pipeline-defaults/*\\.yaml -- All single-component defaults definitions and one big (complete) defaults.yaml that contains all of them.
To ensure a consistent markdown style, we use dprint to check and reformat.
dprint fmt\n
Use the official documentation to set up dprint. The configuration can be found here.
"}, {"location": "user/what-is-kpops/", "title": "What is KPOps?", "text": "
With a couple of easy commands in the shell and a pipeline.yaml of under 30 lines, KPOps can not only deploy a Kafka pipeline1 to a Kubernetes cluster, but also reset, clean or destroy it!
Deploy Kafka apps to Kubernetes: KPOps allows to deploy consecutive Kafka Streams applications and producers using an easy-to-read and -write pipeline definition.
Manage Kafka Connectors: KPOps connects with your Kafka Connect cluster and deploys, validates, and deletes your connectors.
Configure multiple pipelines and steps: KPOps has various abstractions that simplify configuring multiple pipelines and steps within pipelines by sharing common configuration between different components, such as producers or streaming applications.
Handle your topics and schemas: KPOps not only creates and deletes your topics but also registers and deletes your schemas.
Clean termination of Kafka components: KPOps removes your pipeline components (i.e., Kafka Streams applications) from the Kubernetes cluster and cleans up the component-related states (i.e., removing/resetting offset of Kafka consumer groups).
Preview your pipeline changes: With the KPOps dry-run, you can ensure your pipeline definition is set up correctly. This helps to minimize downtime and prevent potential errors or issues that could impact your production environment.
"}, {"location": "user/what-is-kpops/#example", "title": "Example", "text": "An overview of Word-count pipeline shown in Streams Explorer Word-count pipeline.yaml
KPOps reads its global configuration that is unrelated to a pipeline's components from config.yaml.
Consider enabling KPOps' editor integration feature to enjoy the benefits of autocompletion and validation when configuring your pipeline.
To learn about any of the available settings, take a look at the example below.
config.yaml
# CONFIGURATION\n#\n# The path to the folder containing the defaults.yaml file and the environment\n# defaults files.\ndefaults_path: .\n# The environment you want to generate and deploy the pipeline to. Suffix your\n# environment files with this value (e.g. defaults_development.yaml and\n# pipeline_development.yaml for environment=development).\n# REQUIRED\nenvironment: development\n# The Kafka brokers address.\n# REQUIRED\nbrokers: \"http://broker1:9092,http://broker2:9092\"\n# The name of the defaults file and the prefix of the defaults environment file.\ndefaults_filename_prefix: defaults\n# Configures topic names.\ntopic_name_config: \n # Configures the value for the variable ${output_topic_name}\n default_output_topic_name: ${pipeline_name}-${component_name}\n # Configures the value for the variable ${error_topic_name}\n default_error_topic_name: ${pipeline_name}-${component_name}-error\n# Address of the Schema Registry\nschema_registry_url: \"http://localhost:8081\"\n# Address of the Kafka REST Proxy.\nkafka_rest_host: \"http://localhost:8082\"\n# Address of Kafka Connect.\nkafka_connect_host: \"http://localhost:8083\"\n# The timeout in seconds that specifies when actions like deletion or deploy\n# timeout.\ntimeout: 300\n# Flag for `helm upgrade --install`.\n# Create the release namespace if not present.\ncreate_namespace: false\n# Global flags for Helm.\nhelm_config:\n # Set the name of the kubeconfig context. (--kube-context)\n context: name\n # Run Helm in Debug mode.\n debug: false\n# Configure Helm Diff.\nhelm_diff_config: \n # Set of keys that should not be checked.\n ignore: \n - name\n - imageTag\n# Whether to retain clean up jobs in the cluster or uninstall the, after\n# completion.\nretain_clean_jobs: false\n
Environment-specific pipeline definitions
Similarly to defaults, it is possible to have an unlimited amount of additional environment-specific pipeline definitions. The naming convention is the same: add a suffix of the form _{environment} to the filename.
KPOps has a very efficient way of dealing with repeating settings which manifests as defaults.yaml. This file provides the user with the power to set defaults for any and all components, thus omitting the need to repeat the same settings in pipeline.yaml.
An important mechanic of KPOps is that defaults set for a component apply to all components that inherit from it.
It is possible, although not recommended, to add settings that are specific to a component's subclass. An example would be configuring offset_topic under kafka-connector instead of kafka-source-connector.
It is possible to set specific defaults for each environment by adding files called defaults_{environment}.yaml to the defaults folder at defaults_path. The defaults are loaded based on the currently set environment.
It is important to note that defaults_{environment}.yaml overrides only the settings that are explicitly set to be different from the ones in the base defaults file.
Tip
defaults is the default value of defaults_filename_prefix. Together with defaults_path and environment it can be changed in config.yaml
The defaults codeblocks in this section contain the full set of settings that are specific to the component. If a setting already exists in a parent config, it will not be included in the child's.
# Base Kubernetes App\n#\n# Parent of: HelmApp\n# Child of: PipelineComponent\nkubernetes-app:\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example.\n app: # required\n image: exampleImage # Example\n debug: false # Example\n commandLine: {} # Example\n
# StreamsApp component that configures a streams bootstrap app.\n#\n# Child of: KafkaApp\n# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap\nstreams-app:\n # No arbitrary keys are allowed under `app`here\n # Allowed configs:\n # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app\n app: # required\n # Streams Bootstrap streams section\n streams: # required, streams-app-specific\n brokers: ${kafka_brokers} # required\n schemaRegistryUrl: ${schema_registry_url}\n inputTopics:\n - topic1\n - topic2\n outputTopic: output-topic\n inputPattern: input-pattern\n extraInputTopics:\n input_role1:\n - input_topic1\n - input_topic2\n input_role2:\n - input_topic3\n - input_topic4\n extraInputPatterns:\n pattern_role1: input_pattern1\n extraOutputTopics:\n output_role1: output_topic1\n output_role2: output_topic2\n errorTopic: error-topic\n config:\n my.streams.config: my.value\n nameOverride: override-with-this-name # streams-app-specific\n autoscaling: # streams-app-specific\n consumerGroup: consumer-group # required\n lagThreshold: 0 # Average target value to trigger scaling actions.\n enabled: false # Whether to enable auto-scaling using KEDA.\n # This is the interval to check each trigger on.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval\n pollingInterval: 30\n # The period to wait after the last trigger reported active before scaling\n # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod\n cooldownPeriod: 300\n # The offset reset policy for the consumer if the the consumer group is\n # not yet subscribed to a partition.\n offsetResetPolicy: earliest\n # This setting is passed to the HPA definition that KEDA will create for a\n # given resource and holds the maximum number of replicas of the target resouce.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount\n maxReplicas: 1\n # Minimum number of replicas KEDA will scale the resource down to.\n # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount\n minReplicas: 0\n # If this property is set, KEDA will scale the resource down to this\n # number of replicas.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount\n idleReplicas: 0\n topics: # List of auto-generated Kafka Streams topics used by the streams app.\n - topic1\n - topic2\n
# Kafka connector\n#\n# Parent of: KafkaSinkConnector, KafkaSourceConnector\n# Child of: PipelineComponent\nkafka-connector:\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example. Extensive documentation on\n # connectors: https://kafka.apache.org/documentation/#connectconfigs\n app: # required\n tasks.max: 1\n # Helm repository configuration for resetter\n repo_config:\n repository_name: my-repo # required\n url: https://bakdata.github.io/kafka-connect-resetter/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"1.0.6\" # Helm chart version\n # Overriding Kafka Connect Resetter Helm values. E.g. to override the\n # Image Tag etc.\n resetter_values:\n imageTag: \"1.2.3\"\n
# Base component for Kafka-based components.\n# Producer or streaming apps should inherit from this class.\n- type: kafka-app # required\n name: kafka-app # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` can contain application-specific settings, hence the user is free to\n # add the key-value pairs they need.\n app: # required\n streams: # required\n brokers: ${kafka_brokers} # required\n schemaRegistryUrl: ${schema_registry_url}\n nameOverride: override-with-this-name # kafka-app-specific\n imageTag: \"1.0.0\" # Example values that are shared between streams-app and producer-app\n # Helm repository configuration (optional)\n # If not set the helm repo add will not be called. Useful when using local Helm charts\n repo_config:\n repository_name: bakdata-streams-bootstrap # required\n url: https://bakdata.github.io/streams-bootstrap/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"2.12.0\" # Helm chart version\n
KafkaConnector is a component that deploys Kafka Connectors. Since a connector cannot be different from sink or source it is not recommended to use KafkaConnector for deployment in pipeline.yaml. Instead, KafkaConnector should be used in defaults.yaml to set defaults for all connectors in the pipeline as they can share some common settings.
# Kafka sink connector\n- type: kafka-sink-connector\n name: kafka-sink-connector # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example. Extensive documentation on\n # connectors: https://kafka.apache.org/documentation/#connectconfigs\n app: # required\n tasks.max: 1\n # Helm repository configuration for resetter\n repo_config:\n repository_name: my-repo # required\n url: https://bakdata.github.io/kafka-connect-resetter/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"1.0.6\" # Helm chart version\n # Overriding Kafka Connect Resetter Helm values. E.g. to override the\n # Image Tag etc.\n resetter_values:\n imageTag: \"1.2.3\"\n
# Kafka source connector\n- type: kafka-source-connector # required\n name: kafka-source-connector # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n # The source connector has no `from` section\n # from:\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example. Extensive documentation on\n # connectors: https://kafka.apache.org/documentation/#connectconfigs\n app: # required\n tasks.max: 1\n # Helm repository configuration for resetter\n repo_config:\n repository_name: my-repo # required\n url: https://bakdata.github.io/kafka-connect-resetter/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"1.0.6\" # Helm chart version\n # Overriding Kafka Connect Resetter Helm values. E.g. to override the\n # Image Tag etc.\n resetter_values:\n imageTag: \"1.2.3\"\n # offset.storage.topic\n # https://kafka.apache.org/documentation/#connect_running\n offset_topic: offset_topic\n
# Base Kubernetes App\n- type: kubernetes-app\n name: kubernetes-app # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # `app` contains application-specific settings, hence it does not have a rigid\n # structure. The fields below are just an example.\n app: # required\n image: exampleImage # Example\n debug: false # Example\n commandLine: {} # Example\n
# Holds configuration to use as values for the streams bootstrap producer-app Helm\n# chart.\n# More documentation on ProducerApp:\n# https://github.com/bakdata/streams-bootstrap\n- type: producer-app\n name: producer-app # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n # from: # While the producer-app does inherit from kafka-app, it does not need a\n # `from` section, hence it does not support it.\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # Allowed configs:\n # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app\n app: # required\n streams: # required, producer-app-specific\n brokers: ${kafka_brokers} # required\n schemaRegistryUrl: ${schema_registry_url}\n outputTopic: output_topic\n extraOutputTopics:\n output_role1: output_topic1\n output_role2: output_topic2\n nameOverride: override-with-this-name # kafka-app-specific\n # Helm repository configuration (optional)\n # If not set the helm repo add will not be called. Useful when using local Helm charts\n repo_config:\n repository_name: bakdata-streams-bootstrap # required\n url: https://bakdata.github.io/streams-bootstrap/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"2.12.0\" # Helm chart version\n
# StreamsApp component that configures a streams bootstrap app.\n# More documentation on StreamsApp: https://github.com/bakdata/streams-bootstrap\n- type: streams-app # required\n name: streams-app # required\n # Pipeline prefix that will prefix every component name. If you wish to not\n # have any prefix you can specify an empty string.\n prefix: ${pipeline_name}-\n from: # Must not be null\n topics: # read from topic\n ${pipeline_name}-input-topic:\n type: input # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra\n ${pipeline_name}-input-pattern-topic:\n type: pattern # Implied to be an input pattern if `role` is undefined\n ${pipeline_name}-extra-pattern-topic:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n components: # read from specific component\n account-producer:\n type: output # Implied when role is NOT specified\n other-producer:\n role: some-role # Implies `type` to be extra\n component-as-input-pattern:\n type: pattern # Implied to be an input pattern if `role` is undefined\n component-as-extra-pattern:\n type: pattern # Implied to be an extra pattern if `role` is defined\n role: some-role\n # Topic(s) into which the component will write output\n to:\n topics:\n ${pipeline_name}-output-topic:\n type: output # Implied when role is NOT specified\n ${pipeline_name}-extra-topic:\n role: topic-role # Implies `type` to be extra; Will throw an error if `type` is defined\n ${pipeline_name}-error-topic:\n type: error\n # Currently KPOps supports Avro and JSON schemas.\n key_schema: key-schema # must implement SchemaProvider to use\n value_schema: value-schema\n partitions_count: 1\n replication_factor: 1\n configs: # https://kafka.apache.org/documentation/#topicconfigs\n cleanup.policy: compact\n models: # SchemaProvider is initiated with the values given here\n model: model\n namespace: namespace # required\n # No arbitrary keys are allowed under `app`here\n # Allowed configs:\n # https://github.com/bakdata/streams-bootstrap/tree/master/charts/streams-app\n app: # required\n # Streams Bootstrap streams section\n streams: # required, streams-app-specific\n brokers: ${kafka_brokers} # required\n schemaRegistryUrl: ${schema_registry_url}\n inputTopics:\n - topic1\n - topic2\n outputTopic: output-topic\n inputPattern: input-pattern\n extraInputTopics:\n input_role1:\n - input_topic1\n - input_topic2\n input_role2:\n - input_topic3\n - input_topic4\n extraInputPatterns:\n pattern_role1: input_pattern1\n extraOutputTopics:\n output_role1: output_topic1\n output_role2: output_topic2\n errorTopic: error-topic\n config:\n my.streams.config: my.value\n nameOverride: override-with-this-name # streams-app-specific\n autoscaling: # streams-app-specific\n consumerGroup: consumer-group # required\n lagThreshold: 0 # Average target value to trigger scaling actions.\n enabled: false # Whether to enable auto-scaling using KEDA.\n # This is the interval to check each trigger on.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#pollinginterval\n pollingInterval: 30\n # The period to wait after the last trigger reported active before scaling\n # the resource back to 0. https://keda.sh/docs/2.9/concepts/scaling-deployments/#cooldownperiod\n cooldownPeriod: 300\n # The offset reset policy for the consumer if the the consumer group is\n # not yet subscribed to a partition.\n offsetResetPolicy: earliest\n # This setting is passed to the HPA definition that KEDA will create for a\n # given resource and holds the maximum number of replicas of the target resouce.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#maxreplicacount\n maxReplicas: 1\n # Minimum number of replicas KEDA will scale the resource down to.\n # https://keda.sh/docs/2.7/concepts/scaling-deployments/#minreplicacount\n minReplicas: 0\n # If this property is set, KEDA will scale the resource down to this\n # number of replicas.\n # https://keda.sh/docs/2.9/concepts/scaling-deployments/#idlereplicacount\n idleReplicas: 0\n topics: # List of auto-generated Kafka Streams topics used by the streams app.\n - topic1\n - topic2\n # Helm repository configuration (optional)\n # If not set the helm repo add will not be called. Useful when using local Helm charts\n repo_config:\n repository_name: bakdata-streams-bootstrap # required\n url: https://bakdata.github.io/streams-bootstrap/ # required\n repo_auth_flags:\n username: user\n password: pass\n ca_file: /home/user/path/to/ca-file\n insecure_skip_tls_verify: false\n version: \"2.12.0\" # Helm chart version\n
Environment variables can be set by using the export command in Linux or the set command in Windows.
dotenv files
Support for .env files is on the roadmap, but not implemented in KPOps yet. One of the possible ways to still use one and export the contents manually is with the following command: export $(xargs < .env). This would work in bash suppose there are no spaces inside the values.
These variables are a lower priority alternative to the settings in config.yaml. Variables marked as required can instead be set in the pipeline config.
Name Default Value Required Description Setting name KPOPS_ENVIRONMENT True The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). environment KPOPS_KAFKA_BROKERS True The comma separated Kafka brokers address. kafka_brokers KPOPS_SCHEMA_REGISTRY_URL http://localhost:8081 False Address of the Schema Registry. url KPOPS_KAFKA_REST_URL http://localhost:8082 False Address of the Kafka REST Proxy. url KPOPS_KAFKA_CONNECT_URL http://localhost:8083 False Address of Kafka Connect. url KPOPS_TIMEOUT 300 False The timeout in seconds that specifies when actions like deletion or deploy timeout. timeout KPOPS_RETAIN_CLEAN_JOBS False False Whether to retain clean up jobs in the cluster or uninstall the, after completion. retain_clean_jobs config_env_vars.env Exhaustive list of all config-related environment variables
# Pipeline config environment variables\n#\n# The default setup is shown. These variables are a lower priority\n# alternative to the settings in `config.yaml`. Variables marked as\n# required can instead be set in the pipeline config.\n#\n# environment\n# The environment you want to generate and deploy the pipeline to.\n# Suffix your environment files with this value (e.g.\n# defaults_development.yaml for environment=development).\nKPOPS_ENVIRONMENT # No default value, required\n# kafka_brokers\n# The comma separated Kafka brokers address.\nKPOPS_KAFKA_BROKERS # No default value, required\n# url\n# Address of the Schema Registry.\nKPOPS_SCHEMA_REGISTRY_URL=http://localhost:8081\n# url\n# Address of the Kafka REST Proxy.\nKPOPS_KAFKA_REST_URL=http://localhost:8082\n# url\n# Address of Kafka Connect.\nKPOPS_KAFKA_CONNECT_URL=http://localhost:8083\n# timeout\n# The timeout in seconds that specifies when actions like deletion or\n# deploy timeout.\nKPOPS_TIMEOUT=300\n# retain_clean_jobs\n# Whether to retain clean up jobs in the cluster or uninstall the,\n# after completion.\nKPOPS_RETAIN_CLEAN_JOBS=False\n
These variables are a lower priority alternative to the commands' flags. If a variable is set, the corresponding flag does not have to be specified in commands. Variables marked as required can instead be set as flags.
Name Default Value Required Description KPOPS_PIPELINE_BASE_DIR . False Base directory to the pipelines (default is current working directory) KPOPS_CONFIG_PATH config.yaml False Path to the config.yaml file KPOPS_DEFAULT_PATH False Path to defaults folder KPOPS_PIPELINE_PATH True Path to YAML with pipeline definition KPOPS_PIPELINE_STEPS False Comma separated list of steps to apply the command on cli_env_vars.env Exhaustive list of all cli-related environment variables
# CLI Environment variables\n#\n# The default setup is shown. These variables are a lower priority\n# alternative to the commands' flags. If a variable is set, the\n# corresponding flag does not have to be specified in commands.\n# Variables marked as required can instead be set as flags.\n#\n# Base directory to the pipelines (default is current working\n# directory)\nKPOPS_PIPELINE_BASE_DIR=.\n# Path to the config.yaml file\nKPOPS_CONFIG_PATH=config.yaml\n# Path to defaults folder\nKPOPS_DEFAULT_PATH # No default value, not required\n# Path to YAML with pipeline definition\nKPOPS_PIPELINE_PATH # No default value, required\n# Comma separated list of steps to apply the command on\nKPOPS_PIPELINE_STEPS # No default value, not required\n
These variables can be used in a component's definition to refer to any of its attributes, including ones that the user has defined in the defaults.
All of them are prefixed with component_ and follow the following form: component_{attribute_name}. If the attribute itself contains attributes, they can be referred to like this: component_{attribute_name}_{subattribute_name}.
These variables include all fields in the config and refer to the pipeline configuration that is independent of the components.
Info
error_topic_name is an alias for topic_name_config_default_error_topic_nameoutput_topic_name is an alias for topic_name_config_default_output_topic_name
Environment variables such as $PATH can be used in the pipeline definition and defaults without any transformation following the form ${ENV_VAR_NAME}. This, of course, includes variables like the ones relevant to the KPOps cli that are exported by the user.
See all KPOps environment variables
"}, {"location": "user/core-concepts/variables/substitution/#pipeline-name-variables", "title": "Pipeline name variables", "text": "
These are special variables that refer to the name and path of a pipeline.
${pipeline_name}: Concatenated path of the parent directory where pipeline.yaml is defined in. For instance, ./data/pipelines/v1/pipeline.yaml, here the value for the variable would be data-pipelines-v1.
${pipeline_name_<level>}: Similar to the previous variable, each <level> contains a part of the path to the pipeline.yaml file. Consider the previous example, ${pipeline_name_0} would be data, ${pipeline_name_1} would be pipelines, and ${pipeline_name_2} equals to v1.
"}, {"location": "user/core-concepts/variables/substitution/#advanced-use-cases", "title": "Advanced use cases", "text": "
Refer to default component field values: As long as a value is assigned to a component attribute, it is possible to refer to it with a placeholder. To see all component fields, take a look at the pipeline schema.
Chaining variables: It is possible to chain any number of variables, see the example above.
Cross-component substitution: YAML is quite an intricate language and with some of its magic one could write cross-component references.
ATM fraud is a demo pipeline for ATM fraud detection. The original by Confluent is written in KSQL and outlined in this blogpost. The one used in this example is re-built from scratch using bakdata's streams-bootstrap library.
"}, {"location": "user/examples/atm-fraud-pipeline/#what-this-will-demonstrate", "title": "What this will demonstrate", "text": "
Before we deploy the pipeline, we need to forward the ports of kafka-rest-proxy and kafka-connect. Run the following commands in two different terminals.
poetry run kpops deploy ./examples/bakdata/atm-fraud-detection/pipeline.yaml \\\n--pipeline-base-dir ./examples \\\n--config ./examples/bakdata/atm-fraud-detection/config.yaml \\\n--execute\n
Note
You can use the --dry-run flag instead of the --execute flag and check the logs if your pipeline will be deployed correctly.
"}, {"location": "user/examples/atm-fraud-pipeline/#check-if-the-deployment-is-successful", "title": "Check if the deployment is successful", "text": "
You can use the Streams Explorer to see the deployed pipeline. To do so, port-forward the service in a separate terminal session using the command below:
After that open http://localhost:8080 in your browser. You should be able to see pipeline shown in the image below:
An overview of ATM fraud pipeline shown in Streams Explorer
Attention
Kafka Connect needs some time to set up the connector. Moreover, Streams Explorer needs a while to scrape the information from Kafka connect. Therefore, it might take a bit until you see the whole graph.
Word-count is a demo pipeline which consists of a producer producing words to Kafka, a Kafka streams app counting the number of times each word occurs and finally a Redis database into which the words are exported.
"}, {"location": "user/getting-started/quick-start/#what-this-will-demonstrate", "title": "What this will demonstrate", "text": "
Before we deploy the pipeline, we need to forward the ports of kafka-rest-proxy and kafka-connect. Run the following commands in two different terminals.
You can use the --dry-run flag instead of the --execute flag and check the logs if your pipeline will be deployed correctly.
"}, {"location": "user/getting-started/quick-start/#check-if-the-deployment-is-successful", "title": "Check if the deployment is successful", "text": "
You can use the Streams Explorer to inspect the deployed pipeline. To do so, port-forward the service in a separate terminal session using the command below:
After that open http://localhost:8080 in your browser.
You should be able to see pipeline shown in the image below:
An overview of Word-count pipeline shown in Streams Explorer
Attention
Kafka Connect needs some time to set up the connector. Moreover, Streams Explorer needs a while to scrape the information from Kafka Connect. Therefore, it might take a bit until you see the whole graph.
k3d (Version 5.4.6+) and Docker (Version >= v20.10.5) or an existing Kubernetes cluster (>= 1.21.0)
kubectl (Compatible with server version 1.21.0)
Helm (Version 3.8.0+)
"}, {"location": "user/getting-started/setup/#setup-kubernetes-with-k3d", "title": "Setup Kubernetes with k3d", "text": "
If you don't have access to an existing Kubernetes cluster, this section will guide you through creating a local cluster. We recommend the lightweight Kubernetes distribution k3s for this. k3d is a wrapper around k3s in Docker that lets you get started fast.
For other ways of installing k3d, you can have a look at their installation guide.
The Kafka deployment needs a modified Docker image. In that case the image is built and pushed to a Docker registry that holds it. If you do not have access to an existing Docker registry, you can use k3d's Docker registry:
Creating a new k3d cluster automatically configures kubectl to connect to the local cluster by modifying your ~/.kube/config. In case you manually set the KUBECONFIG variable or don't want k3d to modify your config, k3d offers many other options.
You can check the cluster status with kubectl get pods -n kube-system. If all returned elements have a STATUS of Running or Completed, then the cluster is up and running.
Kafka is an open-source data streaming platform. More information about Kafka can be found in the documentation. To deploy Kafka, this guide uses Confluent's Helm chart.
To allow connectivity to other systems Kafka Connect needs to be extended with drivers. You can install a JDBC driver for Kafka Connect by creating a new Docker image:
Create a Dockerfile with the following content:
FROM confluentinc/cp-kafka-connect:7.1.3\n\nRUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.6.0\n
Build and push the modified image to your private Docker registry:
Install Kafka, Zookeeper, Confluent's Schema Registry, Kafka Rest Proxy, and Kafka Connect. A single Helm chart installs all five components. Below you can find an example for the --values ./kafka.yaml file configuring the deployment accordingly. Deploy the services:
An example value configuration for Confluent's Helm chart. This configuration deploys a single Kafka Broker, a Schema Registry, Zookeeper, Kafka Rest Proxy, and Kafka Connect with minimal resources.
Streams Explorer allows examining Apache Kafka data pipelines in a Kubernetes cluster including the inspection of schemas and monitoring of metrics. First, add the Helm repository:
The kpops CLI can be used to destroy a pipeline that was previously deployed with KPOps. In case that doesn't work, the pipeline can always be taken down manually with helm (see section Infrastructure).
Navigate to the examples folder. Replace the <name-of-the-example-directory> with the example you want to tear down. For example the atm-fraud-detection.
Remove the pipeline
# Uncomment 1 line to either destroy, reset or clean.\n\n# poetry run kpops destroy <name-of-the-example-directory>/pipeline.yaml \\\n# poetry run kpops reset <name-of-the-example-directory>/pipeline.yaml \\\n# poetry run kpops clean <name-of-the-example-directory>/pipeline.yaml \\\n--config <name-of-the-example-directory>/config.yaml \\\n--execute\n
In case kpops destroy is not working one can uninstall the pipeline services one by one. This is equivalent to running kpops destroy. In case a clean uninstall (like the one kpops clean does) is needed, one needs to also delete the topics and schemas created by deployment of the pipeline.
"}, {"location": "user/migration-guide/v1-v2/", "title": "Migrate from V1 to V2", "text": ""}, {"location": "user/migration-guide/v1-v2/#derive-component-type-automatically-from-class-name", "title": "Derive component type automatically from class name", "text": "
KPOps automatically infers the component type from the class name. Therefore, the type and schema_type attributes can be removed from your custom components. By convention the type would be the lower, and kebab cased name of the class.
class MyCoolStreamApp(StreamsApp):\n- type = \"my-cool-stream-app\"\n+ ...\n
Because of this new convention producer has been renamed to producer-app. This must be addressed in your pipeline.yaml and defaults.yaml.
"}, {"location": "user/migration-guide/v1-v2/#remove-camel-case-conversion-of-internal-models", "title": "Remove camel case conversion of internal models", "text": "
All the internal KPOps models are now snake_case, and only Helm/Kubernetes values require camel casing. You can find an example of a pipeline.yaml in the following. Notice that the app section here remains untouched.
If you are using the KubernetesApp class to define your own Kubernetes resource to deploy, the abstract function get_helm_chart that returns the chart for deploying the app using Helm is now a Python property and renamed to helm_chart.
Previously, if you set the environment variable KPOPS_KAFKA_BROKER, you need to replace that now with KPOPS_KAFKA_BROKERS.
"}, {"location": "user/migration-guide/v2-v3/", "title": "Migrate from V2 to V3", "text": ""}, {"location": "user/migration-guide/v2-v3/#make-kafka-rest-proxy-kafka-connect-hosts-default-and-improve-schema-registry-config", "title": "Make Kafka REST Proxy & Kafka Connect hosts default and improve Schema Registry config", "text": "
The breaking changes target the config.yaml file:
The schema_registry_url is replaced with schema_registry.url (default http://localhost:8081) and schema_registry.enabled (default false).
kafka_rest_host is renamed to kafka_rest.url (default http://localhost:8082).
kafka_connect_host is replaced with kafka_connect.url (default http://localhost:8083).
brokers is renamed to kafka_brokers.
The environment variable names of these config fields changed respectively. Please refer to the environment variables documentation page to see the newest changes.
SCOPE:{pipeline|config}: Scope of the generated schema
pipeline: Schema of PipelineComponents. Includes the built-in kpops components by default. To include custom components, provide [COMPONENTS_MODULES].\n\nconfig: Schema of KpopsConfig. [required]\n
[COMPONENTS_MODULE]: Custom Python module containing your project-specific components
Options:
--include-stock-components / --no-include-stock-components: Include the built-in KPOps components. [default: include-stock-components]
We provided a GitHub composite action bakdata/kpops that installs and executes KPOps commands with the given parameters.
"}, {"location": "user/references/ci-integration/github-actions/#input-parameters", "title": "Input Parameters", "text": "Name Required Default Value Type Description command \u2705 - string KPOps command to run. generate, deploy, destroy, reset, clean are possible values. Flags such as --dry-run and --execute need to be specified pipeline \u2705 - string Pipeline to run by KPOps working-directory \u274c . string root directory used by KPOps to run pipelines pipeline-base-dir \u274c - string directory where relative pipeline variables are initialized from defaults \u274c - string defaults folder path config \u274c - string config.yaml file path components \u274c - string components package path filter-type \u274c - string Whether to include/exclude the steps defined in KPOPS_PIPELINE_STEPS python-version \u274c \"3.11.x\" string Python version to install (Defaults to the latest stable version of Python 3.11) kpops-version \u274c latest string KPOps version to install helm-version \u274c latest string Helm version to install token \u274c latest string secrets.GITHUB_TOKEN, needed for setup-helm action if helm-version is set to latest"}, {"location": "user/references/ci-integration/github-actions/#usage", "title": "Usage", "text": "
Configuration# add the key-value pairs they need.app:# requiredstreams:# required
-brokers:${brokers}# required
+brokers:${kafka_brokers}# requiredschemaRegistryUrl:${schema_registry_url}nameOverride:override-with-this-name# kafka-app-specificimageTag:"1.0.0"# Example values that are shared between streams-app and producer-app
@@ -1710,7 +1732,7 @@
# Base Kubernetes App-type:kubernetes-appname:kubernetes-app# required# Pipeline prefix that will prefix every component name. If you wish to not
@@ -1682,26 +1693,15 @@
Configurationimage:exampleImage# Exampledebug:false# ExamplecommandLine:{}# Example
-# Helm repository configuration (optional)
-# If not set the helm repo add will not be called. Useful when using local Helm charts
-repo_config:
-repository_name:bakdata-streams-bootstrap# required
-url:https://bakdata.github.io/streams-bootstrap/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"1.0.0"# Helm chart version
# Base Kubernetes App#
-# Parent of: KafkaApp
+# Parent of: HelmApp# Child of: PipelineComponentkubernetes-app:# Pipeline prefix that will prefix every component name. If you wish to not
@@ -1772,17 +1783,6 @@
image:exampleImage# Exampledebug:false# ExamplecommandLine:{}# Example
-# Helm repository configuration (optional)
-# If not set the helm repo add will not be called. Useful when using local Helm charts
-repo_config:
-repository_name:bakdata-streams-bootstrap# required
-url:https://bakdata.github.io/streams-bootstrap/# required
-repo_auth_flags:
-username:user
-password:pass
-ca_file:/home/user/path/to/ca-file
-insecure_skip_tls_verify:false
-version:"1.0.0"# Helm chart version
@@ -1814,7 +1814,7 @@
KafkaApp# add the key-value pairs they need.app:# requiredstreams:# required
-brokers:${brokers}# required
+brokers:${kafka_brokers}# requiredschemaRegistryUrl:${schema_registry_url}nameOverride:override-with-this-name# kafka-app-specificimageTag:"1.0.0"# Example values that are shared between streams-app and producer-app
@@ -1899,7 +1899,7 @@
ConfigConfig# Suffix your environment files with this value (e.g.# defaults_development.yaml for environment=development).KPOPS_ENVIRONMENT# No default value, required
-# brokers
+# kafka_brokers# The comma separated Kafka brokers address.KPOPS_KAFKA_BROKERS# No default value, required
-# schema_registry_url
+# url# Address of the Schema Registry.
-KPOPS_SCHEMA_REGISTRY_URL# No default value, not required
-# kafka_rest_host
+KPOPS_SCHEMA_REGISTRY_URL=http://localhost:8081
+# url# Address of the Kafka REST Proxy.
-KPOPS_REST_PROXY_HOST# No default value, not required
-# kafka_connect_host
+KPOPS_KAFKA_REST_URL=http://localhost:8082
+# url# Address of Kafka Connect.
-KPOPS_CONNECT_HOST# No default value, not required
+KPOPS_KAFKA_CONNECT_URL=http://localhost:8083# timeout# The timeout in seconds that specifies when actions like deletion or# deploy timeout.
diff --git a/dev/user/core-concepts/variables/substitution/index.html b/dev/user/core-concepts/variables/substitution/index.html
index 59f061613..0a3fec0b8 100644
--- a/dev/user/core-concepts/variables/substitution/index.html
+++ b/dev/user/core-concepts/variables/substitution/index.html
@@ -179,7 +179,7 @@
The schema_registry_url is replaced with schema_registry.url (default http://localhost:8081) and schema_registry.enabled (default false).
+
+
+
kafka_rest_host is renamed to kafka_rest.url (default http://localhost:8082).
+
+
+
kafka_connect_host is replaced with kafka_connect.url (default http://localhost:8083).
+
+
+
brokers is renamed to kafka_brokers.
+
+
+
The environment variable names of these config fields changed respectively. Please refer to the environment variables documentation page to see the newest changes.
We provided a GitHub composite action called
-kpops-runner
-that installs all the necessary dependencies and runs KPOps commands with the given parameters.
+
We provided a GitHub composite action bakdata/kpops that installs and executes KPOps commands with the given parameters.
pipeline: Schema of PipelineComponents. Includes the built-in kpops components by default. To include custom components, provide [COMPONENTS_MODULES].
-config: Schema of PipelineConfig. [required]
+config: Schema of KpopsConfig. [required]
[COMPONENTS_MODULE]: Custom Python module containing your project-specific components