diff --git a/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml b/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml index 959596df0..a47ad6b50 100644 --- a/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/defaults_pipeline_component_dependencies.yaml @@ -10,10 +10,7 @@ kafka-connector.yaml: - prefix.yaml - from_.yaml - to.yaml -- namespace.yaml - app-kafka-connector.yaml -- repo_config-kafka-connector.yaml -- version-kafka-connector.yaml - resetter_values.yaml kafka-sink-connector.yaml: [] kafka-source-connector.yaml: diff --git a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml index 52192cb22..2ac8b59b2 100644 --- a/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/kpops_structure.yaml @@ -19,30 +19,24 @@ kpops_components_fields: - prefix - from_ - to - - namespace - app - - repo_config - - version + - resetter_namespace - resetter_values kafka-sink-connector: - name - prefix - from_ - to - - namespace - app - - repo_config - - version + - resetter_namespace - resetter_values kafka-source-connector: - name - prefix - from_ - to - - namespace - app - - repo_config - - version + - resetter_namespace - resetter_values - offset_topic kubernetes-app: diff --git a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml index b633db907..c7d08112c 100644 --- a/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml +++ b/docs/docs/resources/pipeline-components/dependencies/pipeline_component_dependencies.yaml @@ -15,28 +15,19 @@ 
kafka-connector.yaml: - prefix.yaml - from_.yaml - to.yaml -- namespace.yaml - app-kafka-connector.yaml -- repo_config-kafka-connector.yaml -- version-kafka-connector.yaml - resetter_values.yaml kafka-sink-connector.yaml: - prefix.yaml - from_.yaml - to.yaml -- namespace.yaml - app-kafka-connector.yaml -- repo_config-kafka-connector.yaml -- version-kafka-connector.yaml - resetter_values.yaml kafka-source-connector.yaml: - prefix.yaml - from_-kafka-source-connector.yaml - to.yaml -- namespace.yaml - app-kafka-connector.yaml -- repo_config-kafka-connector.yaml -- version-kafka-connector.yaml - resetter_values.yaml - offset_topic-kafka-source-connector.yaml kubernetes-app.yaml: diff --git a/docs/docs/resources/pipeline-components/kafka-connector.yaml b/docs/docs/resources/pipeline-components/kafka-connector.yaml index ca6cfc6eb..b231ae4cc 100644 --- a/docs/docs/resources/pipeline-components/kafka-connector.yaml +++ b/docs/docs/resources/pipeline-components/kafka-connector.yaml @@ -42,22 +42,11 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # `app` contains application-specific settings, hence it does not have a rigid # structure. The fields below are just an example. Extensive documentation on # connectors: https://kafka.apache.org/documentation/#connectconfigs app: # required tasks.max: 1 - # Helm repository configuration for resetter - repo_config: - repository_name: my-repo # required - url: https://bakdata.github.io/kafka-connect-resetter/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.6" # Helm chart version # Overriding Kafka Connect Resetter Helm values. E.g. to override the # Image Tag etc. 
resetter_values: diff --git a/docs/docs/resources/pipeline-components/kafka-sink-connector.yaml b/docs/docs/resources/pipeline-components/kafka-sink-connector.yaml index 06d14ffe1..8e100d1b3 100644 --- a/docs/docs/resources/pipeline-components/kafka-sink-connector.yaml +++ b/docs/docs/resources/pipeline-components/kafka-sink-connector.yaml @@ -43,22 +43,11 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # `app` contains application-specific settings, hence it does not have a rigid # structure. The fields below are just an example. Extensive documentation on # connectors: https://kafka.apache.org/documentation/#connectconfigs app: # required tasks.max: 1 - # Helm repository configuration for resetter - repo_config: - repository_name: my-repo # required - url: https://bakdata.github.io/kafka-connect-resetter/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.6" # Helm chart version # Overriding Kafka Connect Resetter Helm values. E.g. to override the # Image Tag etc. resetter_values: diff --git a/docs/docs/resources/pipeline-components/kafka-source-connector.yaml b/docs/docs/resources/pipeline-components/kafka-source-connector.yaml index e38497b65..fc1f4e8c4 100644 --- a/docs/docs/resources/pipeline-components/kafka-source-connector.yaml +++ b/docs/docs/resources/pipeline-components/kafka-source-connector.yaml @@ -24,22 +24,11 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # `app` contains application-specific settings, hence it does not have a rigid # structure. The fields below are just an example. 
Extensive documentation on # connectors: https://kafka.apache.org/documentation/#connectconfigs app: # required tasks.max: 1 - # Helm repository configuration for resetter - repo_config: - repository_name: my-repo # required - url: https://bakdata.github.io/kafka-connect-resetter/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.6" # Helm chart version # Overriding Kafka Connect Resetter Helm values. E.g. to override the # Image Tag etc. resetter_values: diff --git a/docs/docs/resources/pipeline-components/pipeline.yaml b/docs/docs/resources/pipeline-components/pipeline.yaml index 12183b8e6..9a3f93a9e 100644 --- a/docs/docs/resources/pipeline-components/pipeline.yaml +++ b/docs/docs/resources/pipeline-components/pipeline.yaml @@ -160,22 +160,11 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # `app` contains application-specific settings, hence it does not have a rigid # structure. The fields below are just an example. Extensive documentation on # connectors: https://kafka.apache.org/documentation/#connectconfigs app: # required tasks.max: 1 - # Helm repository configuration for resetter - repo_config: - repository_name: my-repo # required - url: https://bakdata.github.io/kafka-connect-resetter/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.6" # Helm chart version # Overriding Kafka Connect Resetter Helm values. E.g. to override the # Image Tag etc. resetter_values: @@ -206,22 +195,11 @@ cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # `app` contains application-specific settings, hence it does not have a rigid # structure. The fields below are just an example. 
Extensive documentation on # connectors: https://kafka.apache.org/documentation/#connectconfigs app: # required tasks.max: 1 - # Helm repository configuration for resetter - repo_config: - repository_name: my-repo # required - url: https://bakdata.github.io/kafka-connect-resetter/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.6" # Helm chart version # Overriding Kafka Connect Resetter Helm values. E.g. to override the # Image Tag etc. resetter_values: diff --git a/docs/docs/resources/pipeline-config/config.yaml b/docs/docs/resources/pipeline-config/config.yaml index 275382d46..0707280c7 100644 --- a/docs/docs/resources/pipeline-config/config.yaml +++ b/docs/docs/resources/pipeline-config/config.yaml @@ -3,28 +3,35 @@ # The path to the folder containing the defaults.yaml file and the environment # defaults files. defaults_path: . -# The environment you want to generate and deploy the pipeline to. Suffix your -# environment files with this value (e.g. defaults_development.yaml and -# pipeline_development.yaml for environment=development). -# REQUIRED -environment: development +# Custom Python module defining project-specific KPOps components +components_module: null +# Base directory to the pipelines (default is current working directory) +pipeline_base_dir: . # The Kafka brokers address. # REQUIRED -brokers: "http://broker1:9092,http://broker2:9092" +kafka_brokers: "http://broker1:9092,http://broker2:9092" # The name of the defaults file and the prefix of the defaults environment file. defaults_filename_prefix: defaults -# Configures topic names. +# Configure the topic name variables you can use in the pipeline definition. 
topic_name_config: # Configures the value for the variable ${output_topic_name} - default_output_topic_name: ${pipeline.name}-${component_name} + default_output_topic_name: ${pipeline.name}-${component.name} # Configures the value for the variable ${error_topic_name} - default_error_topic_name: ${pipeline.name}-${component_name}-error -# Address of the Schema Registry -schema_registry_url: "http://localhost:8081" -# Address of the Kafka REST Proxy. -kafka_rest_host: "http://localhost:8082" -# Address of Kafka Connect. -kafka_connect_host: "http://localhost:8083" + default_error_topic_name: ${pipeline.name}-${component.name}-error +# Configuration for Schema Registry. +schema_registry: + # Whether the Schema Registry handler should be initialized. + enabled: false + # Address of the Schema Registry. + url: "http://localhost:8081" +# Configuration for the Kafka REST Proxy. +kafka_rest: + # Address of the Kafka REST Proxy. + url: "http://localhost:8082" +# Configuration for Kafka Connect. +kafka_connect: + # Address of Kafka Connect. + url: "http://localhost:8083" # The timeout in seconds that specifies when actions like deletion or deploy # timeout. timeout: 300 @@ -33,14 +40,16 @@ timeout: 300 create_namespace: false # Global flags for Helm. helm_config: - # Set the name of the kubeconfig context. (--kube-context) + # Name of kubeconfig context (`--kube-context`) context: name # Run Helm in Debug mode. debug: false + # Kubernetes API version used for Capabilities.APIVersions + api_version: null # Configure Helm Diff. helm_diff_config: # Set of keys that should not be checked. 
- ignore: + ignore: - name - imageTag # Whether to retain clean up jobs in the cluster or uninstall the, after diff --git a/docs/docs/resources/pipeline-defaults/defaults-kafka-connector.yaml b/docs/docs/resources/pipeline-defaults/defaults-kafka-connector.yaml index 489bf8bb1..40a8c117d 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-kafka-connector.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-kafka-connector.yaml @@ -45,22 +45,11 @@ kafka-connector: cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # `app` contains application-specific settings, hence it does not have a rigid # structure. The fields below are just an example. Extensive documentation on # connectors: https://kafka.apache.org/documentation/#connectconfigs app: # required tasks.max: 1 - # Helm repository configuration for resetter - repo_config: - repository_name: my-repo # required - url: https://bakdata.github.io/kafka-connect-resetter/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.6" # Helm chart version # Overriding Kafka Connect Resetter Helm values. E.g. to override the # Image Tag etc. resetter_values: diff --git a/docs/docs/resources/pipeline-defaults/defaults.yaml b/docs/docs/resources/pipeline-defaults/defaults.yaml index 05487c7c0..9711a8c6f 100644 --- a/docs/docs/resources/pipeline-defaults/defaults.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults.yaml @@ -121,22 +121,11 @@ kafka-connector: cleanup.policy: compact models: # SchemaProvider is initiated with the values given here model: model - namespace: namespace # required # `app` contains application-specific settings, hence it does not have a rigid # structure. The fields below are just an example. 
Extensive documentation on # connectors: https://kafka.apache.org/documentation/#connectconfigs app: # required tasks.max: 1 - # Helm repository configuration for resetter - repo_config: - repository_name: my-repo # required - url: https://bakdata.github.io/kafka-connect-resetter/ # required - repo_auth_flags: - username: user - password: pass - ca_file: /home/user/path/to/ca-file - insecure_skip_tls_verify: false - version: "1.0.6" # Helm chart version # Overriding Kafka Connect Resetter Helm values. E.g. to override the # Image Tag etc. resetter_values: diff --git a/docs/docs/schema/defaults.json b/docs/docs/schema/defaults.json index 06ec5fdc0..aa5db63da 100644 --- a/docs/docs/schema/defaults.json +++ b/docs/docs/schema/defaults.json @@ -90,7 +90,7 @@ "type": "string" }, "namespace": { - "description": "Namespace in which the component shall be deployed", + "description": "Kubernetes namespace in which the component shall be deployed", "title": "Namespace", "type": "string" }, @@ -335,40 +335,32 @@ "title": "Name", "type": "string" }, - "namespace": { - "description": "Namespace in which the component shall be deployed", - "title": "Namespace", - "type": "string" - }, "prefix": { "default": "${pipeline.name}-", "description": "Pipeline prefix that will prefix every component name. 
If you wish to not have any prefix you can specify an empty string.", "title": "Prefix", "type": "string" }, - "repo_config": { - "allOf": [ + "resetter_namespace": { + "anyOf": [ { - "$ref": "#/$defs/HelmRepoConfig" + "type": "string" + }, + { + "type": "null" } ], - "default": { - "repo_auth_flags": { - "ca_file": null, - "cert_file": null, - "insecure_skip_tls_verify": false, - "password": null, - "username": null - }, - "repository_name": "bakdata-kafka-connect-resetter", - "url": "https://bakdata.github.io/kafka-connect-resetter/" - }, - "description": "Configuration of the Helm chart repo to be used for deploying the component" + "default": null, + "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", + "title": "Resetter Namespace" }, "resetter_values": { - "description": "Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc.", - "title": "Resetter Values", - "type": "object" + "allOf": [ + { + "$ref": "#/$defs/HelmAppValues" + } + ], + "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." }, "to": { "anyOf": [ @@ -381,24 +373,10 @@ ], "default": null, "description": "Topic(s) into which the component will write output" - }, - "version": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": "1.0.4", - "description": "Helm chart version", - "title": "Version" } }, "required": [ "name", - "namespace", "app" ], "title": "KafkaConnector", @@ -452,40 +430,32 @@ "title": "Name", "type": "string" }, - "namespace": { - "description": "Namespace in which the component shall be deployed", - "title": "Namespace", - "type": "string" - }, "prefix": { "default": "${pipeline.name}-", "description": "Pipeline prefix that will prefix every component name. 
If you wish to not have any prefix you can specify an empty string.", "title": "Prefix", "type": "string" }, - "repo_config": { - "allOf": [ + "resetter_namespace": { + "anyOf": [ { - "$ref": "#/$defs/HelmRepoConfig" + "type": "string" + }, + { + "type": "null" } ], - "default": { - "repo_auth_flags": { - "ca_file": null, - "cert_file": null, - "insecure_skip_tls_verify": false, - "password": null, - "username": null - }, - "repository_name": "bakdata-kafka-connect-resetter", - "url": "https://bakdata.github.io/kafka-connect-resetter/" - }, - "description": "Configuration of the Helm chart repo to be used for deploying the component" + "default": null, + "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", + "title": "Resetter Namespace" }, "resetter_values": { - "description": "Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc.", - "title": "Resetter Values", - "type": "object" + "allOf": [ + { + "$ref": "#/$defs/HelmAppValues" + } + ], + "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." 
}, "to": { "anyOf": [ @@ -502,24 +472,10 @@ "type": { "const": "kafka-sink-connector", "title": "Type" - }, - "version": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": "1.0.4", - "description": "Helm chart version", - "title": "Version" } }, "required": [ "name", - "namespace", "app", "type" ], @@ -556,11 +512,6 @@ "title": "Name", "type": "string" }, - "namespace": { - "description": "Namespace in which the component shall be deployed", - "title": "Namespace", - "type": "string" - }, "offset_topic": { "anyOf": [ { @@ -580,29 +531,26 @@ "title": "Prefix", "type": "string" }, - "repo_config": { - "allOf": [ + "resetter_namespace": { + "anyOf": [ { - "$ref": "#/$defs/HelmRepoConfig" + "type": "string" + }, + { + "type": "null" } ], - "default": { - "repo_auth_flags": { - "ca_file": null, - "cert_file": null, - "insecure_skip_tls_verify": false, - "password": null, - "username": null - }, - "repository_name": "bakdata-kafka-connect-resetter", - "url": "https://bakdata.github.io/kafka-connect-resetter/" - }, - "description": "Configuration of the Helm chart repo to be used for deploying the component" + "default": null, + "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", + "title": "Resetter Namespace" }, "resetter_values": { - "description": "Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc.", - "title": "Resetter Values", - "type": "object" + "allOf": [ + { + "$ref": "#/$defs/HelmAppValues" + } + ], + "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." 
}, "to": { "anyOf": [ @@ -619,24 +567,10 @@ "type": { "const": "kafka-source-connector", "title": "Type" - }, - "version": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": "1.0.4", - "description": "Helm chart version", - "title": "Version" } }, "required": [ "name", - "namespace", "app", "type" ], @@ -703,7 +637,7 @@ "type": "string" }, "namespace": { - "description": "Namespace in which the component shall be deployed", + "description": "Kubernetes namespace in which the component shall be deployed", "title": "Namespace", "type": "string" }, @@ -821,7 +755,7 @@ "type": "string" }, "namespace": { - "description": "Namespace in which the component shall be deployed", + "description": "Kubernetes namespace in which the component shall be deployed", "title": "Namespace", "type": "string" }, @@ -1070,7 +1004,7 @@ "type": "string" }, "namespace": { - "description": "Namespace in which the component shall be deployed", + "description": "Kubernetes namespace in which the component shall be deployed", "title": "Namespace", "type": "string" }, @@ -1293,7 +1227,7 @@ "type": "string" }, "namespace": { - "description": "Namespace in which the component shall be deployed", + "description": "Kubernetes namespace in which the component shall be deployed", "title": "Namespace", "type": "string" }, diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 557dbc486..186863f62 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -90,7 +90,7 @@ "type": "string" }, "namespace": { - "description": "Namespace in which the component shall be deployed", + "description": "Kubernetes namespace in which the component shall be deployed", "title": "Namespace", "type": "string" }, @@ -265,40 +265,32 @@ "title": "Name", "type": "string" }, - "namespace": { - "description": "Namespace in which the component shall be deployed", - "title": "Namespace", - "type": "string" - }, "prefix": { "default": 
"${pipeline.name}-", "description": "Pipeline prefix that will prefix every component name. If you wish to not have any prefix you can specify an empty string.", "title": "Prefix", "type": "string" }, - "repo_config": { - "allOf": [ + "resetter_namespace": { + "anyOf": [ { - "$ref": "#/$defs/HelmRepoConfig" + "type": "string" + }, + { + "type": "null" } ], - "default": { - "repo_auth_flags": { - "ca_file": null, - "cert_file": null, - "insecure_skip_tls_verify": false, - "password": null, - "username": null - }, - "repository_name": "bakdata-kafka-connect-resetter", - "url": "https://bakdata.github.io/kafka-connect-resetter/" - }, - "description": "Configuration of the Helm chart repo to be used for deploying the component" + "default": null, + "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", + "title": "Resetter Namespace" }, "resetter_values": { - "description": "Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc.", - "title": "Resetter Values", - "type": "object" + "allOf": [ + { + "$ref": "#/$defs/HelmAppValues" + } + ], + "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." 
}, "to": { "anyOf": [ @@ -315,24 +307,10 @@ "type": { "const": "kafka-sink-connector", "title": "Type" - }, - "version": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": "1.0.4", - "description": "Helm chart version", - "title": "Version" } }, "required": [ "name", - "namespace", "app", "type" ], @@ -369,11 +347,6 @@ "title": "Name", "type": "string" }, - "namespace": { - "description": "Namespace in which the component shall be deployed", - "title": "Namespace", - "type": "string" - }, "offset_topic": { "anyOf": [ { @@ -393,29 +366,26 @@ "title": "Prefix", "type": "string" }, - "repo_config": { - "allOf": [ + "resetter_namespace": { + "anyOf": [ { - "$ref": "#/$defs/HelmRepoConfig" + "type": "string" + }, + { + "type": "null" } ], - "default": { - "repo_auth_flags": { - "ca_file": null, - "cert_file": null, - "insecure_skip_tls_verify": false, - "password": null, - "username": null - }, - "repository_name": "bakdata-kafka-connect-resetter", - "url": "https://bakdata.github.io/kafka-connect-resetter/" - }, - "description": "Configuration of the Helm chart repo to be used for deploying the component" + "default": null, + "description": "Kubernetes namespace in which the Kafka Connect resetter shall be deployed", + "title": "Resetter Namespace" }, "resetter_values": { - "description": "Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc.", - "title": "Resetter Values", - "type": "object" + "allOf": [ + { + "$ref": "#/$defs/HelmAppValues" + } + ], + "description": "Overriding Kafka Connect resetter Helm values, e.g. to override the image tag etc." 
}, "to": { "anyOf": [ @@ -432,24 +402,10 @@ "type": { "const": "kafka-source-connector", "title": "Type" - }, - "version": { - "anyOf": [ - { - "type": "string" - }, - { - "type": "null" - } - ], - "default": "1.0.4", - "description": "Helm chart version", - "title": "Version" } }, "required": [ "name", - "namespace", "app", "type" ], @@ -489,7 +445,7 @@ "type": "string" }, "namespace": { - "description": "Namespace in which the component shall be deployed", + "description": "Kubernetes namespace in which the component shall be deployed", "title": "Namespace", "type": "string" }, @@ -738,7 +694,7 @@ "type": "string" }, "namespace": { - "description": "Namespace in which the component shall be deployed", + "description": "Kubernetes namespace in which the component shall be deployed", "title": "Namespace", "type": "string" }, diff --git a/docs/docs/user/core-concepts/variables/substitution.md b/docs/docs/user/core-concepts/variables/substitution.md index b1bfa97e3..eb4076c79 100644 --- a/docs/docs/user/core-concepts/variables/substitution.md +++ b/docs/docs/user/core-concepts/variables/substitution.md @@ -23,11 +23,13 @@ All of them are prefixed with `component.` and follow the following form: `compo These variables include all fields in the [config](../config.md) and refer to the pipeline configuration that is independent of the components. +All such variables are prefixed with `config.` and are of the same form as the [component-specific variables](#component-specific-variables). + !!! 
info Aliases - `error_topic_name` is an alias for `topic_name_config.default_error_topic_name` - `output_topic_name` is an alias for `topic_name_config.default_output_topic_name` + `error_topic_name` is an alias for `config.topic_name_config.default_error_topic_name` + `output_topic_name` is an alias for `config.topic_name_config.default_output_topic_name` diff --git a/docs/docs/user/migration-guide/v2-v3.md b/docs/docs/user/migration-guide/v2-v3.md index 2c1eef100..c7b5bfc99 100644 --- a/docs/docs/user/migration-guide/v2-v3.md +++ b/docs/docs/user/migration-guide/v2-v3.md @@ -56,6 +56,19 @@ Previously the default `KafkaApp` component configured the [streams-bootstrap](h version: ... ``` +## [Refactor Kafka Connector resetter as individual HelmApp](https://github.com/bakdata/kpops/pull/400) + +Internally, the [Kafka Connector resetter](https://github.com/bakdata/kafka-connect-resetter) is now its own standard `HelmApp`, removing a lot of the shared code. +It is configured using the `resetter_namespace` (formerly `namespace`) and `resetter_values` attributes.
+ +#### defaults.yaml + +```diff + kafka-connector: +- namespace: my-namespace ++ resetter_namespace: my-namespace +``` + ## [Make Kafka REST Proxy & Kafka Connect hosts default and improve Schema Registry config](https://github.com/bakdata/kpops/pull/354) The breaking changes target the `config.yaml` file: diff --git a/examples/bakdata/atm-fraud-detection/defaults.yaml b/examples/bakdata/atm-fraud-detection/defaults.yaml index a5a060378..2183f91d6 100644 --- a/examples/bakdata/atm-fraud-detection/defaults.yaml +++ b/examples/bakdata/atm-fraud-detection/defaults.yaml @@ -4,9 +4,6 @@ pipeline-component: kubernetes-app: namespace: ${NAMESPACE} -kafka-connector: - namespace: ${NAMESPACE} - kafka-app: app: streams: diff --git a/kpops/components/base_components/helm_app.py b/kpops/components/base_components/helm_app.py index 07c3c6831..b8978c5af 100644 --- a/kpops/components/base_components/helm_app.py +++ b/kpops/components/base_components/helm_app.py @@ -101,12 +101,6 @@ def helm_release_name(self) -> str: """The name for the Helm release. Can be overridden.""" return create_helm_release_name(self.full_name) - @property - def clean_release_name(self) -> str: - """The name for the Helm release for cleanup jobs. 
Can be overridden.""" - suffix = "-clean" - return create_helm_release_name(self.full_name + suffix, suffix) - @property def helm_chart(self) -> str: """Return component's Helm chart.""" diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 246e30b11..38f490458 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -3,19 +3,14 @@ import logging from abc import ABC from functools import cached_property -from typing import NoReturn +from typing import Any, NoReturn -from pydantic import Field, PrivateAttr, ValidationInfo, field_validator +from pydantic import Field, PrivateAttr, ValidationInfo, computed_field, field_validator from typing_extensions import override -from kpops.component_handlers.helm_wrapper.dry_run_handler import DryRunHandler -from kpops.component_handlers.helm_wrapper.helm import Helm -from kpops.component_handlers.helm_wrapper.helm_diff import HelmDiff from kpops.component_handlers.helm_wrapper.model import ( HelmFlags, HelmRepoConfig, - HelmTemplateFlags, - HelmUpgradeInstallFlags, ) from kpops.component_handlers.helm_wrapper.utils import create_helm_release_name from kpops.component_handlers.kafka_connect.model import ( @@ -25,8 +20,8 @@ KafkaConnectorType, ) from kpops.components.base_components.base_defaults_component import deduplicate +from kpops.components.base_components.helm_app import HelmApp, HelmAppValues from kpops.components.base_components.models.from_section import FromTopic -from kpops.components.base_components.models.resource import Resource from kpops.components.base_components.pipeline_component import PipelineComponent from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr @@ -34,41 +29,104 @@ log = logging.getLogger("KafkaConnector") -class KafkaConnector(PipelineComponent, ABC): - """Base class for all Kafka connectors. 
+class KafkaConnectorResetter(HelmApp): + """Helm app for resetting and cleaning a Kafka Connector. - Should only be used to set defaults - - :param app: Application-specific settings :param repo_config: Configuration of the Helm chart repo to be used for - deploying the component, - defaults to HelmRepoConfig(repository_name="bakdata-kafka-connect-resetter", url="https://bakdata.github.io/kafka-connect-resetter/") - :param namespace: Namespace in which the component shall be deployed + deploying the component, defaults to kafka-connect-resetter Helm repo :param version: Helm chart version, defaults to "1.0.4" - :param resetter_values: Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc., - defaults to dict """ - namespace: str = Field( - default=..., - description=describe_attr("namespace", __doc__), - ) - app: KafkaConnectorConfig = Field( - default=..., - description=describe_attr("app", __doc__), - ) + app: KafkaConnectorResetterValues repo_config: HelmRepoConfig = Field( default=HelmRepoConfig( repository_name="bakdata-kafka-connect-resetter", url="https://bakdata.github.io/kafka-connect-resetter/", - ), - description=describe_attr("repo_config", __doc__), + ) ) version: str | None = Field( default="1.0.4", description=describe_attr("version", __doc__) ) - resetter_values: dict = Field( - default_factory=dict, + suffix: str = "-clean" + + @property + @override + def full_name(self) -> str: + return super().full_name + self.suffix + + @property + @override + def helm_chart(self) -> str: + return f"{self.repo_config.repository_name}/kafka-connect-resetter" + + @property + @override + def helm_release_name(self) -> str: + return create_helm_release_name(self.full_name, self.suffix) + + @property + @override + def helm_flags(self) -> HelmFlags: + return HelmFlags( + create_namespace=self.config.create_namespace, + version=self.version, + wait_for_jobs=True, + wait=True, + ) + + @override + def reset(self, dry_run: bool) -> None: + 
"""Reset connector. + + At first, it deletes the previous cleanup job (connector resetter) + to make sure that there is no running clean job in the cluster. Then it releases a cleanup job. + If retain_clean_jobs config is set to false the cleanup job will be deleted subsequently. + + :param dry_run: If the cleanup should be run in dry run mode or not + """ + log.info( + magentaify( + f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.app.config.connector}" + ) + ) + self.destroy(dry_run) + + log.info( + magentaify( + f"Connector Cleanup: deploy Connect {self.app.connector_type} resetter for {self.app.config.connector}" + ) + ) + self.deploy(dry_run) + + if not self.config.retain_clean_jobs: + log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")) + self.destroy(dry_run) + + @override + def clean(self, dry_run: bool) -> None: + self.reset(dry_run) + + +class KafkaConnector(PipelineComponent, ABC): + """Base class for all Kafka connectors. + + Should only be used to set defaults + + :param app: Application-specific settings + :param resetter_namespace: Kubernetes namespace in which the Kafka Connect resetter shall be deployed + :param resetter_values: Overriding Kafka Connect resetter Helm values, e.g. 
to override the image tag etc., + defaults to empty HelmAppValues + """ + + app: KafkaConnectorConfig = Field( + default=..., + description=describe_attr("app", __doc__), + ) + resetter_namespace: str | None = Field( + default=None, description=describe_attr("resetter_namespace", __doc__) + ) + resetter_values: HelmAppValues = Field( + default_factory=HelmAppValues, description=describe_attr("resetter_values", __doc__), ) _connector_type: KafkaConnectorType = PrivateAttr() @@ -90,47 +148,27 @@ def connector_config_should_have_component_name( app["name"] = component_name return KafkaConnectorConfig(**app) + @computed_field @cached_property - def helm(self) -> Helm: - """Helm object that contains component-specific config such as repo.""" - helm_repo_config = self.repo_config - helm = Helm(self.config.helm_config) - helm.add_repo( - helm_repo_config.repository_name, - helm_repo_config.url, - helm_repo_config.repo_auth_flags, - ) - return helm - - @property - def _resetter_release_name(self) -> str: - suffix = "-clean" - return create_helm_release_name(self.full_name + suffix, suffix) - - @property - def _resetter_helm_chart(self) -> str: - return f"{self.repo_config.repository_name}/kafka-connect-resetter" - - @cached_property - def dry_run_handler(self) -> DryRunHandler: - helm_diff = HelmDiff(self.config.helm_diff_config) - return DryRunHandler(self.helm, helm_diff, self.namespace) - - @property - def helm_flags(self) -> HelmFlags: - """Return shared flags for Helm commands.""" - return HelmFlags( - **self.repo_config.repo_auth_flags.model_dump(), - version=self.version, - create_namespace=self.config.create_namespace, - ) - - @property - def template_flags(self) -> HelmTemplateFlags: - """Return flags for Helm template command.""" - return HelmTemplateFlags( - **self.helm_flags.model_dump(), - api_version=self.config.helm_config.api_version, + def _resetter(self) -> KafkaConnectorResetter: + kwargs: dict[str, Any] = {} + if self.resetter_namespace: + 
kwargs["namespace"] = self.resetter_namespace + return KafkaConnectorResetter( + config=self.config, + handlers=self.handlers, + **kwargs, + **self.model_dump( + exclude={"_resetter", "resetter_values", "resetter_namespace", "app"} + ), + app=KafkaConnectorResetterValues( + connector_type=self._connector_type.value, + config=KafkaConnectorResetterConfig( + connector=self.full_name, + brokers=self.config.kafka_brokers, + ), + **self.resetter_values.model_dump(), + ), ) @override @@ -162,105 +200,6 @@ def clean(self, dry_run: bool) -> None: ) self.handlers.topic_handler.delete_topics(self.to, dry_run=dry_run) - def _run_connect_resetter( - self, - dry_run: bool, - retain_clean_jobs: bool, - **kwargs, - ) -> None: - """Clean the connector from the cluster. - - At first, it deletes the previous cleanup job (connector resetter) - to make sure that there is no running clean job in the cluster. Then it releases a cleanup job. - If the retain_clean_jobs flag is set to false the cleanup job will be deleted. 
- - :param dry_run: If the cleanup should be run in dry run mode or not - :param retain_clean_jobs: If the cleanup job should be kept - :param kwargs: Other values for the KafkaConnectorResetter - """ - log.info( - magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}" - ) - ) - self.__uninstall_connect_resetter(self._resetter_release_name, dry_run) - - log.info( - magentaify( - f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}" - ) - ) - - stdout = self.__install_connect_resetter(dry_run, **kwargs) - - if dry_run: - self.dry_run_handler.print_helm_diff( - stdout, self._resetter_release_name, log - ) - - if not retain_clean_jobs: - log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")) - self.__uninstall_connect_resetter(self._resetter_release_name, dry_run) - - def __install_connect_resetter( - self, - dry_run: bool, - **kwargs, - ) -> str: - """Install connector resetter. - - :param dry_run: Whether to dry run the command - :return: The output of `helm upgrade --install` - """ - return self.helm.upgrade_install( - release_name=self._resetter_release_name, - namespace=self.namespace, - chart=self._resetter_helm_chart, - dry_run=dry_run, - flags=HelmUpgradeInstallFlags( - create_namespace=self.config.create_namespace, - version=self.version, - wait_for_jobs=True, - wait=True, - ), - values=self._get_kafka_connect_resetter_values( - **kwargs, - ), - ) - - def _get_kafka_connect_resetter_values( - self, - **kwargs, - ) -> dict: - """Get connector resetter helm chart values. 
- - :return: The Helm chart values of the connector resetter - """ - return { - **KafkaConnectorResetterValues( - config=KafkaConnectorResetterConfig( - connector=self.full_name, - brokers=self.config.kafka_brokers, - **kwargs, - ), - connector_type=self._connector_type.value, - name_override=self.full_name + "-clean", - ).model_dump(), - **self.resetter_values, - } - - def __uninstall_connect_resetter(self, release_name: str, dry_run: bool) -> None: - """Uninstall connector resetter. - - :param release_name: Name of the release to be uninstalled - :param dry_run: Whether to do a dry run of the command - """ - self.helm.uninstall( - namespace=self.namespace, - release_name=release_name, - dry_run=dry_run, - ) - class KafkaSourceConnector(KafkaConnector): """Kafka source connector model. @@ -282,38 +221,16 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: msg = "Kafka source connector doesn't support FromSection" raise NotImplementedError(msg) - @override - def manifest(self) -> Resource: - values = self._get_kafka_connect_resetter_values( - offset_topic=self.offset_topic, - ) - return self.helm.template( - self._resetter_release_name, - self._resetter_helm_chart, - self.namespace, - values, - self.template_flags, - ) - @override def reset(self, dry_run: bool) -> None: - self.__run_kafka_connect_resetter(dry_run) + self._resetter.app.config.offset_topic = self.offset_topic + self._resetter.reset(dry_run) @override def clean(self, dry_run: bool) -> None: super().clean(dry_run) - self.__run_kafka_connect_resetter(dry_run) - - def __run_kafka_connect_resetter(self, dry_run: bool) -> None: - """Run the connector resetter. 
- - :param dry_run: Whether to do a dry run of the command - """ - self._run_connect_resetter( - dry_run=dry_run, - retain_clean_jobs=self.config.retain_clean_jobs, - offset_topic=self.offset_topic, - ) + self._resetter.app.config.offset_topic = self.offset_topic + self._resetter.clean(dry_run) class KafkaSinkConnector(KafkaConnector): @@ -328,17 +245,6 @@ def add_input_topics(self, topics: list[str]) -> None: topics = deduplicate(topics) setattr(self.app, "topics", ",".join(topics)) - @override - def manifest(self) -> Resource: - values = self._get_kafka_connect_resetter_values() - return self.helm.template( - self._resetter_release_name, - self._resetter_helm_chart, - self.namespace, - values, - self.template_flags, - ) - @override def set_input_pattern(self, name: str) -> None: setattr(self.app, "topics.regex", name) @@ -349,23 +255,11 @@ def set_error_topic(self, topic_name: str) -> None: @override def reset(self, dry_run: bool) -> None: - self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=False) + self._resetter.app.config.delete_consumer_group = False + self._resetter.reset(dry_run) @override def clean(self, dry_run: bool) -> None: super().clean(dry_run) - self.__run_kafka_connect_resetter(dry_run, delete_consumer_group=True) - - def __run_kafka_connect_resetter( - self, dry_run: bool, delete_consumer_group: bool - ) -> None: - """Run the connector resetter. 
- - :param dry_run: Whether to do a dry run of the command - :param delete_consumer_group: Whether the consumer group should be deleted or not - """ - self._run_connect_resetter( - dry_run=dry_run, - retain_clean_jobs=self.config.retain_clean_jobs, - delete_consumer_group=delete_consumer_group, - ) + self._resetter.app.config.delete_consumer_group = True + self._resetter.clean(dry_run) diff --git a/kpops/components/base_components/kubernetes_app.py b/kpops/components/base_components/kubernetes_app.py index 2b4065191..5f3c3e67d 100644 --- a/kpops/components/base_components/kubernetes_app.py +++ b/kpops/components/base_components/kubernetes_app.py @@ -31,7 +31,7 @@ class KubernetesApp(PipelineComponent, ABC): All built-in components are Kubernetes apps, except for the Kafka connectors. - :param namespace: Namespace in which the component shall be deployed + :param namespace: Kubernetes namespace in which the component shall be deployed :param app: Application-specific settings """ diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py index d352a6d8a..3e01259a2 100644 --- a/tests/components/test_kafka_connector.py +++ b/tests/components/test_kafka_connector.py @@ -8,7 +8,9 @@ from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import HelmDiffConfig from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig -from kpops.components.base_components.kafka_connector import KafkaConnector +from kpops.components.base_components.kafka_connector import ( + KafkaConnector, +) from kpops.config import KpopsConfig, TopicNameConfig DEFAULTS_PATH = Path(__file__).parent / "resources" @@ -17,6 +19,7 @@ CONNECTOR_CLEAN_FULL_NAME = CONNECTOR_FULL_NAME + "-clean" CONNECTOR_CLEAN_RELEASE_NAME = "${pipeline.name}-test-connector-with-lon-612f3-clean" CONNECTOR_CLASS = "com.bakdata.connect.TestConnector" +RESETTER_NAMESPACE = "test-namespace" class TestKafkaConnector: @@ -43,13 
+46,13 @@ def handlers(self) -> ComponentHandlers: @pytest.fixture(autouse=True) def helm_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.components.base_components.kafka_connector.Helm" + "kpops.components.base_components.helm_app.Helm" ).return_value @pytest.fixture() - def dry_run_handler(self, mocker: MockerFixture) -> MagicMock: + def dry_run_handler_mock(self, mocker: MockerFixture) -> MagicMock: return mocker.patch( - "kpops.components.base_components.kafka_connector.DryRunHandler" + "kpops.components.base_components.helm_app.DryRunHandler" ).return_value @pytest.fixture() @@ -61,27 +64,35 @@ def connector_config(self) -> KafkaConnectorConfig: } ) - def test_connector_config_name_override( + @pytest.fixture() + def connector( self, config: KpopsConfig, handlers: ComponentHandlers, connector_config: KafkaConnectorConfig, - ): - connector = KafkaConnector( + ) -> KafkaConnector: + return KafkaConnector( name=CONNECTOR_NAME, config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, ) + + def test_connector_config_name_override( + self, + connector: KafkaConnector, + config: KpopsConfig, + handlers: ComponentHandlers, + ): assert connector.app.name == CONNECTOR_FULL_NAME connector = KafkaConnector( name=CONNECTOR_NAME, config=config, handlers=handlers, - app={"connector.class": CONNECTOR_CLASS}, # type: ignore[reportGeneralTypeIssues] - namespace="test-namespace", + app={"connector.class": CONNECTOR_CLASS}, # type: ignore[reportGeneralTypeIssues], gets enriched + resetter_namespace=RESETTER_NAMESPACE, ) assert connector.app.name == CONNECTOR_FULL_NAME @@ -95,8 +106,7 @@ def test_connector_config_name_override( name=CONNECTOR_NAME, config=config, handlers=handlers, - app={"connector.class": CONNECTOR_CLASS, "name": "different-name"}, # type: ignore[reportGeneralTypeIssues] - namespace="test-namespace", + app={"connector.class": CONNECTOR_CLASS, "name": 
"different-name"}, # type: ignore[reportGeneralTypeIssues], gets enriched ) with pytest.raises( @@ -109,22 +119,5 @@ def test_connector_config_name_override( name=CONNECTOR_NAME, config=config, handlers=handlers, - app={"connector.class": CONNECTOR_CLASS, "name": ""}, # type: ignore[reportGeneralTypeIssues] - namespace="test-namespace", + app={"connector.class": CONNECTOR_CLASS, "name": ""}, # type: ignore[reportGeneralTypeIssues], gets enriched ) - - def test_resetter_release_name( - self, - config: KpopsConfig, - handlers: ComponentHandlers, - connector_config: KafkaConnectorConfig, - ): - connector = KafkaConnector( - name=CONNECTOR_NAME, - config=config, - handlers=handlers, - app=connector_config, - namespace="test-namespace", - ) - assert connector.app.name == CONNECTOR_FULL_NAME - assert connector._resetter_release_name == CONNECTOR_CLEAN_RELEASE_NAME diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 81b5049d9..ef4f7caa3 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -1,4 +1,4 @@ -from unittest.mock import MagicMock, call +from unittest.mock import ANY, MagicMock, call import pytest from pytest_mock import MockerFixture @@ -13,6 +13,7 @@ KafkaConnectorType, ) from kpops.components import KafkaSinkConnector +from kpops.components.base_components.kafka_connector import KafkaConnectorResetter from kpops.components.base_components.models.from_section import ( FromSection, FromTopic, @@ -31,9 +32,12 @@ CONNECTOR_CLEAN_RELEASE_NAME, CONNECTOR_FULL_NAME, CONNECTOR_NAME, + RESETTER_NAMESPACE, TestKafkaConnector, ) +CONNECTOR_TYPE = KafkaConnectorType.SINK.value + class TestKafkaSinkConnector(TestKafkaConnector): @pytest.fixture() @@ -52,7 +56,7 @@ def connector( config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, to=ToSection( topics={ 
TopicName("${output_topic_name}"): TopicConfig( @@ -62,6 +66,12 @@ def connector( ), ) + def test_resetter_release_name(self, connector: KafkaSinkConnector): + assert connector.app.name == CONNECTOR_FULL_NAME + resetter = connector._resetter + assert isinstance(resetter, KafkaConnectorResetter) + assert connector._resetter.helm_release_name == CONNECTOR_CLEAN_RELEASE_NAME + def test_connector_config_parsing( self, config: KpopsConfig, @@ -76,7 +86,7 @@ def test_connector_config_parsing( app=KafkaConnectorConfig( **{**connector_config.model_dump(), "topics": topic_name} ), - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, ) assert getattr(connector.app, "topics") == topic_name @@ -88,7 +98,7 @@ def test_connector_config_parsing( app=KafkaConnectorConfig( **{**connector_config.model_dump(), "topics.regex": topic_pattern} ), - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, ) assert getattr(connector.app, "topics.regex") == topic_pattern @@ -105,7 +115,7 @@ def test_from_section_parsing_input_topic( config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type topics={ topic1: FromTopic(type=InputTopicTypes.INPUT), @@ -131,7 +141,7 @@ def test_from_section_parsing_input_pattern( config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type topics={topic_pattern: FromTopic(type=InputTopicTypes.PATTERN)} ), @@ -177,18 +187,18 @@ def test_destroy( def test_reset_when_dry_run_is_true( self, connector: KafkaSinkConnector, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, ): dry_run = True connector.reset(dry_run=dry_run) - 
dry_run_handler.print_helm_diff.assert_called_once() + dry_run_handler_mock.print_helm_diff.assert_called_once() def test_reset_when_dry_run_is_false( self, connector: KafkaSinkConnector, + dry_run_handler_mock: MagicMock, helm_mock: MagicMock, - dry_run_handler: MagicMock, mocker: MockerFixture, ): mock_delete_topics = mocker.patch.object( @@ -197,71 +207,78 @@ def test_reset_when_dry_run_is_false( mock_clean_connector = mocker.patch.object( connector.handlers.connector_handler, "clean_connector" ) + mock_resetter_reset = mocker.spy(connector._resetter, "reset") + mock = mocker.MagicMock() mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") dry_run = False connector.reset(dry_run=dry_run) - - assert mock.mock_calls == [ - mocker.call.helm.add_repo( - "bakdata-kafka-connect-resetter", - "https://bakdata.github.io/kafka-connect-resetter/", - RepoAuthFlags(), - ), - mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=dry_run, - ), - mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - namespace="test-namespace", - chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run=dry_run, - flags=HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, + mock_resetter_reset.assert_called_once_with(dry_run) + + mock.assert_has_calls( + [ + mocker.call.helm.add_repo( + "bakdata-kafka-connect-resetter", + "https://bakdata.github.io/kafka-connect-resetter/", + RepoAuthFlags(), ), - values={ - "connectorType": "sink", - "config": { - "brokers": "broker:9092", - "connector": CONNECTOR_FULL_NAME, - "deleteConsumerGroup": False, + mocker.call.helm.uninstall( + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, + ), + ANY, # __bool__ + ANY, # __str__ + mocker.call.helm.upgrade_install( + CONNECTOR_CLEAN_RELEASE_NAME, + "bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run, + 
RESETTER_NAMESPACE, + { + "nameOverride": CONNECTOR_CLEAN_FULL_NAME, + "connectorType": CONNECTOR_TYPE, + "config": { + "brokers": "broker:9092", + "connector": CONNECTOR_FULL_NAME, + "deleteConsumerGroup": False, + }, }, - "nameOverride": CONNECTOR_CLEAN_FULL_NAME, - }, - ), - mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=dry_run, - ), - ] + HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), + ), + mocker.call.helm.uninstall( + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, + ), + ANY, # __bool__ + ANY, # __str__ + ] + ) - dry_run_handler.print_helm_diff.assert_not_called() + dry_run_handler_mock.print_helm_diff.assert_not_called() mock_delete_topics.assert_not_called() def test_clean_when_dry_run_is_true( self, connector: KafkaSinkConnector, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, ): dry_run = True connector.clean(dry_run=dry_run) - dry_run_handler.print_helm_diff.assert_called_once() + dry_run_handler_mock.print_helm_diff.assert_called_once() def test_clean_when_dry_run_is_false( self, connector: KafkaSinkConnector, - config: KpopsConfig, - handlers: ComponentHandlers, helm_mock: MagicMock, log_info_mock: MagicMock, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, mocker: MockerFixture, ): mock_delete_topics = mocker.patch.object( @@ -301,43 +318,47 @@ def test_clean_when_dry_run_is_false( RepoAuthFlags(), ), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=dry_run, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - namespace="test-namespace", - chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run=dry_run, - flags=HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - 
), - values={ - "connectorType": "sink", + CONNECTOR_CLEAN_RELEASE_NAME, + "bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run, + RESETTER_NAMESPACE, + { + "nameOverride": CONNECTOR_CLEAN_FULL_NAME, + "connectorType": CONNECTOR_TYPE, "config": { "brokers": "broker:9092", "connector": CONNECTOR_FULL_NAME, "deleteConsumerGroup": True, }, - "nameOverride": CONNECTOR_CLEAN_FULL_NAME, }, + HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), ), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=dry_run, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, ), + ANY, # __bool__ + ANY, # __str__ ] - dry_run_handler.print_helm_diff.assert_not_called() + dry_run_handler_mock.print_helm_diff.assert_not_called() def test_clean_without_to_when_dry_run_is_true( self, config: KpopsConfig, handlers: ComponentHandlers, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, connector_config: KafkaConnectorConfig, ): connector = KafkaSinkConnector( @@ -345,19 +366,19 @@ def test_clean_without_to_when_dry_run_is_true( config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, ) dry_run = True connector.clean(dry_run) - dry_run_handler.print_helm_diff.assert_called_once() + dry_run_handler_mock.print_helm_diff.assert_called_once() def test_clean_without_to_when_dry_run_is_false( self, config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, mocker: MockerFixture, connector_config: KafkaConnectorConfig, ): @@ -366,7 +387,7 @@ def test_clean_without_to_when_dry_run_is_false( config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, ) mock_delete_topics = mocker.patch.object( @@ -395,36 +416,40 @@ def test_clean_without_to_when_dry_run_is_false( ), 
), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=dry_run, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - namespace="test-namespace", - chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run=dry_run, - flags=HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - values={ - "connectorType": "sink", + CONNECTOR_CLEAN_RELEASE_NAME, + "bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run, + RESETTER_NAMESPACE, + { + "nameOverride": CONNECTOR_CLEAN_FULL_NAME, + "connectorType": CONNECTOR_TYPE, "config": { "brokers": "broker:9092", "connector": CONNECTOR_FULL_NAME, "deleteConsumerGroup": True, }, - "nameOverride": CONNECTOR_CLEAN_FULL_NAME, }, + HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), ), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=dry_run, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, ), + ANY, # __bool__ + ANY, # __str__ ] - dry_run_handler.print_helm_diff.assert_not_called() + dry_run_handler_mock.print_helm_diff.assert_not_called() mock_delete_topics.assert_not_called() diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index a34efc364..31511e81f 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -1,4 +1,4 @@ -from unittest.mock import MagicMock +from unittest.mock import ANY, MagicMock import pytest from pytest_mock import MockerFixture @@ -8,8 +8,14 @@ HelmUpgradeInstallFlags, RepoAuthFlags, ) -from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig -from kpops.components.base_components.kafka_connector import KafkaSourceConnector 
+from kpops.component_handlers.kafka_connect.model import ( + KafkaConnectorConfig, + KafkaConnectorType, +) +from kpops.components.base_components.kafka_connector import ( + KafkaConnectorResetter, + KafkaSourceConnector, +) from kpops.components.base_components.models.from_section import ( FromSection, FromTopic, @@ -28,10 +34,13 @@ CONNECTOR_CLEAN_RELEASE_NAME, CONNECTOR_FULL_NAME, CONNECTOR_NAME, + RESETTER_NAMESPACE, TestKafkaConnector, ) +CONNECTOR_TYPE = KafkaConnectorType.SOURCE.value CLEAN_SUFFIX = "-clean" +OFFSETS_TOPIC = "kafka-connect-offsets" class TestKafkaSourceConnector(TestKafkaConnector): @@ -47,7 +56,7 @@ def connector( config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, to=ToSection( topics={ TopicName("${output_topic_name}"): TopicConfig( @@ -55,9 +64,15 @@ def connector( ), } ), - offset_topic="kafka-connect-offsets", + offset_topic=OFFSETS_TOPIC, ) + def test_resetter_release_name(self, connector: KafkaSourceConnector): + assert connector.app.name == CONNECTOR_FULL_NAME + resetter = connector._resetter + assert isinstance(resetter, KafkaConnectorResetter) + assert connector._resetter.helm_release_name == CONNECTOR_CLEAN_RELEASE_NAME + def test_from_section_raises_exception( self, config: KpopsConfig, @@ -70,7 +85,7 @@ def test_from_section_raises_exception( config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", + resetter_namespace=RESETTER_NAMESPACE, from_=FromSection( # pyright: ignore[reportGeneralTypeIssues] wrong diagnostic when using TopicName as topics key type topics={ TopicName("connector-topic"): FromTopic( @@ -107,7 +122,7 @@ def test_destroy( connector: KafkaSourceConnector, mocker: MockerFixture, ): - ENV["KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC"] = "kafka-connect-offsets" + ENV["KPOPS_KAFKA_CONNECT_RESETTER_OFFSET_TOPIC"] = OFFSETS_TOPIC assert connector.handlers.connector_handler mock_destroy_connector = 
mocker.patch.object( @@ -123,18 +138,18 @@ def test_destroy( def test_reset_when_dry_run_is_true( self, connector: KafkaSourceConnector, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, ): assert connector.handlers.connector_handler connector.reset(dry_run=True) - dry_run_handler.print_helm_diff.assert_called_once() + dry_run_handler_mock.print_helm_diff.assert_called_once() def test_reset_when_dry_run_is_false( self, connector: KafkaSourceConnector, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, helm_mock: MagicMock, mocker: MockerFixture, ): @@ -159,55 +174,59 @@ def test_reset_when_dry_run_is_false( RepoAuthFlags(), ), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=False, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + False, ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - namespace="test-namespace", - chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run=False, - flags=HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - values={ - "connectorType": "source", + CONNECTOR_CLEAN_RELEASE_NAME, + "bakdata-kafka-connect-resetter/kafka-connect-resetter", + False, + RESETTER_NAMESPACE, + { + "connectorType": CONNECTOR_TYPE, "config": { "brokers": "broker:9092", "connector": CONNECTOR_FULL_NAME, - "offsetTopic": "kafka-connect-offsets", + "offsetTopic": OFFSETS_TOPIC, }, "nameOverride": CONNECTOR_CLEAN_FULL_NAME, }, + HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), ), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=False, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + False, ), + ANY, # __bool__ + ANY, # __str__ ] mock_delete_topics.assert_not_called() - dry_run_handler.print_helm_diff.assert_not_called() + 
dry_run_handler_mock.print_helm_diff.assert_not_called() def test_clean_when_dry_run_is_true( self, connector: KafkaSourceConnector, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, ): assert connector.handlers.connector_handler connector.clean(dry_run=True) - dry_run_handler.print_helm_diff.assert_called_once() + dry_run_handler_mock.print_helm_diff.assert_called_once() def test_clean_when_dry_run_is_false( self, connector: KafkaSourceConnector, helm_mock: MagicMock, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, mocker: MockerFixture, ): assert connector.handlers.connector_handler @@ -224,55 +243,60 @@ def test_clean_when_dry_run_is_false( mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") - connector.clean(dry_run=False) + dry_run = False + connector.clean(dry_run) assert mock.mock_calls == [ - mocker.call.mock_delete_topics(connector.to, dry_run=False), + mocker.call.mock_delete_topics(connector.to, dry_run=dry_run), mocker.call.helm.add_repo( "bakdata-kafka-connect-resetter", "https://bakdata.github.io/kafka-connect-resetter/", RepoAuthFlags(), ), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=False, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - namespace="test-namespace", - chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run=False, - flags=HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - values={ - "connectorType": "source", + CONNECTOR_CLEAN_RELEASE_NAME, + "bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run, + RESETTER_NAMESPACE, + { + "nameOverride": CONNECTOR_CLEAN_FULL_NAME, + "connectorType": CONNECTOR_TYPE, "config": { "brokers": "broker:9092", "connector": CONNECTOR_FULL_NAME, - "offsetTopic": 
"kafka-connect-offsets", + "offsetTopic": OFFSETS_TOPIC, }, - "nameOverride": CONNECTOR_CLEAN_FULL_NAME, }, + HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), ), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=False, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, ), + ANY, # __bool__ + ANY, # __str__ ] - dry_run_handler.print_helm_diff.assert_not_called() + dry_run_handler_mock.print_helm_diff.assert_not_called() def test_clean_without_to_when_dry_run_is_false( self, config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, mocker: MockerFixture, connector_config: KafkaConnectorConfig, ): @@ -281,8 +305,8 @@ def test_clean_without_to_when_dry_run_is_false( config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", - offset_topic="kafka-connect-offsets", + resetter_namespace=RESETTER_NAMESPACE, + offset_topic=OFFSETS_TOPIC, ) assert connector.to is None @@ -300,7 +324,8 @@ def test_clean_without_to_when_dry_run_is_false( mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") - connector.clean(dry_run=False) + dry_run = False + connector.clean(dry_run) assert mock.mock_calls == [ mocker.call.helm.add_repo( @@ -309,45 +334,49 @@ def test_clean_without_to_when_dry_run_is_false( RepoAuthFlags(), ), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=False, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - namespace="test-namespace", - chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", - dry_run=False, - flags=HelmUpgradeInstallFlags( - version="1.0.4", - wait=True, - wait_for_jobs=True, - ), - values={ - 
"connectorType": "source", + CONNECTOR_CLEAN_RELEASE_NAME, + "bakdata-kafka-connect-resetter/kafka-connect-resetter", + dry_run, + RESETTER_NAMESPACE, + { + "nameOverride": CONNECTOR_CLEAN_FULL_NAME, + "connectorType": CONNECTOR_TYPE, "config": { "brokers": "broker:9092", "connector": CONNECTOR_FULL_NAME, - "offsetTopic": "kafka-connect-offsets", + "offsetTopic": OFFSETS_TOPIC, }, - "nameOverride": CONNECTOR_CLEAN_FULL_NAME, }, + HelmUpgradeInstallFlags( + version="1.0.4", + wait=True, + wait_for_jobs=True, + ), ), mocker.call.helm.uninstall( - namespace="test-namespace", - release_name=CONNECTOR_CLEAN_RELEASE_NAME, - dry_run=False, + RESETTER_NAMESPACE, + CONNECTOR_CLEAN_RELEASE_NAME, + dry_run, ), + ANY, # __bool__ + ANY, # __str__ ] mock_delete_topics.assert_not_called() - dry_run_handler.print_helm_diff.assert_not_called() + dry_run_handler_mock.print_helm_diff.assert_not_called() def test_clean_without_to_when_dry_run_is_true( self, config: KpopsConfig, handlers: ComponentHandlers, - dry_run_handler: MagicMock, + dry_run_handler_mock: MagicMock, connector_config: KafkaConnectorConfig, ): connector = KafkaSourceConnector( @@ -355,8 +384,8 @@ def test_clean_without_to_when_dry_run_is_true( config=config, handlers=handlers, app=connector_config, - namespace="test-namespace", - offset_topic="kafka-connect-offsets", + resetter_namespace=RESETTER_NAMESPACE, + offset_topic=OFFSETS_TOPIC, ) assert connector.to is None @@ -364,4 +393,4 @@ def test_clean_without_to_when_dry_run_is_true( connector.clean(dry_run=True) - dry_run_handler.print_helm_diff.assert_called_once() + dry_run_handler_mock.print_helm_diff.assert_called_once() diff --git a/tests/pipeline/resources/defaults.yaml b/tests/pipeline/resources/defaults.yaml index 101e3e175..810e6b5f8 100644 --- a/tests/pipeline/resources/defaults.yaml +++ b/tests/pipeline/resources/defaults.yaml @@ -1,12 +1,14 @@ +pipeline-component: + name: ${component.type} + kubernetes-app: - name: "${component.type}" namespace: 
example-namespace kafka-app: app: streams: - brokers: "${config.kafka_brokers}" - schema_registry_url: "${config.schema_registry.url}" + brokers: ${config.kafka_brokers} + schema_registry_url: ${config.schema_registry.url} version: "2.4.2" producer-app: {} # inherits from kafka-app @@ -49,7 +51,7 @@ converter: enabled: true consumerGroup: converter-${output_topic_name} maxReplicas: 1 - lagThreshold: "10000" + lagThreshold: 10000 to: topics: ${output_topic_name}: @@ -72,7 +74,7 @@ filter: autoscaling: enabled: true maxReplicas: 1 - lagThreshold: "10000" + lagThreshold: 10000 consumerGroup: filter-${output_topic_name} topics: - "${output_topic_name}" @@ -91,7 +93,7 @@ should-inflate: autoscaling: enabled: true maxReplicas: 1 - lagThreshold: "10000" + lagThreshold: 10000 consumerGroup: filter-${output_topic_name} topics: - "${output_topic_name}" @@ -103,9 +105,7 @@ should-inflate: configs: retention.ms: "-1" -kafka-connector: - name: "sink-connector" - namespace: "example-namespace" +kafka-sink-connector: app: batch.size: "2000" behavior.on.malformed.documents: "warn" diff --git a/tests/pipeline/resources/dotenv/config.yaml b/tests/pipeline/resources/dotenv/config.yaml index 66fb3e410..3abfdffd4 100644 --- a/tests/pipeline/resources/dotenv/config.yaml +++ b/tests/pipeline/resources/dotenv/config.yaml @@ -1,7 +1,4 @@ defaults_path: ../defaults.yaml -topic_name_config: - default_error_topic_name: "${component_name}-dead-letter-topic" - default_output_topic_name: "${component_name}-test-topic" kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" kafka_connect: url: "http://localhost:8083" diff --git a/tests/pipeline/resources/env-specific-config-only/config_production.yaml b/tests/pipeline/resources/env-specific-config-only/config_production.yaml index 74910f62d..2e40128d4 100644 --- a/tests/pipeline/resources/env-specific-config-only/config_production.yaml +++ b/tests/pipeline/resources/env-specific-config-only/config_production.yaml @@ -1,7 
+1,4 @@ defaults_path: ../no-topics-defaults -topic_name_config: - default_error_topic_name: "${component_name}-dead-letter-topic" - default_output_topic_name: "${component_name}-test-topic" kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" kafka_connect: url: "http://localhost:8083" diff --git a/tests/pipeline/resources/kafka-connect-sink/pipeline.yaml b/tests/pipeline/resources/kafka-connect-sink/pipeline.yaml index 02a28015a..fc012737a 100644 --- a/tests/pipeline/resources/kafka-connect-sink/pipeline.yaml +++ b/tests/pipeline/resources/kafka-connect-sink/pipeline.yaml @@ -12,7 +12,6 @@ type: output - type: kafka-sink-connector - namespace: example-namespace name: es-sink-connector app: connector.class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector diff --git a/tests/pipeline/resources/multi-config/config.yaml b/tests/pipeline/resources/multi-config/config.yaml index 74910f62d..2e40128d4 100644 --- a/tests/pipeline/resources/multi-config/config.yaml +++ b/tests/pipeline/resources/multi-config/config.yaml @@ -1,7 +1,4 @@ defaults_path: ../no-topics-defaults -topic_name_config: - default_error_topic_name: "${component_name}-dead-letter-topic" - default_output_topic_name: "${component_name}-test-topic" kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" kafka_connect: url: "http://localhost:8083" diff --git a/tests/pipeline/snapshots/snap_test_example.py b/tests/pipeline/snapshots/snap_test_example.py index a88a7ee4a..77ba66496 100644 --- a/tests/pipeline/snapshots/snap_test_example.py +++ b/tests/pipeline/snapshots/snap_test_example.py @@ -305,6 +305,28 @@ 'version': '2.9.0' }, { + '_resetter': { + 'app': { + 'config': { + 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', + 'connector': 'postgresql-connector' + }, + 'connectorType': 'sink' + }, + 'name': 'postgresql-connector', + 'namespace': '${NAMESPACE}', + 'prefix': '', + 'repo_config': { + 'repo_auth_flags': { + 
'insecure_skip_tls_verify': False + }, + 'repository_name': 'bakdata-kafka-connect-resetter', + 'url': 'https://bakdata.github.io/kafka-connect-resetter/' + }, + 'suffix': '-clean', + 'type': 'kafka-connector-resetter', + 'version': '1.0.4' + }, 'app': { 'auto.create': True, 'connection.ds.pool.size': 5, @@ -330,18 +352,9 @@ 'value.converter.schema.registry.url': 'http://k8kafka-cp-schema-registry.${NAMESPACE}.svc.cluster.local:8081' }, 'name': 'postgresql-connector', - 'namespace': '${NAMESPACE}', 'prefix': '', - 'repo_config': { - 'repo_auth_flags': { - 'insecure_skip_tls_verify': False - }, - 'repository_name': 'bakdata-kafka-connect-resetter', - 'url': 'https://bakdata.github.io/kafka-connect-resetter/' - }, 'resetter_values': { }, - 'type': 'kafka-sink-connector', - 'version': '1.0.4' + 'type': 'kafka-sink-connector' } ] diff --git a/tests/pipeline/snapshots/snap_test_generate.py b/tests/pipeline/snapshots/snap_test_generate.py index f6d75f3e0..436d7e9a2 100644 --- a/tests/pipeline/snapshots/snap_test_generate.py +++ b/tests/pipeline/snapshots/snap_test_generate.py @@ -292,6 +292,44 @@ 'version': '2.4.2' }, { + '_resetter': { + 'app': { + 'config': { + 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', + 'connector': 'resources-pipeline-with-inflate-should-inflate-inflated-sink-connector' + }, + 'connectorType': 'sink' + }, + 'name': 'should-inflate-inflated-sink-connector', + 'namespace': 'example-namespace', + 'prefix': 'resources-pipeline-with-inflate-', + 'repo_config': { + 'repo_auth_flags': { + 'insecure_skip_tls_verify': False + }, + 'repository_name': 'bakdata-kafka-connect-resetter', + 'url': 'https://bakdata.github.io/kafka-connect-resetter/' + }, + 'suffix': '-clean', + 'to': { + 'models': { + }, + 'topics': { + 'kafka-sink-connector': { + 'configs': { + }, + 'type': 'output' + }, + 'should-inflate-inflated-sink-connector': { + 'configs': { + }, + 'role': 'test' + } + } + }, + 'type': 'kafka-connector-resetter', + 'version': 
'1.0.4' + }, 'app': { 'batch.size': '2000', 'behavior.on.malformed.documents': 'warn', @@ -308,15 +346,7 @@ 'transforms.changeTopic.replacement': 'resources-pipeline-with-inflate-should-inflate-index-v1' }, 'name': 'should-inflate-inflated-sink-connector', - 'namespace': 'example-namespace', 'prefix': 'resources-pipeline-with-inflate-', - 'repo_config': { - 'repo_auth_flags': { - 'insecure_skip_tls_verify': False - }, - 'repository_name': 'bakdata-kafka-connect-resetter', - 'url': 'https://bakdata.github.io/kafka-connect-resetter/' - }, 'resetter_values': { }, 'to': { @@ -335,8 +365,7 @@ } } }, - 'type': 'kafka-sink-connector', - 'version': '1.0.4' + 'type': 'kafka-sink-connector' }, { 'app': { @@ -446,6 +475,28 @@ 'version': '2.4.2' }, { + '_resetter': { + 'app': { + 'config': { + 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', + 'connector': 'resources-kafka-connect-sink-es-sink-connector' + }, + 'connectorType': 'sink' + }, + 'name': 'es-sink-connector', + 'namespace': 'example-namespace', + 'prefix': 'resources-kafka-connect-sink-', + 'repo_config': { + 'repo_auth_flags': { + 'insecure_skip_tls_verify': False + }, + 'repository_name': 'bakdata-kafka-connect-resetter', + 'url': 'https://bakdata.github.io/kafka-connect-resetter/' + }, + 'suffix': '-clean', + 'type': 'kafka-connector-resetter', + 'version': '1.0.4' + }, 'app': { 'batch.size': '2000', 'behavior.on.malformed.documents': 'warn', @@ -461,19 +512,10 @@ 'topics': 'example-output' }, 'name': 'es-sink-connector', - 'namespace': 'example-namespace', 'prefix': 'resources-kafka-connect-sink-', - 'repo_config': { - 'repo_auth_flags': { - 'insecure_skip_tls_verify': False - }, - 'repository_name': 'bakdata-kafka-connect-resetter', - 'url': 'https://bakdata.github.io/kafka-connect-resetter/' - }, 'resetter_values': { }, - 'type': 'kafka-sink-connector', - 'version': '1.0.4' + 'type': 'kafka-sink-connector' } ] @@ -1245,6 +1287,44 @@ 'version': '2.4.2' }, { + '_resetter': { + 'app': { 
+ 'config': { + 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', + 'connector': 'resources-read-from-component-inflate-step-inflated-sink-connector' + }, + 'connectorType': 'sink' + }, + 'name': 'inflate-step-inflated-sink-connector', + 'namespace': 'example-namespace', + 'prefix': 'resources-read-from-component-', + 'repo_config': { + 'repo_auth_flags': { + 'insecure_skip_tls_verify': False + }, + 'repository_name': 'bakdata-kafka-connect-resetter', + 'url': 'https://bakdata.github.io/kafka-connect-resetter/' + }, + 'suffix': '-clean', + 'to': { + 'models': { + }, + 'topics': { + 'inflate-step-inflated-sink-connector': { + 'configs': { + }, + 'role': 'test' + }, + 'kafka-sink-connector': { + 'configs': { + }, + 'type': 'output' + } + } + }, + 'type': 'kafka-connector-resetter', + 'version': '1.0.4' + }, 'app': { 'batch.size': '2000', 'behavior.on.malformed.documents': 'warn', @@ -1261,15 +1341,7 @@ 'transforms.changeTopic.replacement': 'resources-read-from-component-inflate-step-index-v1' }, 'name': 'inflate-step-inflated-sink-connector', - 'namespace': 'example-namespace', 'prefix': 'resources-read-from-component-', - 'repo_config': { - 'repo_auth_flags': { - 'insecure_skip_tls_verify': False - }, - 'repository_name': 'bakdata-kafka-connect-resetter', - 'url': 'https://bakdata.github.io/kafka-connect-resetter/' - }, 'resetter_values': { }, 'to': { @@ -1288,8 +1360,7 @@ } } }, - 'type': 'kafka-sink-connector', - 'version': '1.0.4' + 'type': 'kafka-sink-connector' }, { 'app': { @@ -1403,6 +1474,44 @@ 'version': '2.4.2' }, { + '_resetter': { + 'app': { + 'config': { + 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', + 'connector': 'resources-read-from-component-inflate-step-without-prefix-inflated-sink-connector' + }, + 'connectorType': 'sink' + }, + 'name': 'inflate-step-without-prefix-inflated-sink-connector', + 'namespace': 'example-namespace', + 'prefix': 'resources-read-from-component-', + 'repo_config': { + 
'repo_auth_flags': { + 'insecure_skip_tls_verify': False + }, + 'repository_name': 'bakdata-kafka-connect-resetter', + 'url': 'https://bakdata.github.io/kafka-connect-resetter/' + }, + 'suffix': '-clean', + 'to': { + 'models': { + }, + 'topics': { + 'inflate-step-without-prefix-inflated-sink-connector': { + 'configs': { + }, + 'role': 'test' + }, + 'kafka-sink-connector': { + 'configs': { + }, + 'type': 'output' + } + } + }, + 'type': 'kafka-connector-resetter', + 'version': '1.0.4' + }, 'app': { 'batch.size': '2000', 'behavior.on.malformed.documents': 'warn', @@ -1419,15 +1528,7 @@ 'transforms.changeTopic.replacement': 'resources-read-from-component-inflate-step-without-prefix-index-v1' }, 'name': 'inflate-step-without-prefix-inflated-sink-connector', - 'namespace': 'example-namespace', 'prefix': 'resources-read-from-component-', - 'repo_config': { - 'repo_auth_flags': { - 'insecure_skip_tls_verify': False - }, - 'repository_name': 'bakdata-kafka-connect-resetter', - 'url': 'https://bakdata.github.io/kafka-connect-resetter/' - }, 'resetter_values': { }, 'to': { @@ -1446,8 +1547,7 @@ } } }, - 'type': 'kafka-sink-connector', - 'version': '1.0.4' + 'type': 'kafka-sink-connector' }, { 'app': { @@ -2212,6 +2312,28 @@ 'version': '2.9.0' }, { + '_resetter': { + 'app': { + 'config': { + 'brokers': 'http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092', + 'connector': 'resources-kafka-connect-sink-es-sink-connector' + }, + 'connectorType': 'sink' + }, + 'name': 'es-sink-connector', + 'namespace': 'development-namespace', + 'prefix': 'resources-kafka-connect-sink-', + 'repo_config': { + 'repo_auth_flags': { + 'insecure_skip_tls_verify': False + }, + 'repository_name': 'bakdata-kafka-connect-resetter', + 'url': 'https://bakdata.github.io/kafka-connect-resetter/' + }, + 'suffix': '-clean', + 'type': 'kafka-connector-resetter', + 'version': '1.0.4' + }, 'app': { 'batch.size': '2000', 'behavior.on.malformed.documents': 'warn', @@ -2227,18 +2349,9 @@ 'topics': 
'example-output' }, 'name': 'es-sink-connector', - 'namespace': 'example-namespace', 'prefix': 'resources-kafka-connect-sink-', - 'repo_config': { - 'repo_auth_flags': { - 'insecure_skip_tls_verify': False - }, - 'repository_name': 'bakdata-kafka-connect-resetter', - 'url': 'https://bakdata.github.io/kafka-connect-resetter/' - }, 'resetter_values': { }, - 'type': 'kafka-sink-connector', - 'version': '1.0.4' + 'type': 'kafka-sink-connector' } ] diff --git a/tests/pipeline/test_components/components.py b/tests/pipeline/test_components/components.py index 0382de5b2..7964b2102 100644 --- a/tests/pipeline/test_components/components.py +++ b/tests/pipeline/test_components/components.py @@ -46,7 +46,6 @@ def inflate(self) -> list[PipelineComponent]: name=f"{self.name}-inflated-sink-connector", config=self.config, handlers=self.handlers, - namespace="example-namespace", app={ # type: ignore[reportGeneralTypeIssues], required `connector.class` comes from defaults during enrichment "topics": topic_name, "transforms.changeTopic.replacement": f"{topic_name}-index-v1", diff --git a/tests/pipeline/test_components_without_schema_handler/components.py b/tests/pipeline/test_components_without_schema_handler/components.py index 686aac26c..c87c668a0 100644 --- a/tests/pipeline/test_components_without_schema_handler/components.py +++ b/tests/pipeline/test_components_without_schema_handler/components.py @@ -31,7 +31,6 @@ def inflate(self) -> list[PipelineComponent]: name="sink-connector", config=self.config, handlers=self.handlers, - namespace="example-namespace", app=KafkaConnectorConfig( **{ "topics": topic_name,