diff --git a/config.yaml b/config.yaml
index 46d0cf8b3..8fe8bb213 100644
--- a/config.yaml
+++ b/config.yaml
@@ -1,5 +1,2 @@
 environment: development
-brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
-kafka_connect_host: "http://localhost:8083"
-kafka_rest_host: "http://localhost:8082"
-schema_registry_url: "http://localhost:8081"
+kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
diff --git a/docs/docs/developer/auto-generation.md b/docs/docs/developer/auto-generation.md
index 249f52b77..b87cbcad0 100644
--- a/docs/docs/developer/auto-generation.md
+++ b/docs/docs/developer/auto-generation.md
@@ -10,7 +10,7 @@ Auto generation happens mostly with [`pre-commit`](https://pre-commit.com/) hook
 - `cli_env_vars.env` -- All CLI environment variables in a `dotenv` file.
 - `cli_env_vars.md` -- All CLI environment variables in a table.
-- `config_env_vars.env` -- Almost all pipeline config environment variables in a `dotenv` file. The script checks for each field in [`PipelineConfig`](https://github.com/bakdata/kpops/blob/main/kpops/cli/pipeline_config.py) whether it has an `env` attribute defined. The script is currently unable to visit the classes of fields like `topic_name_config`, hence any environment variables defined there would remain unknown to it.
+- `config_env_vars.env` -- Almost all pipeline config environment variables in a `dotenv` file. The script checks for each field in [`KpopsConfig`](https://github.com/bakdata/kpops/blob/main/kpops/config.py) whether it has an `env` attribute defined. The script is currently unable to visit the classes of fields like `topic_name_config`, hence any environment variables defined there would remain unknown to it.
 - `config_env_vars.md` -- Almost all pipeline config environment variables in a table.
 - `variable_substitution.yaml` -- A copy of `./tests/pipeline/resources/component-type-substitution/pipeline.yaml` used as an example of substitution.
diff --git a/docs/docs/resources/pipeline-components/kafka-app.yaml b/docs/docs/resources/pipeline-components/kafka-app.yaml
index 6d8045ad5..cdc49ef28 100644
--- a/docs/docs/resources/pipeline-components/kafka-app.yaml
+++ b/docs/docs/resources/pipeline-components/kafka-app.yaml
@@ -49,7 +49,7 @@
   # add the key-value pairs they need.
   app: # required
     streams: # required
-      brokers: ${brokers} # required
+      brokers: ${kafka_brokers} # required
       schemaRegistryUrl: ${schema_registry_url}
     nameOverride: override-with-this-name # kafka-app-specific
   imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app
diff --git a/docs/docs/resources/pipeline-components/pipeline.yaml b/docs/docs/resources/pipeline-components/pipeline.yaml
index 27c5d45c1..eb7930376 100644
--- a/docs/docs/resources/pipeline-components/pipeline.yaml
+++ b/docs/docs/resources/pipeline-components/pipeline.yaml
@@ -49,7 +49,7 @@
   # add the key-value pairs they need.
app: # required streams: # required - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app @@ -275,7 +275,7 @@ # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required streams: # required, producer-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} outputTopic: output_topic extraOutputTopics: @@ -346,7 +346,7 @@ app: # required # Streams Bootstrap streams section streams: # required, streams-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} inputTopics: - topic1 diff --git a/docs/docs/resources/pipeline-components/producer-app.yaml b/docs/docs/resources/pipeline-components/producer-app.yaml index 7a01ad24b..5be3551d8 100644 --- a/docs/docs/resources/pipeline-components/producer-app.yaml +++ b/docs/docs/resources/pipeline-components/producer-app.yaml @@ -32,7 +32,7 @@ # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required streams: # required, producer-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} outputTopic: output_topic extraOutputTopics: diff --git a/docs/docs/resources/pipeline-components/sections/app-kafka-app.yaml b/docs/docs/resources/pipeline-components/sections/app-kafka-app.yaml index 991e862e0..73b70c59e 100644 --- a/docs/docs/resources/pipeline-components/sections/app-kafka-app.yaml +++ b/docs/docs/resources/pipeline-components/sections/app-kafka-app.yaml @@ -2,7 +2,7 @@ # add the key-value pairs they need. 
app: # required streams: # required - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app diff --git a/docs/docs/resources/pipeline-components/sections/app-producer-app.yaml b/docs/docs/resources/pipeline-components/sections/app-producer-app.yaml index 5cd9b000b..0cbe04ded 100644 --- a/docs/docs/resources/pipeline-components/sections/app-producer-app.yaml +++ b/docs/docs/resources/pipeline-components/sections/app-producer-app.yaml @@ -2,7 +2,7 @@ # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required streams: # required, producer-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} outputTopic: output_topic extraOutputTopics: diff --git a/docs/docs/resources/pipeline-components/sections/app-streams-app.yaml b/docs/docs/resources/pipeline-components/sections/app-streams-app.yaml index 44f6604aa..1c5f0849f 100644 --- a/docs/docs/resources/pipeline-components/sections/app-streams-app.yaml +++ b/docs/docs/resources/pipeline-components/sections/app-streams-app.yaml @@ -4,7 +4,7 @@ app: # required # Streams Bootstrap streams section streams: # required, streams-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} inputTopics: - topic1 diff --git a/docs/docs/resources/pipeline-components/streams-app.yaml b/docs/docs/resources/pipeline-components/streams-app.yaml index 0dde5be5c..f77edf80c 100644 --- a/docs/docs/resources/pipeline-components/streams-app.yaml +++ b/docs/docs/resources/pipeline-components/streams-app.yaml @@ -51,7 +51,7 @@ app: # required # Streams Bootstrap streams section streams: # required, streams-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} inputTopics: - topic1 diff --git a/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml b/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml index e0af3b7a7..bd6c9e2d9 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-kafka-app.yaml @@ -7,7 +7,7 @@ kafka-app: # add the key-value pairs they need. 
app: # required streams: # required - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app diff --git a/docs/docs/resources/pipeline-defaults/defaults-producer-app.yaml b/docs/docs/resources/pipeline-defaults/defaults-producer-app.yaml index 1d81f5ced..bfa5521c4 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-producer-app.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-producer-app.yaml @@ -10,7 +10,7 @@ producer-app: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required streams: # required, producer-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} outputTopic: output_topic extraOutputTopics: diff --git a/docs/docs/resources/pipeline-defaults/defaults-streams-app.yaml b/docs/docs/resources/pipeline-defaults/defaults-streams-app.yaml index 83ff13f14..ae1adab98 100644 --- a/docs/docs/resources/pipeline-defaults/defaults-streams-app.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults-streams-app.yaml @@ -9,7 +9,7 @@ streams-app: app: # required # Streams Bootstrap streams section streams: # required, streams-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} inputTopics: - topic1 diff --git a/docs/docs/resources/pipeline-defaults/defaults.yaml b/docs/docs/resources/pipeline-defaults/defaults.yaml index e74272bdc..3a43d81e7 100644 --- a/docs/docs/resources/pipeline-defaults/defaults.yaml +++ b/docs/docs/resources/pipeline-defaults/defaults.yaml @@ -7,7 +7,7 @@ kafka-app: # add the key-value pairs they need. app: # required streams: # required - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} nameOverride: override-with-this-name # kafka-app-specific imageTag: "1.0.0" # Example values that are shared between streams-app and producer-app @@ -170,7 +170,7 @@ producer-app: # https://github.com/bakdata/streams-bootstrap/tree/master/charts/producer-app app: # required streams: # required, producer-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} outputTopic: output_topic extraOutputTopics: @@ -188,7 +188,7 @@ streams-app: app: # required # Streams Bootstrap streams section streams: # required, streams-app-specific - brokers: ${brokers} # required + brokers: ${kafka_brokers} # required schemaRegistryUrl: ${schema_registry_url} inputTopics: - topic1 diff --git a/docs/docs/resources/variables/config_env_vars.env b/docs/docs/resources/variables/config_env_vars.env index 308fb6334..00bef6a4c 100644 --- a/docs/docs/resources/variables/config_env_vars.env +++ b/docs/docs/resources/variables/config_env_vars.env @@ -9,18 +9,18 @@ # Suffix your environment files with this value (e.g. # defaults_development.yaml for environment=development). KPOPS_ENVIRONMENT # No default value, required -# brokers +# kafka_brokers # The comma separated Kafka brokers address. KPOPS_KAFKA_BROKERS # No default value, required -# schema_registry_url +# url # Address of the Schema Registry. -KPOPS_SCHEMA_REGISTRY_URL # No default value, not required -# kafka_rest_host +KPOPS_SCHEMA_REGISTRY_URL=http://localhost:8081 +# url # Address of the Kafka REST Proxy. 
-KPOPS_REST_PROXY_HOST # No default value, not required
-# kafka_connect_host
+KPOPS_KAFKA_REST_URL=http://localhost:8082
+# url
 # Address of Kafka Connect.
-KPOPS_CONNECT_HOST # No default value, not required
+KPOPS_KAFKA_CONNECT_URL=http://localhost:8083
 # timeout
 # The timeout in seconds that specifies when actions like deletion or
 # deploy timeout.
diff --git a/docs/docs/resources/variables/config_env_vars.md b/docs/docs/resources/variables/config_env_vars.md
index 2928f2ccd..2419de11d 100644
--- a/docs/docs/resources/variables/config_env_vars.md
+++ b/docs/docs/resources/variables/config_env_vars.md
@@ -1,11 +1,11 @@
 These variables are a lower priority alternative to the settings in `config.yaml`. Variables marked as required can instead be set in the pipeline config.
 
-| Name |Default Value|Required| Description | Setting name |
-|-------------------------|-------------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------|
-|KPOPS_ENVIRONMENT | |True |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).|environment |
-|KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |brokers |
-|KPOPS_SCHEMA_REGISTRY_URL| |False |Address of the Schema Registry. |schema_registry_url|
-|KPOPS_REST_PROXY_HOST | |False |Address of the Kafka REST Proxy. |kafka_rest_host |
-|KPOPS_CONNECT_HOST | |False |Address of Kafka Connect. |kafka_connect_host |
-|KPOPS_TIMEOUT | 300|False |The timeout in seconds that specifies when actions like deletion or deploy timeout. |timeout |
-|KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall them after completion. |retain_clean_jobs |
+| Name | Default Value |Required| Description | Setting name |
+|-------------------------|---------------------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------|
+|KPOPS_ENVIRONMENT | |True |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).|environment |
+|KPOPS_KAFKA_BROKERS | |True |The comma separated Kafka brokers address. |kafka_brokers |
+|KPOPS_SCHEMA_REGISTRY_URL|http://localhost:8081|False |Address of the Schema Registry. |url |
+|KPOPS_KAFKA_REST_URL |http://localhost:8082|False |Address of the Kafka REST Proxy. |url |
+|KPOPS_KAFKA_CONNECT_URL |http://localhost:8083|False |Address of Kafka Connect. |url |
+|KPOPS_TIMEOUT |300 |False |The timeout in seconds that specifies when actions like deletion or deploy timeout. |timeout |
+|KPOPS_RETAIN_CLEAN_JOBS |False |False |Whether to retain clean up jobs in the cluster or uninstall them after completion. 
|retain_clean_jobs| diff --git a/docs/docs/schema/config.json b/docs/docs/schema/config.json index b77b4e850..a0841dae3 100644 --- a/docs/docs/schema/config.json +++ b/docs/docs/schema/config.json @@ -1,5 +1,5 @@ { - "$ref": "#/definitions/PipelineConfig", + "$ref": "#/definitions/KpopsConfig", "definitions": { "HelmConfig": { "description": "Global Helm configuration", @@ -41,25 +41,57 @@ "title": "HelmDiffConfig", "type": "object" }, - "PipelineConfig": { + "KafkaConnectConfig": { "additionalProperties": false, - "description": "Pipeline configuration unrelated to the components.", + "description": "Configuration for Kafka Connect.", "properties": { - "brokers": { - "description": "The comma separated Kafka brokers address.", - "env": "KPOPS_KAFKA_BROKERS", + "url": { + "default": "http://localhost:8083", + "description": "Address of Kafka Connect.", + "env": "KPOPS_KAFKA_CONNECT_URL", "env_names": [ - "kpops_kafka_brokers" + "kpops_kafka_connect_url" ], - "example": "broker1:9092,broker2:9092,broker3:9092", - "title": "Brokers", + "format": "uri", + "maxLength": 65536, + "minLength": 1, + "title": "Url", "type": "string" - }, + } + }, + "title": "KafkaConnectConfig", + "type": "object" + }, + "KafkaRestConfig": { + "additionalProperties": false, + "description": "Configuration for Kafka REST Proxy.", + "properties": { + "url": { + "default": "http://localhost:8082", + "description": "Address of the Kafka REST Proxy.", + "env": "KPOPS_KAFKA_REST_URL", + "env_names": [ + "kpops_kafka_rest_url" + ], + "format": "uri", + "maxLength": 65536, + "minLength": 1, + "title": "Url", + "type": "string" + } + }, + "title": "KafkaRestConfig", + "type": "object" + }, + "KpopsConfig": { + "additionalProperties": false, + "description": "Pipeline configuration unrelated to the components.", + "properties": { "create_namespace": { "default": false, "description": "Flag for `helm upgrade --install`. Create the release namespace if not present.", "env_names": [ - "create_namespace" + "kpops_create_namespace" ], "title": "Create Namespace", "type": "boolean" @@ -68,7 +100,7 @@ "default": "defaults", "description": "The name of the defaults file and the prefix of the defaults environment file.", "env_names": [ - "defaults_filename_prefix" + "kpops_defaults_filename_prefix" ], "title": "Defaults Filename Prefix", "type": "string" @@ -77,7 +109,7 @@ "default": ".", "description": "The path to the folder containing the defaults.yaml file and the environment defaults files. 
Paths can either be absolute or relative to `config.yaml`", "env_names": [ - "defaults_path" + "kpops_defaults_path" ], "example": "defaults", "format": "path", @@ -107,7 +139,7 @@ }, "description": "Global flags for Helm.", "env_names": [ - "helm_config" + "kpops_helm_config" ], "title": "Helm Config" }, @@ -122,29 +154,49 @@ }, "description": "Configure Helm Diff.", "env_names": [ - "helm_diff_config" + "kpops_helm_diff_config" ], "title": "Helm Diff Config" }, - "kafka_connect_host": { - "description": "Address of Kafka Connect.", - "env": "KPOPS_CONNECT_HOST", + "kafka_brokers": { + "description": "The comma separated Kafka brokers address.", + "env": "KPOPS_KAFKA_BROKERS", "env_names": [ - "kpops_connect_host" + "kpops_kafka_brokers" ], - "example": "http://localhost:8083", - "title": "Kafka Connect Host", + "example": "broker1:9092,broker2:9092,broker3:9092", + "title": "Kafka Brokers", "type": "string" }, - "kafka_rest_host": { - "description": "Address of the Kafka REST Proxy.", - "env": "KPOPS_REST_PROXY_HOST", + "kafka_connect": { + "allOf": [ + { + "$ref": "#/definitions/KafkaConnectConfig" + } + ], + "default": { + "url": "http://localhost:8083" + }, + "description": "Configuration for Kafka Connect.", "env_names": [ - "kpops_rest_proxy_host" + "kpops_kafka_connect" ], - "example": "http://localhost:8082", - "title": "Kafka Rest Host", - "type": "string" + "title": "Kafka Connect" + }, + "kafka_rest": { + "allOf": [ + { + "$ref": "#/definitions/KafkaRestConfig" + } + ], + "default": { + "url": "http://localhost:8082" + }, + "description": "Configuration for Kafka REST Proxy.", + "env_names": [ + "kpops_kafka_rest" + ], + "title": "Kafka Rest" }, "retain_clean_jobs": { "default": false, @@ -156,15 +208,21 @@ "title": "Retain Clean Jobs", "type": "boolean" }, - "schema_registry_url": { - "description": "Address of the Schema Registry.", - "env": "KPOPS_SCHEMA_REGISTRY_URL", + "schema_registry": { + "allOf": [ + { + "$ref": "#/definitions/SchemaRegistryConfig" + } + ], + "default": { + "enabled": false, + "url": "http://localhost:8081" + }, + "description": "Configuration for Schema Registry.", "env_names": [ - "kpops_schema_registry_url" + "kpops_schema_registry" ], - "example": "http://localhost:8081", - "title": "Schema Registry Url", - "type": "string" + "title": "Schema Registry" }, "timeout": { "default": 300, @@ -188,21 +246,51 @@ }, "description": "Configure the topic name variables you can use in the pipeline definition.", "env_names": [ - "topic_name_config" + "kpops_topic_name_config" ], "title": "Topic Name Config" } }, "required": [ "environment", - "brokers" + "kafka_brokers" ], - "title": "PipelineConfig", + "title": "KpopsConfig", + "type": "object" + }, + "SchemaRegistryConfig": { + "additionalProperties": false, + "description": "Configuration for Schema Registry.", + "properties": { + "enabled": { + "default": false, + "description": "Whether the Schema Registry handler should be initialized.", + "env_names": [ + "enabled" + ], + "title": "Enabled", + "type": "boolean" + }, + "url": { + "default": "http://localhost:8081", + "description": "Address of the Schema Registry.", + "env": "KPOPS_SCHEMA_REGISTRY_URL", + "env_names": [ + "kpops_schema_registry_url" + ], + "format": "uri", + "maxLength": 65536, + "minLength": 1, + "title": "Url", + "type": "string" + } + }, + "title": "SchemaRegistryConfig", "type": "object" }, "TopicNameConfig": { "additionalProperties": false, - "description": "Configures topic names.", + "description": "Configure the topic name 
variables you can use in the pipeline definition.", "properties": { "default_error_topic_name": { "default": "${pipeline_name}-${component_name}-error",
diff --git a/docs/docs/user/migration-guide/v2-v3.md b/docs/docs/user/migration-guide/v2-v3.md
index e0d669859..def10c0f0 100644
--- a/docs/docs/user/migration-guide/v2-v3.md
+++ b/docs/docs/user/migration-guide/v2-v3.md
@@ -1,8 +1,56 @@
 # Migrate from V2 to V3
 
+## [Make Kafka REST Proxy & Kafka Connect hosts default and improve Schema Registry config](https://github.com/bakdata/kpops/pull/354)
+
+The breaking changes affect the `config.yaml` file:
+
+- `schema_registry_url` is replaced with `schema_registry.url` (default `http://localhost:8081`) and `schema_registry.enabled` (default `false`).
+
+- `kafka_rest_host` is renamed to `kafka_rest.url` (default `http://localhost:8082`).
+
+- `kafka_connect_host` is replaced with `kafka_connect.url` (default `http://localhost:8083`).
+
+- `brokers` is renamed to `kafka_brokers`.
+
+The environment variable names of these config fields have changed accordingly. Please refer to the [environment variables documentation page](../core-concepts/variables/environment_variables.md) to see the latest changes.
+
+#### config.yaml
+
+```diff
+ environment: development
+- brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
+- kafka_rest_host: "http://my-custom-rest.url:8082"
+- kafka_connect_host: "http://my-custom-connect.url:8083"
+- schema_registry_url: "http://my-custom-sr.url:8081"
++ kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
++ kafka_rest:
++   url: "http://my-custom-rest.url:8082"
++ kafka_connect:
++   url: "http://my-custom-connect.url:8083"
++ schema_registry:
++   enabled: true
++   url: "http://my-custom-sr.url:8081"
+```
+
+#### pipeline.yaml and defaults.yaml
+
+The variable is now called `kafka_brokers`.
+
+```diff
+...
+ app:
+   streams:
+-    brokers: ${brokers}
++    brokers: ${kafka_brokers}
+     schemaRegistryUrl: ${schema_registry_url}
+   nameOverride: override-with-this-name
+   imageTag: "1.0.0"
+...
+```
+
 ## [Move GitHub action to repository root](https://github.com/bakdata/kpops/pull/356)
 
-The location of the GitHub action has changed and it's now available directly as `bakdata/kpops`.
+The location of the GitHub action has changed, and it's now available directly as `bakdata/kpops`.
 
 You'll need to change it in your GitHub CI workflows.
diff --git a/examples/bakdata/atm-fraud-detection/config.yaml b/examples/bakdata/atm-fraud-detection/config.yaml
index e3742ded9..d03a12c64 100644
--- a/examples/bakdata/atm-fraud-detection/config.yaml
+++ b/examples/bakdata/atm-fraud-detection/config.yaml
@@ -4,12 +4,16 @@ topic_name_config:
   default_error_topic_name: "${pipeline_name}-${component_name}-dead-letter-topic"
   default_output_topic_name: "${pipeline_name}-${component_name}-topic"
 
-brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
+kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
 
-schema_registry_url: "http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081"
+schema_registry:
+  enabled: true
+  url: "http://k8kafka-cp-schema-registry.kpops.svc.cluster.local:8081"
 
-kafka_rest_host: "http://localhost:8082"
+kafka_rest:
+  url: "http://localhost:8082"
 
-kafka_connect_host: "http://localhost:8083"
+kafka_connect:
+  url: "http://localhost:8083"
 
 defaults_path: .
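For reference, the pattern behind these new nested config sections is introduced in `kpops/config.py` further down in this diff. The snippet below is a condensed, self-contained sketch of that pattern (pydantic v1, which the PR uses); the demo calls at the end are illustrative, not part of the PR:

```python
from pydantic import AnyHttpUrl, BaseSettings, Field, ValidationError, parse_obj_as

ENV_PREFIX = "KPOPS_"


class KafkaConnectConfig(BaseSettings):
    """Configuration for Kafka Connect."""

    url: AnyHttpUrl = Field(
        # Defaults must be validated explicitly with parse_obj_as,
        # see https://github.com/pydantic/pydantic/issues/1106
        default=parse_obj_as(AnyHttpUrl, "http://localhost:8083"),
        env=f"{ENV_PREFIX}KAFKA_CONNECT_URL",
        description="Address of Kafka Connect.",
    )


config = KafkaConnectConfig()
print(config.url)  # http://localhost:8083 -- a usable default always exists

# Malformed addresses now fail at config-parsing time instead of surfacing
# later as broken HTTP calls:
try:
    parse_obj_as(AnyHttpUrl, "invalid-host")
except ValidationError as e:
    print(e.errors()[0]["msg"])  # invalid or missing URL scheme
```

Moving each address into a small `BaseSettings` model with a validated `AnyHttpUrl` default is what allows the "host is not set" runtime checks in `ConnectWrapper` and `ProxyWrapper` to be deleted elsewhere in this diff.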
diff --git a/examples/bakdata/atm-fraud-detection/defaults.yaml b/examples/bakdata/atm-fraud-detection/defaults.yaml index 609933f13..e3ba49c67 100644 --- a/examples/bakdata/atm-fraud-detection/defaults.yaml +++ b/examples/bakdata/atm-fraud-detection/defaults.yaml @@ -10,7 +10,7 @@ kafka-connector: kafka-app: app: streams: - brokers: ${brokers} + brokers: ${kafka_brokers} schemaRegistryUrl: ${schema_registry_url} optimizeLeaveGroupBehavior: false diff --git a/hooks/gen_docs/gen_docs_env_vars.py b/hooks/gen_docs/gen_docs_env_vars.py index ac88b82b6..24106e18f 100644 --- a/hooks/gen_docs/gen_docs_env_vars.py +++ b/hooks/gen_docs/gen_docs_env_vars.py @@ -21,7 +21,7 @@ from hooks import PATH_ROOT from hooks.gen_docs import IterableStrEnum from kpops.cli import main -from kpops.cli.pipeline_config import PipelineConfig +from kpops.config import KpopsConfig PATH_DOCS_RESOURCES = PATH_ROOT / "docs/docs/resources" PATH_DOCS_VARIABLES = PATH_DOCS_RESOURCES / "variables" @@ -254,8 +254,8 @@ def fill_csv_pipeline_config(target: Path) -> None: :param target: The path to the `.csv` file. Note that it must already contain the column names """ - for field in collect_fields(PipelineConfig): - field_info = PipelineConfig.Config.get_field_info(field.name) + for field in collect_fields(KpopsConfig): + field_info = KpopsConfig.Config.get_field_info(field.name) field_description: str = ( field.field_info.description or "No description available, please refer to the pipeline config documentation." diff --git a/kpops/cli/main.py b/kpops/cli/main.py index f58808cd2..f689231af 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -10,7 +10,6 @@ from kpops import __version__ from kpops.cli.custom_formatter import CustomFormatter -from kpops.cli.pipeline_config import ENV_PREFIX, PipelineConfig from kpops.cli.registry import Registry from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( @@ -19,6 +18,7 @@ from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper +from kpops.config import ENV_PREFIX, KpopsConfig from kpops.pipeline_generator.pipeline import Pipeline from kpops.utils.gen_schema import SchemaScope, gen_config_schema, gen_pipeline_schema @@ -111,25 +111,25 @@ def setup_pipeline( pipeline_base_dir: Path, pipeline_path: Path, components_module: str | None, - pipeline_config: PipelineConfig, + kpops_config: KpopsConfig, ) -> Pipeline: registry = Registry() if components_module: registry.find_components(components_module) registry.find_components("kpops.components") - handlers = setup_handlers(components_module, pipeline_config) + handlers = setup_handlers(components_module, kpops_config) return Pipeline.load_from_yaml( - pipeline_base_dir, pipeline_path, registry, pipeline_config, handlers + pipeline_base_dir, pipeline_path, registry, kpops_config, handlers ) def setup_handlers( - components_module: str | None, config: PipelineConfig + components_module: str | None, config: KpopsConfig ) -> ComponentHandlers: schema_handler = SchemaHandler.load_schema_handler(components_module, config) - connector_handler = KafkaConnectHandler.from_pipeline_config(config) - proxy_wrapper = ProxyWrapper(config) + connector_handler = KafkaConnectHandler.from_kpops_config(config) + proxy_wrapper = ProxyWrapper(config.kafka_rest) topic_handler = TopicHandler(proxy_wrapper) return 
ComponentHandlers(schema_handler, connector_handler, topic_handler) @@ -191,17 +191,17 @@ def log_action(action: str, pipeline_component: PipelineComponent): log.info("\n") -def create_pipeline_config( +def create_kpops_config( config: Path, defaults: Optional[Path], verbose: bool -) -> PipelineConfig: +) -> KpopsConfig: setup_logging_level(verbose) - PipelineConfig.Config.config_path = config + KpopsConfig.Config.config_path = config if defaults: - pipeline_config = PipelineConfig(defaults_path=defaults) + kpops_config = KpopsConfig(defaults_path=defaults) else: - pipeline_config = PipelineConfig() - pipeline_config.defaults_path = config.parent / pipeline_config.defaults_path - return pipeline_config + kpops_config = KpopsConfig() + kpops_config.defaults_path = config.parent / kpops_config.defaults_path + return kpops_config @app.command( # pyright: ignore[reportGeneralTypeIssues] https://github.com/rec/dtyper/issues/8 @@ -248,9 +248,9 @@ def generate( filter_type: FilterType = FILTER_TYPE, verbose: bool = VERBOSE_OPTION, ) -> Pipeline: - pipeline_config = create_pipeline_config(config, defaults, verbose) + kpops_config = create_kpops_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, kpops_config ) if not template: @@ -283,9 +283,9 @@ def deploy( dry_run: bool = DRY_RUN, verbose: bool = VERBOSE_OPTION, ): - pipeline_config = create_pipeline_config(config, defaults, verbose) + kpops_config = create_kpops_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, kpops_config ) steps_to_apply = get_steps_to_apply(pipeline, steps, filter_type) @@ -308,9 +308,9 @@ def destroy( dry_run: bool = DRY_RUN, verbose: bool = VERBOSE_OPTION, ): - pipeline_config = create_pipeline_config(config, defaults, verbose) + kpops_config = create_kpops_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, kpops_config ) pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) for component in pipeline_steps: @@ -332,9 +332,9 @@ def reset( dry_run: bool = DRY_RUN, verbose: bool = VERBOSE_OPTION, ): - pipeline_config = create_pipeline_config(config, defaults, verbose) + kpops_config = create_kpops_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, kpops_config ) pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) for component in pipeline_steps: @@ -357,9 +357,9 @@ def clean( dry_run: bool = DRY_RUN, verbose: bool = VERBOSE_OPTION, ): - pipeline_config = create_pipeline_config(config, defaults, verbose) + kpops_config = create_kpops_config(config, defaults, verbose) pipeline = setup_pipeline( - pipeline_base_dir, pipeline_path, components_module, pipeline_config + pipeline_base_dir, pipeline_path, components_module, kpops_config ) pipeline_steps = reverse_pipeline_steps(pipeline, steps, filter_type) for component in pipeline_steps: diff --git a/kpops/component_handlers/kafka_connect/connect_wrapper.py b/kpops/component_handlers/kafka_connect/connect_wrapper.py index 9a3dd307e..aa1918a43 100644 --- a/kpops/component_handlers/kafka_connect/connect_wrapper.py 
+++ b/kpops/component_handlers/kafka_connect/connect_wrapper.py @@ -1,8 +1,12 @@ +from __future__ import annotations + import logging import time from time import sleep +from typing import TYPE_CHECKING import httpx +from pydantic import AnyHttpUrl from kpops.component_handlers.kafka_connect.exception import ( ConnectorNotFoundException, @@ -14,6 +18,9 @@ KafkaConnectResponse, ) +if TYPE_CHECKING: + from kpops.config import KafkaConnectConfig + HEADERS = {"Accept": "application/json", "Content-Type": "application/json"} log = logging.getLogger("KafkaConnectAPI") @@ -24,18 +31,12 @@ class ConnectWrapper: Wraps Kafka Connect APIs """ - def __init__(self, host: str | None): - if not host: - error_message = ( - "The Kafka Connect host is not set. Please set the host in the config." - ) - log.error(error_message) - raise RuntimeError(error_message) - self._host: str = host + def __init__(self, config: KafkaConnectConfig) -> None: + self._config: KafkaConnectConfig = config @property - def host(self) -> str: - return self._host + def url(self) -> AnyHttpUrl: + return self._config.url def create_connector( self, connector_config: KafkaConnectorConfig @@ -49,7 +50,7 @@ def create_connector( config_json = connector_config.dict() connect_data = {"name": connector_config.name, "config": config_json} response = httpx.post( - url=f"{self._host}/connectors", headers=HEADERS, json=connect_data + url=f"{self.url}/connectors", headers=HEADERS, json=connect_data ) if response.status_code == httpx.codes.CREATED: log.info(f"Connector {connector_config.name} created.") @@ -71,7 +72,7 @@ def get_connector(self, connector_name: str) -> KafkaConnectResponse: :return: Information about the connector """ response = httpx.get( - url=f"{self._host}/connectors/{connector_name}", headers=HEADERS + url=f"{self.url}/connectors/{connector_name}", headers=HEADERS ) if response.status_code == httpx.codes.OK: log.info(f"Connector {connector_name} exists.") @@ -99,7 +100,7 @@ def update_connector_config( connector_name = connector_config.name config_json = connector_config.dict() response = httpx.put( - url=f"{self._host}/connectors/{connector_name}/config", + url=f"{self.url}/connectors/{connector_name}/config", headers=HEADERS, json=config_json, ) @@ -129,7 +130,7 @@ def validate_connector_config( :return: """ response = httpx.put( - url=f"{self._host}/connector-plugins/{connector_config.class_name}/config/validate", + url=f"{self.url}/connector-plugins/{connector_config.class_name}/config/validate", headers=HEADERS, json=connector_config.dict(), ) @@ -156,7 +157,7 @@ def delete_connector(self, connector_name: str) -> None: API Reference:https://docs.confluent.io/platform/current/connect/references/restapi.html#delete--connectors-(string-name)- """ response = httpx.delete( - url=f"{self._host}/connectors/{connector_name}", headers=HEADERS + url=f"{self.url}/connectors/{connector_name}", headers=HEADERS ) if response.status_code == httpx.codes.NO_CONTENT: log.info(f"Connector {connector_name} deleted.") diff --git a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py index 14f5af076..7e3d798fe 100644 --- a/kpops/component_handlers/kafka_connect/kafka_connect_handler.py +++ b/kpops/component_handlers/kafka_connect/kafka_connect_handler.py @@ -19,7 +19,7 @@ from typing_extensions import Self if TYPE_CHECKING: - from kpops.cli.pipeline_config import PipelineConfig + from kpops.config import KpopsConfig log = logging.getLogger("KafkaConnectHandler") @@ 
-100,14 +100,14 @@ def __dry_run_connector_creation( log.debug(connector_config.dict()) log.debug(f"PUT /connectors/{connector_name}/config HTTP/1.1") - log.debug(f"HOST: {self._connect_wrapper.host}") + log.debug(f"HOST: {self._connect_wrapper.url}") except ConnectorNotFoundException: diff = render_diff({}, connector_config.dict()) log.info( f"Connector Creation: connector {connector_name} does not exist. Creating connector with config:\n{diff}" ) log.debug("POST /connectors HTTP/1.1") - log.debug(f"HOST: {self._connect_wrapper.host}") + log.debug(f"HOST: {self._connect_wrapper.url}") errors = self._connect_wrapper.validate_connector_config(connector_config) if len(errors) > 0: @@ -129,15 +129,15 @@ def __dry_run_connector_deletion(self, connector_name: str) -> None: ) ) log.debug(f"DELETE /connectors/{connector_name} HTTP/1.1") - log.debug(f"HOST: {self._connect_wrapper.host}") + log.debug(f"HOST: {self._connect_wrapper.url}") except ConnectorNotFoundException: log.warning( f"Connector Destruction: connector {connector_name} does not exist and cannot be deleted. Skipping." ) @classmethod - def from_pipeline_config(cls, pipeline_config: PipelineConfig) -> Self: + def from_kpops_config(cls, config: KpopsConfig) -> Self: return cls( - connect_wrapper=ConnectWrapper(host=pipeline_config.kafka_connect_host), - timeout=pipeline_config.timeout, + connect_wrapper=ConnectWrapper(config.kafka_connect), + timeout=config.timeout, ) diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index a053ccc62..4b21083de 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -8,21 +8,27 @@ from schema_registry.client.schema import AvroSchema from kpops.cli.exception import ClassNotFoundError -from kpops.cli.pipeline_config import PipelineConfig from kpops.cli.registry import find_class from kpops.component_handlers.schema_handler.schema_provider import ( Schema, SchemaProvider, ) from kpops.components.base_components.models.to_section import ToSection +from kpops.config import KpopsConfig from kpops.utils.colorify import greenify, magentaify log = logging.getLogger("SchemaHandler") class SchemaHandler: - def __init__(self, url: str, components_module: str | None): - self.schema_registry_client = SchemaRegistryClient(url) + def __init__( + self, + kpops_config: KpopsConfig, + components_module: str | None, + ) -> None: + self.schema_registry_client = SchemaRegistryClient( + kpops_config.schema_registry.url + ) self.components_module = components_module @cached_property @@ -42,15 +48,11 @@ def schema_provider(self) -> SchemaProvider: @classmethod def load_schema_handler( - cls, components_module: str | None, config: PipelineConfig + cls, components_module: str | None, config: KpopsConfig ) -> SchemaHandler | None: - if not config.schema_registry_url: - return None - - return cls( - url=config.schema_registry_url, - components_module=components_module, - ) + if config.schema_registry.enabled: + return cls(config, components_module) + return None def submit_schemas(self, to_section: ToSection, dry_run: bool = True) -> None: for topic_name, config in to_section.topics.items(): diff --git a/kpops/component_handlers/topic/handler.py b/kpops/component_handlers/topic/handler.py index 1df0d106a..cef544ab9 100644 --- a/kpops/component_handlers/topic/handler.py +++ b/kpops/component_handlers/topic/handler.py @@ -129,7 +129,7 @@ def __dry_run_topic_creation( ) ) 
log.debug(f"POST /clusters/{self.proxy_wrapper.cluster_id}/topics HTTP/1.1") - log.debug(f"Host: {self.proxy_wrapper.host}") + log.debug(f"Host: {self.proxy_wrapper.url}") log.debug(HEADERS) log.debug(topic_spec.dict()) @@ -187,7 +187,7 @@ def __dry_run_topic_deletion(self, topic_name: str) -> None: log.warning( f"Topic Deletion: topic {topic_name} does not exist in the cluster and cannot be deleted. Skipping." ) - log.debug(f"Host: {self.proxy_wrapper.host}") + log.debug(f"Host: {self.proxy_wrapper.url}") log.debug(HEADERS) log.debug("HTTP/1.1 404 Not Found") log.debug(HEADERS) diff --git a/kpops/component_handlers/topic/proxy_wrapper.py b/kpops/component_handlers/topic/proxy_wrapper.py index af7914379..407dcfcd8 100644 --- a/kpops/component_handlers/topic/proxy_wrapper.py +++ b/kpops/component_handlers/topic/proxy_wrapper.py @@ -1,9 +1,12 @@ +from __future__ import annotations + import logging from functools import cached_property +from typing import TYPE_CHECKING import httpx +from pydantic import AnyHttpUrl -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers.topic.exception import ( KafkaRestProxyError, TopicNotFoundException, @@ -15,6 +18,9 @@ TopicSpec, ) +if TYPE_CHECKING: + from kpops.config import KafkaRestConfig + log = logging.getLogger("KafkaRestProxy") HEADERS = {"Content-Type": "application/json"} @@ -25,13 +31,8 @@ class ProxyWrapper: Wraps Kafka REST Proxy APIs """ - def __init__(self, pipeline_config: PipelineConfig) -> None: - if not pipeline_config.kafka_rest_host: - raise ValueError( - "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST." - ) - - self._host = pipeline_config.kafka_rest_host + def __init__(self, config: KafkaRestConfig) -> None: + self._config: KafkaRestConfig = config @cached_property def cluster_id(self) -> str: @@ -44,7 +45,7 @@ def cluster_id(self) -> str: bootstrap.servers configuration. Therefore, only one Kafka cluster will be returned. :return: The Kafka cluster ID. """ - response = httpx.get(url=f"{self._host}/v3/clusters") + response = httpx.get(url=f"{self._config.url}/v3/clusters") if response.status_code == httpx.codes.OK: cluster_information = response.json() return cluster_information["data"][0]["cluster_id"] @@ -52,8 +53,8 @@ def cluster_id(self) -> str: raise KafkaRestProxyError(response) @property - def host(self) -> str: - return self._host + def url(self) -> AnyHttpUrl: + return self._config.url def create_topic(self, topic_spec: TopicSpec) -> None: """ @@ -62,7 +63,7 @@ def create_topic(self, topic_spec: TopicSpec) -> None: :param topic_spec: The topic specification. 
""" response = httpx.post( - url=f"{self._host}/v3/clusters/{self.cluster_id}/topics", + url=f"{self.url}/v3/clusters/{self.cluster_id}/topics", headers=HEADERS, json=topic_spec.dict(exclude_none=True), ) @@ -80,7 +81,7 @@ def delete_topic(self, topic_name: str) -> None: :param topic_name: Name of the topic """ response = httpx.delete( - url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}", + url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}", headers=HEADERS, ) if response.status_code == httpx.codes.NO_CONTENT: @@ -97,7 +98,7 @@ def get_topic(self, topic_name: str) -> TopicResponse: :return: Response of the get topic API """ response = httpx.get( - url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}", + url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}", headers=HEADERS, ) if response.status_code == httpx.codes.OK: @@ -123,7 +124,7 @@ def get_topic_config(self, topic_name: str) -> TopicConfigResponse: :return: The topic configuration. """ response = httpx.get( - url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs", + url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs", headers=HEADERS, ) @@ -150,7 +151,7 @@ def batch_alter_topic_config(self, topic_name: str, json_body: list[dict]) -> No :param config_name: The configuration parameter name. """ response = httpx.post( - url=f"{self.host}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs:alter", + url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs:alter", headers=HEADERS, json={"data": json_body}, ) @@ -167,7 +168,7 @@ def get_broker_config(self) -> BrokerConfigResponse: :return: The broker configuration. """ response = httpx.get( - url=f"{self.host}/v3/clusters/{self.cluster_id}/brokers/-/configs", + url=f"{self.url}/v3/clusters/{self.cluster_id}/brokers/-/configs", headers=HEADERS, ) diff --git a/kpops/components/base_components/base_defaults_component.py b/kpops/components/base_components/base_defaults_component.py index 99dec42f2..545813f53 100644 --- a/kpops/components/base_components/base_defaults_component.py +++ b/kpops/components/base_components/base_defaults_component.py @@ -9,8 +9,8 @@ import typer from pydantic import BaseModel, Field -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers import ComponentHandlers +from kpops.config import KpopsConfig from kpops.utils import cached_classproperty from kpops.utils.dict_ops import update_nested from kpops.utils.docstring import describe_attr @@ -45,7 +45,7 @@ class BaseDefaultsComponent(BaseModel): exclude=True, hidden_from_schema=True, ) - config: PipelineConfig = Field( + config: KpopsConfig = Field( default=..., description=describe_attr("config", __doc__), exclude=True, @@ -90,7 +90,7 @@ def extend_with_defaults(self, **kwargs) -> dict: :param kwargs: The init kwargs for pydantic :returns: Enriched kwargs with inheritted defaults """ - config: PipelineConfig = kwargs["config"] + config: KpopsConfig = kwargs["config"] log.debug( typer.style( "Enriching component of type ", fg=typer.colors.GREEN, bold=False @@ -177,7 +177,7 @@ def defaults_from_yaml(path: Path, key: str) -> dict: return value -def get_defaults_file_paths(config: PipelineConfig) -> tuple[Path, Path]: +def get_defaults_file_paths(config: KpopsConfig) -> tuple[Path, Path]: """Return the paths to the main and the environment defaults-files The files need not exist, this function will only check if the dir set in diff --git 
a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index e53886d68..bad11b6fc 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -241,7 +241,7 @@ def _get_kafka_connect_resetter_values( **KafkaConnectResetterValues( config=KafkaConnectResetterConfig( connector=self.full_name, - brokers=self.config.brokers, + brokers=self.config.kafka_brokers, **kwargs, ), connector_type=self._connector_type.value, diff --git a/kpops/cli/pipeline_config.py b/kpops/config.py similarity index 67% rename from kpops/cli/pipeline_config.py rename to kpops/config.py index 1400323f5..95193bd53 100644 --- a/kpops/cli/pipeline_config.py +++ b/kpops/config.py @@ -3,9 +3,10 @@ from pathlib import Path from typing import TYPE_CHECKING, Any -from pydantic import BaseConfig, BaseSettings, Field +from pydantic import AnyHttpUrl, BaseConfig, BaseSettings, Field, parse_obj_as from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig +from kpops.utils.docstring import describe_object from kpops.utils.yaml_loading import load_yaml_file if TYPE_CHECKING: @@ -17,7 +18,7 @@ class TopicNameConfig(BaseSettings): - """Configures topic names.""" + """Configure the topic name variables you can use in the pipeline definition.""" default_output_topic_name: str = Field( default="${pipeline_name}-${component_name}", @@ -29,7 +30,43 @@ class TopicNameConfig(BaseSettings): ) -class PipelineConfig(BaseSettings): +class SchemaRegistryConfig(BaseSettings): + """Configuration for Schema Registry.""" + + enabled: bool = Field( + default=False, + description="Whether the Schema Registry handler should be initialized.", + ) + url: AnyHttpUrl = Field( + # For validating URLs use parse_obj_as + # https://github.com/pydantic/pydantic/issues/1106 + default=parse_obj_as(AnyHttpUrl, "http://localhost:8081"), + env=f"{ENV_PREFIX}SCHEMA_REGISTRY_URL", + description="Address of the Schema Registry.", + ) + + +class KafkaRestConfig(BaseSettings): + """Configuration for Kafka REST Proxy.""" + + url: AnyHttpUrl = Field( + default=parse_obj_as(AnyHttpUrl, "http://localhost:8082"), + env=f"{ENV_PREFIX}KAFKA_REST_URL", + description="Address of the Kafka REST Proxy.", + ) + + +class KafkaConnectConfig(BaseSettings): + """Configuration for Kafka Connect.""" + + url: AnyHttpUrl = Field( + default=parse_obj_as(AnyHttpUrl, "http://localhost:8083"), + env=f"{ENV_PREFIX}KAFKA_CONNECT_URL", + description="Address of Kafka Connect.", + ) + + +class KpopsConfig(BaseSettings): """Pipeline configuration unrelated to the components.""" defaults_path: Path = Field( @@ -45,7 +82,7 @@ class PipelineConfig(BaseSettings): description="The environment you want to generate and deploy the pipeline to. " "Suffix your environment files with this value (e.g. 
defaults_development.yaml for environment=development).", ) - brokers: str = Field( + kafka_brokers: str = Field( default=..., env=f"{ENV_PREFIX}KAFKA_BROKERS", description="The comma separated Kafka brokers address.", @@ -57,25 +94,19 @@ class PipelineConfig(BaseSettings): ) topic_name_config: TopicNameConfig = Field( default=TopicNameConfig(), - description="Configure the topic name variables you can use in the pipeline definition.", + description=describe_object(TopicNameConfig.__doc__), ) - schema_registry_url: str | None = Field( - default=None, - example="http://localhost:8081", - env=f"{ENV_PREFIX}SCHEMA_REGISTRY_URL", - description="Address of the Schema Registry.", + schema_registry: SchemaRegistryConfig = Field( + default=SchemaRegistryConfig(), + description=describe_object(SchemaRegistryConfig.__doc__), ) - kafka_rest_host: str | None = Field( - default=None, - env=f"{ENV_PREFIX}REST_PROXY_HOST", - example="http://localhost:8082", - description="Address of the Kafka REST Proxy.", + kafka_rest: KafkaRestConfig = Field( + default=KafkaRestConfig(), + description=describe_object(KafkaRestConfig.__doc__), ) - kafka_connect_host: str | None = Field( - default=None, - env=f"{ENV_PREFIX}CONNECT_HOST", - example="http://localhost:8083", - description="Address of Kafka Connect.", + kafka_connect: KafkaConnectConfig = Field( + default=KafkaConnectConfig(), + description=describe_object(KafkaConnectConfig.__doc__), ) timeout: int = Field( default=300, @@ -104,6 +135,7 @@ class Config(BaseConfig): config_path = Path("config.yaml") env_file = ".env" env_file_encoding = "utf-8" + env_prefix = ENV_PREFIX @classmethod def customise_sources( @@ -112,7 +144,7 @@ def customise_sources( env_settings: SettingsSourceCallable, file_secret_settings: SettingsSourceCallable, ) -> tuple[ - SettingsSourceCallable | Callable[[PipelineConfig], dict[str, Any]], ... + SettingsSourceCallable | Callable[[KpopsConfig], dict[str, Any]], ... 
]: return ( env_settings, @@ -122,7 +154,7 @@ def customise_sources( ) -def yaml_config_settings_source(settings: PipelineConfig) -> dict[str, Any]: +def yaml_config_settings_source(settings: KpopsConfig) -> dict[str, Any]: path_to_config = settings.Config.config_path if path_to_config.exists(): if isinstance(source := load_yaml_file(path_to_config), dict): diff --git a/kpops/pipeline_generator/pipeline.py b/kpops/pipeline_generator/pipeline.py index 093a452ea..ce5b698cc 100644 --- a/kpops/pipeline_generator/pipeline.py +++ b/kpops/pipeline_generator/pipeline.py @@ -12,10 +12,10 @@ from rich.console import Console from rich.syntax import Syntax -from kpops.cli.pipeline_config import PipelineConfig from kpops.cli.registry import Registry from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.config import KpopsConfig from kpops.utils.dict_ops import generate_substitution, update_nested_pair from kpops.utils.environment import ENV from kpops.utils.yaml_loading import load_yaml_file, substitute, substitute_nested @@ -100,7 +100,7 @@ def __init__( component_list: list[dict], environment_components: list[dict], registry: Registry, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, ) -> None: self.components: PipelineComponents = PipelineComponents() @@ -117,7 +117,7 @@ def load_from_yaml( base_dir: Path, path: Path, registry: Registry, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, ) -> Pipeline: """Load pipeline definition from yaml @@ -310,7 +310,7 @@ def validate(self) -> None: self.components.validate_unique_names() @staticmethod - def pipeline_filename_environment(path: Path, config: PipelineConfig) -> Path: + def pipeline_filename_environment(path: Path, config: KpopsConfig) -> Path: """Add the environment name from the PipelineConfig to the pipeline.yaml path :param path: Path to pipeline.yaml file diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index 470a1412d..571a82a7d 100644 --- a/kpops/utils/gen_schema.py +++ b/kpops/utils/gen_schema.py @@ -8,9 +8,9 @@ from pydantic.fields import FieldInfo, ModelField from pydantic.schema import SkipField -from kpops.cli.pipeline_config import PipelineConfig from kpops.cli.registry import _find_classes from kpops.components.base_components.pipeline_component import PipelineComponent +from kpops.config import KpopsConfig from kpops.utils.docstring import describe_object @@ -139,6 +139,6 @@ def gen_pipeline_schema( def gen_config_schema() -> None: """Generate a json schema from the model of pipeline config""" schema = schema_json_of( - PipelineConfig, title="KPOps config schema", indent=4, sort_keys=True + KpopsConfig, title="KPOps config schema", indent=4, sort_keys=True ) print(schema) diff --git a/tests/cli/test_handlers.py b/tests/cli/test_handlers.py index 509c5e0cc..40c496497 100644 --- a/tests/cli/test_handlers.py +++ b/tests/cli/test_handlers.py @@ -3,28 +3,27 @@ from pytest_mock import MockerFixture from kpops.cli.main import setup_handlers -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.kafka_connect.kafka_connect_handler import ( KafkaConnectHandler, ) from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler from kpops.component_handlers.topic.handler import TopicHandler +from kpops.config import KpopsConfig, SchemaRegistryConfig from 
tests.cli.resources.module import CustomSchemaProvider MODULE = CustomSchemaProvider.__module__ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): - config = PipelineConfig( + config = KpopsConfig( defaults_path=Path("fake"), environment="development", - kafka_rest_host="https://testhost:8082", - schema_registry_url=None, + kafka_brokers="broker:9092", ) connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") - connector_handler = KafkaConnectHandler.from_pipeline_config(pipeline_config=config) - connector_handler_mock.from_pipeline_config.return_value = connector_handler + connector_handler = KafkaConnectHandler.from_kpops_config(config) + connector_handler_mock.from_kpops_config.return_value = connector_handler topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") wrapper = mocker.patch("kpops.cli.main.ProxyWrapper") @@ -39,7 +38,7 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): actual_handlers = setup_handlers(MODULE, config) - connector_handler_mock.from_pipeline_config.assert_called_once_with(config) + connector_handler_mock.from_kpops_config.assert_called_once_with(config) assert actual_handlers.schema_handler == expected.schema_handler assert actual_handlers.connector_handler == expected.connector_handler @@ -51,19 +50,19 @@ def test_set_up_handlers_with_no_schema_handler(mocker: MockerFixture): def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): - config = PipelineConfig( + config = KpopsConfig( defaults_path=Path("fake"), environment="development", - kafka_rest_host="https://testhost:8082", - schema_registry_url="https://testhost:8081", + schema_registry=SchemaRegistryConfig(enabled=True), + kafka_brokers="broker:9092", ) schema_handler_mock = mocker.patch("kpops.cli.main.SchemaHandler") schema_handler = SchemaHandler.load_schema_handler(MODULE, config) schema_handler_mock.load_schema_handler.return_value = schema_handler connector_handler_mock = mocker.patch("kpops.cli.main.KafkaConnectHandler") - connector_handler = KafkaConnectHandler.from_pipeline_config(pipeline_config=config) - connector_handler_mock.from_pipeline_config.return_value = connector_handler + connector_handler = KafkaConnectHandler.from_kpops_config(config) + connector_handler_mock.from_kpops_config.return_value = connector_handler topic_handler_mock = mocker.patch("kpops.cli.main.TopicHandler") wrapper = mocker.patch("kpops.cli.main.ProxyWrapper") @@ -80,7 +79,7 @@ def test_set_up_handlers_with_schema_handler(mocker: MockerFixture): schema_handler_mock.load_schema_handler.assert_called_once_with(MODULE, config) - connector_handler_mock.from_pipeline_config.assert_called_once_with(config) + connector_handler_mock.from_kpops_config.assert_called_once_with(config) assert actual_handlers.schema_handler == expected.schema_handler assert actual_handlers.connector_handler == expected.connector_handler diff --git a/tests/cli/test_kpops_config.py b/tests/cli/test_kpops_config.py new file mode 100644 index 000000000..254a2d73a --- /dev/null +++ b/tests/cli/test_kpops_config.py @@ -0,0 +1,67 @@ +from pathlib import Path + +import pytest +from pydantic import AnyHttpUrl, ValidationError, parse_obj_as + +from kpops.config import ( + KafkaConnectConfig, + KafkaRestConfig, + KpopsConfig, + SchemaRegistryConfig, +) + + +def test_kpops_config_with_default_values(): + default_config = KpopsConfig( + environment="development", kafka_brokers="http://broker:9092" + ) + + assert default_config.defaults_path == Path(".") + assert 
default_config.defaults_filename_prefix == "defaults" + assert ( + default_config.topic_name_config.default_output_topic_name + == "${pipeline_name}-${component_name}" + ) + assert ( + default_config.topic_name_config.default_error_topic_name + == "${pipeline_name}-${component_name}-error" + ) + assert default_config.schema_registry.enabled is False + assert default_config.schema_registry.url == "http://localhost:8081" + assert default_config.kafka_rest.url == "http://localhost:8082" + assert default_config.kafka_connect.url == "http://localhost:8083" + assert default_config.timeout == 300 + assert default_config.create_namespace is False + assert default_config.helm_config.context is None + assert default_config.helm_config.debug is False + assert default_config.helm_config.api_version is None + assert default_config.helm_diff_config.ignore == set() + assert default_config.retain_clean_jobs is False + + +def test_kpops_config_with_different_invalid_urls(): + with pytest.raises(ValidationError): + KpopsConfig( + environment="development", + kafka_brokers="http://broker:9092", + kafka_connect=KafkaConnectConfig( + url=parse_obj_as(AnyHttpUrl, "invalid-host") + ), + ) + + with pytest.raises(ValidationError): + KpopsConfig( + environment="development", + kafka_brokers="http://broker:9092", + kafka_rest=KafkaRestConfig(url=parse_obj_as(AnyHttpUrl, "invalid-host")), + ) + + with pytest.raises(ValidationError): + KpopsConfig( + environment="development", + kafka_brokers="http://broker:9092", + schema_registry=SchemaRegistryConfig( + enabled=True, + url=parse_obj_as(AnyHttpUrl, "invalid-host"), + ), + ) diff --git a/tests/compiler/test_pipeline_name.py b/tests/compiler/test_pipeline_name.py index 7a07c1a12..9a44412dd 100644 --- a/tests/compiler/test_pipeline_name.py +++ b/tests/compiler/test_pipeline_name.py @@ -2,7 +2,7 @@ import pytest -from kpops.cli.pipeline_config import PipelineConfig +from kpops.config import KpopsConfig from kpops.pipeline_generator.pipeline import Pipeline from kpops.utils.environment import ENV @@ -55,7 +55,7 @@ def test_should_not_set_pipeline_name_with_the_same_base_dir(): def test_pipeline_file_name_environment(): - config = PipelineConfig( + config = KpopsConfig( defaults_path=DEFAULTS_PATH, environment="some_environment", ) diff --git a/tests/component_handlers/kafka_connect/test_connect_wrapper.py b/tests/component_handlers/kafka_connect/test_connect_wrapper.py index 3db9c090f..ca9d53313 100644 --- a/tests/component_handlers/kafka_connect/test_connect_wrapper.py +++ b/tests/component_handlers/kafka_connect/test_connect_wrapper.py @@ -6,7 +6,6 @@ import pytest from pytest_httpx import HTTPXMock -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers.kafka_connect.connect_wrapper import ConnectWrapper from kpops.component_handlers.kafka_connect.exception import ( ConnectorNotFoundException, @@ -17,22 +16,22 @@ KafkaConnectResponse, ) from kpops.component_handlers.kafka_connect.timeout import timeout +from kpops.config import KpopsConfig HEADERS = {"Accept": "application/json", "Content-Type": "application/json"} -HOST = "http://localhost:8083" +DEFAULT_HOST = "http://localhost:8083" DEFAULTS_PATH = Path(__file__).parent / "resources" class TestConnectorApiWrapper: @pytest.fixture(autouse=True) def setup(self): - config = PipelineConfig( + config = KpopsConfig( defaults_path=DEFAULTS_PATH, environment="development", - kafka_connect_host=HOST, ) - self.connect_wrapper = ConnectWrapper(host=config.kafka_connect_host) + self.connect_wrapper = 
ConnectWrapper(config.kafka_connect) @pytest.fixture def connector_config(self) -> KafkaConnectorConfig: @@ -43,19 +42,6 @@ def connector_config(self) -> KafkaConnectorConfig: } ) - def test_should_through_exception_when_host_is_not_set(self): - config = PipelineConfig( - defaults_path=DEFAULTS_PATH, - environment="development", - kafka_connect_host=None, - ) - with pytest.raises(RuntimeError) as run_time_error: - ConnectWrapper(host=config.kafka_connect_host) - assert ( - str(run_time_error.value) - == "The Kafka Connect host is not set. Please set the host in the config." - ) - @patch("httpx.post") def test_should_create_post_requests_for_given_connector_configuration( self, mock_post: MagicMock @@ -75,7 +61,7 @@ def test_should_create_post_requests_for_given_connector_configuration( self.connect_wrapper.create_connector(KafkaConnectorConfig(**configs)) mock_post.assert_called_with( - url=f"{HOST}/connectors", + url=f"{DEFAULT_HOST}/connectors", headers=HEADERS, json={ "name": "test-connector", @@ -107,7 +93,7 @@ def test_should_return_correct_response_when_connector_created( } httpx_mock.add_response( method="POST", - url=f"{HOST}/connectors", + url=f"{DEFAULT_HOST}/connectors", headers=HEADERS, json=actual_response, status_code=201, @@ -124,7 +110,7 @@ def test_should_raise_connector_exists_exception_when_connector_exists( ): httpx_mock.add_response( method="POST", - url=f"{HOST}/connectors", + url=f"{DEFAULT_HOST}/connectors", json={}, status_code=409, ) @@ -145,7 +131,7 @@ def test_should_create_correct_get_connector_request(self, mock_get: MagicMock): self.connect_wrapper.get_connector(connector_name) mock_get.assert_called_with( - url=f"{HOST}/connectors/{connector_name}", + url=f"{DEFAULT_HOST}/connectors/{connector_name}", headers={"Accept": "application/json", "Content-Type": "application/json"}, ) @@ -176,7 +162,7 @@ def test_should_return_correct_response_when_getting_connector( } httpx_mock.add_response( method="GET", - url=f"{HOST}/connectors/{connector_name}", + url=f"{DEFAULT_HOST}/connectors/{connector_name}", headers=HEADERS, json=actual_response, status_code=200, @@ -193,7 +179,7 @@ def test_should_raise_connector_not_found_when_getting_connector( httpx_mock.add_response( method="GET", - url=f"{HOST}/connectors/{connector_name}", + url=f"{DEFAULT_HOST}/connectors/{connector_name}", headers=HEADERS, json={}, status_code=404, @@ -213,7 +199,7 @@ def test_should_raise_rebalance_in_progress_when_getting_connector( httpx_mock.add_response( method="GET", - url=f"{HOST}/connectors/{connector_name}", + url=f"{DEFAULT_HOST}/connectors/{connector_name}", headers=HEADERS, json={}, status_code=409, @@ -247,7 +233,7 @@ def test_should_create_correct_update_connector_request(self, mock_put: MagicMoc ) mock_put.assert_called_with( - url=f"{HOST}/connectors/{connector_name}/config", + url=f"{DEFAULT_HOST}/connectors/{connector_name}/config", headers={"Accept": "application/json", "Content-Type": "application/json"}, json=KafkaConnectorConfig(**configs).dict(), ) @@ -281,7 +267,7 @@ def test_should_return_correct_response_when_update_connector( } httpx_mock.add_response( method="PUT", - url=f"{HOST}/connectors/{connector_name}/config", + url=f"{DEFAULT_HOST}/connectors/{connector_name}/config", headers=HEADERS, json=actual_response, status_code=200, @@ -323,7 +309,7 @@ def test_should_return_correct_response_when_update_connector_created( } httpx_mock.add_response( method="PUT", - url=f"{HOST}/connectors/{connector_name}/config", + 
url=f"{DEFAULT_HOST}/connectors/{connector_name}/config", headers=HEADERS, json=actual_response, status_code=201, @@ -345,7 +331,7 @@ def test_should_raise_connector_exists_exception_when_update_connector( httpx_mock.add_response( method="PUT", - url=f"{HOST}/connectors/{connector_name}/config", + url=f"{DEFAULT_HOST}/connectors/{connector_name}/config", headers=HEADERS, json={}, status_code=409, @@ -369,7 +355,7 @@ def test_should_create_correct_delete_connector_request( self.connect_wrapper.delete_connector(connector_name) mock_delete.assert_called_with( - url=f"{HOST}/connectors/{connector_name}", + url=f"{DEFAULT_HOST}/connectors/{connector_name}", headers=HEADERS, ) @@ -399,7 +385,7 @@ def test_should_return_correct_response_when_deleting_connector( } httpx_mock.add_response( method="DELETE", - url=f"{HOST}/connectors/{connector_name}", + url=f"{DEFAULT_HOST}/connectors/{connector_name}", headers=HEADERS, json=actual_response, status_code=204, @@ -416,7 +402,7 @@ def test_should_raise_connector_not_found_when_deleting_connector( httpx_mock.add_response( method="DELETE", - url=f"{HOST}/connectors/{connector_name}", + url=f"{DEFAULT_HOST}/connectors/{connector_name}", headers=HEADERS, json={}, status_code=404, @@ -436,7 +422,7 @@ def test_should_raise_rebalance_in_progress_when_deleting_connector( httpx_mock.add_response( method="DELETE", - url=f"{HOST}/connectors/{connector_name}", + url=f"{DEFAULT_HOST}/connectors/{connector_name}", headers=HEADERS, json={}, status_code=409, @@ -467,7 +453,7 @@ def test_should_create_correct_validate_connector_config_request( self.connect_wrapper.validate_connector_config(connector_config) mock_put.assert_called_with( - url=f"{HOST}/connector-plugins/FileStreamSinkConnector/config/validate", + url=f"{DEFAULT_HOST}/connector-plugins/FileStreamSinkConnector/config/validate", headers={"Accept": "application/json", "Content-Type": "application/json"}, json=connector_config.dict(), ) @@ -489,7 +475,7 @@ def test_should_create_correct_validate_connector_config_and_name_gets_added( ) mock_put.assert_called_with( - url=f"{HOST}/connector-plugins/{connector_name}/config/validate", + url=f"{DEFAULT_HOST}/connector-plugins/{connector_name}/config/validate", headers={"Accept": "application/json", "Content-Type": "application/json"}, json=KafkaConnectorConfig(**{"name": connector_name, **configs}).dict(), ) @@ -501,7 +487,7 @@ def test_should_parse_validate_connector_config(self, httpx_mock: HTTPXMock): actual_response = json.load(f) httpx_mock.add_response( method="PUT", - url=f"{HOST}/connector-plugins/FileStreamSinkConnector/config/validate", + url=f"{DEFAULT_HOST}/connector-plugins/FileStreamSinkConnector/config/validate", headers=HEADERS, json=actual_response, status_code=200, diff --git a/tests/component_handlers/schema_handler/test_schema_handler.py b/tests/component_handlers/schema_handler/test_schema_handler.py index ccea021c6..1ead99781 100644 --- a/tests/component_handlers/schema_handler/test_schema_handler.py +++ b/tests/component_handlers/schema_handler/test_schema_handler.py @@ -1,15 +1,13 @@ import json -from pathlib import Path from unittest import mock from unittest.mock import MagicMock import pytest -from pydantic import BaseModel +from pydantic import AnyHttpUrl, BaseModel, parse_obj_as from pytest_mock import MockerFixture from schema_registry.client.schema import AvroSchema from schema_registry.client.utils import SchemaVersion -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers.schema_handler.schema_handler 
import SchemaHandler from kpops.component_handlers.schema_handler.schema_provider import SchemaProvider from kpops.components.base_components.models import TopicName @@ -18,6 +16,7 @@ TopicConfig, ToSection, ) +from kpops.config import KpopsConfig, SchemaRegistryConfig from kpops.utils.colorify import greenify, magentaify from tests.pipeline.test_components import TestSchemaProvider @@ -69,34 +68,39 @@ def to_section(topic_config: TopicConfig) -> ToSection: return ToSection(topics={TopicName("topic-X"): topic_config}) -def test_load_schema_handler(): - config_enable = PipelineConfig( - defaults_path=Path("fake"), +@pytest.fixture() +def kpops_config_with_sr_enabled() -> KpopsConfig: + return KpopsConfig( environment="development", - schema_registry_url="http://localhost:8081", + kafka_brokers="broker:9092", + schema_registry=SchemaRegistryConfig( + enabled=True, url=parse_obj_as(AnyHttpUrl, "http://mock:8081") + ), ) - config_disable = config_enable.copy() - config_disable.schema_registry_url = None - assert ( - SchemaHandler.load_schema_handler(TEST_SCHEMA_PROVIDER_MODULE, config_disable) - is None - ) +def test_load_schema_handler(kpops_config_with_sr_enabled: KpopsConfig): assert isinstance( - SchemaHandler.load_schema_handler(TEST_SCHEMA_PROVIDER_MODULE, config_enable), + SchemaHandler.load_schema_handler( + TEST_SCHEMA_PROVIDER_MODULE, kpops_config_with_sr_enabled + ), SchemaHandler, ) + config_disable = kpops_config_with_sr_enabled.copy() + config_disable.schema_registry = SchemaRegistryConfig(enabled=False) -def test_should_lazy_load_schema_provider(find_class_mock: MagicMock): - config_enable = PipelineConfig( - defaults_path=Path("fake"), - environment="development", - schema_registry_url="http://localhost:8081", + assert ( + SchemaHandler.load_schema_handler(TEST_SCHEMA_PROVIDER_MODULE, config_disable) + is None ) + + +def test_should_lazy_load_schema_provider( + find_class_mock: MagicMock, kpops_config_with_sr_enabled: KpopsConfig +): schema_handler = SchemaHandler.load_schema_handler( - TEST_SCHEMA_PROVIDER_MODULE, config_enable + TEST_SCHEMA_PROVIDER_MODULE, kpops_config_with_sr_enabled ) assert schema_handler is not None @@ -111,9 +115,12 @@ def test_should_lazy_load_schema_provider(find_class_mock: MagicMock): find_class_mock.assert_called_once_with(TEST_SCHEMA_PROVIDER_MODULE, SchemaProvider) -def test_should_raise_value_error_if_schema_provider_class_not_found(): +def test_should_raise_value_error_if_schema_provider_class_not_found( + kpops_config_with_sr_enabled: KpopsConfig, +): schema_handler = SchemaHandler( - url="http://mock:8081", components_module=NON_EXISTING_PROVIDER_MODULE + kpops_config=kpops_config_with_sr_enabled, + components_module=NON_EXISTING_PROVIDER_MODULE, ) with pytest.raises(ValueError) as value_error: @@ -129,22 +136,22 @@ def test_should_raise_value_error_if_schema_provider_class_not_found(): ) -def test_should_raise_value_error_when_schema_provider_is_called_and_components_module_is_empty(): - config_enable = PipelineConfig( - defaults_path=Path("fake"), - environment="development", - schema_registry_url="http://localhost:8081", - ) - +def test_should_raise_value_error_when_schema_provider_is_called_and_components_module_is_empty( + kpops_config_with_sr_enabled: KpopsConfig, +): with pytest.raises(ValueError): - schema_handler = SchemaHandler.load_schema_handler(None, config_enable) + schema_handler = SchemaHandler.load_schema_handler( + None, kpops_config_with_sr_enabled + ) assert schema_handler is not None 
schema_handler.schema_provider.provide_schema( "com.bakdata.kpops.test.SchemaHandlerTest", {} ) with pytest.raises(ValueError) as value_error: - schema_handler = SchemaHandler.load_schema_handler("", config_enable) + schema_handler = SchemaHandler.load_schema_handler( + "", kpops_config_with_sr_enabled + ) assert schema_handler is not None schema_handler.schema_provider.provide_schema( "com.bakdata.kpops.test.SchemaHandlerTest", {} @@ -157,10 +164,14 @@ def test_should_raise_value_error_when_schema_provider_is_called_and_components_ def test_should_log_info_when_submit_schemas_that_not_exists_and_dry_run_true( - to_section: ToSection, log_info_mock: MagicMock, schema_registry_mock: MagicMock + to_section: ToSection, + log_info_mock: MagicMock, + schema_registry_mock: MagicMock, + kpops_config_with_sr_enabled: KpopsConfig, ): schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + kpops_config=kpops_config_with_sr_enabled, + components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [] @@ -178,9 +189,11 @@ def test_should_log_info_when_submit_schemas_that_exists_and_dry_run_true( to_section: ToSection, log_info_mock: MagicMock, schema_registry_mock: MagicMock, + kpops_config_with_sr_enabled: KpopsConfig, ): schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + kpops_config=kpops_config_with_sr_enabled, + components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [1, 2, 3] @@ -199,10 +212,12 @@ def test_should_raise_exception_when_submit_schema_that_exists_and_not_compatibl topic_config: TopicConfig, to_section: ToSection, schema_registry_mock: MagicMock, + kpops_config_with_sr_enabled: KpopsConfig, ): schema_provider = TestSchemaProvider() schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + kpops_config=kpops_config_with_sr_enabled, + components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" @@ -239,10 +254,12 @@ def test_should_log_debug_when_submit_schema_that_exists_and_registered_under_ve log_info_mock: MagicMock, log_debug_mock: MagicMock, schema_registry_mock: MagicMock, + kpops_config_with_sr_enabled: KpopsConfig, ): schema_provider = TestSchemaProvider() schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + kpops_config=kpops_config_with_sr_enabled, + components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" schema = schema_provider.provide_schema(schema_class, {}) @@ -273,12 +290,14 @@ def test_should_submit_non_existing_schema_when_not_dry( to_section: ToSection, log_info_mock: MagicMock, schema_registry_mock: MagicMock, + kpops_config_with_sr_enabled: KpopsConfig, ): schema_provider = TestSchemaProvider() schema_class = "com.bakdata.kpops.test.SchemaHandlerTest" schema = schema_provider.provide_schema(schema_class, {}) schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + kpops_config=kpops_config_with_sr_enabled, + components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [] @@ -300,9 +319,11 @@ def test_should_log_correct_message_when_delete_schemas_and_in_dry_run( to_section: ToSection, log_info_mock: MagicMock, schema_registry_mock: MagicMock, + kpops_config_with_sr_enabled: KpopsConfig, ): schema_handler = 
SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + kpops_config=kpops_config_with_sr_enabled, + components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [] @@ -317,10 +338,13 @@ def test_should_log_correct_message_when_delete_schemas_and_in_dry_run( def test_should_delete_schemas_when_not_in_dry_run( - to_section: ToSection, schema_registry_mock: MagicMock + to_section: ToSection, + schema_registry_mock: MagicMock, + kpops_config_with_sr_enabled: KpopsConfig, ): schema_handler = SchemaHandler( - url="http://mock:8081", components_module=TEST_SCHEMA_PROVIDER_MODULE + kpops_config=kpops_config_with_sr_enabled, + components_module=TEST_SCHEMA_PROVIDER_MODULE, ) schema_registry_mock.get_versions.return_value = [] diff --git a/tests/component_handlers/topic/test_proxy_wrapper.py b/tests/component_handlers/topic/test_proxy_wrapper.py index 7b587ecb3..e1ff9ae40 100644 --- a/tests/component_handlers/topic/test_proxy_wrapper.py +++ b/tests/component_handlers/topic/test_proxy_wrapper.py @@ -7,16 +7,16 @@ from pytest_httpx import HTTPXMock from pytest_mock import MockerFixture -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers.topic.exception import ( KafkaRestProxyError, TopicNotFoundException, ) from kpops.component_handlers.topic.model import TopicResponse, TopicSpec from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper +from kpops.config import KpopsConfig HEADERS = {"Content-Type": "application/json"} -HOST = "http://localhost:8082" +DEFAULT_HOST = "http://localhost:8082" DEFAULTS_PATH = Path(__file__).parent.parent / "resources" @@ -31,10 +31,8 @@ def log_debug_mock(self, mocker: MockerFixture) -> MagicMock: @pytest.fixture(autouse=True) def setup(self, httpx_mock: HTTPXMock): - config = PipelineConfig( - defaults_path=DEFAULTS_PATH, environment="development", kafka_rest_host=HOST - ) - self.proxy_wrapper = ProxyWrapper(pipeline_config=config) + config = KpopsConfig(defaults_path=DEFAULTS_PATH, environment="development") + self.proxy_wrapper = ProxyWrapper(config.kafka_rest) with open( DEFAULTS_PATH / "kafka_rest_proxy_responses" / "cluster-info.json" @@ -43,23 +41,13 @@ def setup(self, httpx_mock: HTTPXMock): httpx_mock.add_response( method="GET", - url=f"{HOST}/v3/clusters", + url=f"{DEFAULT_HOST}/v3/clusters", json=cluster_response, status_code=200, ) - assert self.proxy_wrapper.host == HOST + assert self.proxy_wrapper.url == DEFAULT_HOST assert self.proxy_wrapper.cluster_id == "cluster-1" - def test_should_raise_exception_when_host_is_not_set(self): - config = PipelineConfig(defaults_path=DEFAULTS_PATH, environment="development") - config.kafka_rest_host = None - with pytest.raises(ValueError) as exception: - ProxyWrapper(pipeline_config=config) - assert ( - str(exception.value) - == "The Kafka REST Proxy host is not set. Please set the host in the config.yaml using the kafka_rest_host property or set the environemt variable KPOPS_REST_PROXY_HOST." 
- ) - @patch("httpx.post") def test_should_create_topic_with_all_topic_configuration( self, mock_post: MagicMock @@ -78,7 +66,7 @@ def test_should_create_topic_with_all_topic_configuration( self.proxy_wrapper.create_topic(topic_spec=TopicSpec(**topic_spec)) mock_post.assert_called_with( - url=f"{HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/topics", + url=f"{DEFAULT_HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/topics", headers=HEADERS, json=topic_spec, ) @@ -91,7 +79,7 @@ def test_should_create_topic_with_no_configuration(self, mock_post: MagicMock): self.proxy_wrapper.create_topic(topic_spec=TopicSpec(**topic_spec)) mock_post.assert_called_with( - url=f"{HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/topics", + url=f"{DEFAULT_HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/topics", headers=HEADERS, json=topic_spec, ) @@ -104,7 +92,7 @@ def test_should_call_get_topic(self, mock_get: MagicMock): self.proxy_wrapper.get_topic(topic_name=topic_name) mock_get.assert_called_with( - url=f"{HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/topics/{topic_name}", + url=f"{DEFAULT_HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/topics/{topic_name}", headers=HEADERS, ) @@ -122,7 +110,7 @@ def test_should_call_batch_alter_topic_config(self, mock_put: MagicMock): ) mock_put.assert_called_with( - url=f"{HOST}/v3/clusters/cluster-1/topics/{topic_name}/configs:alter", + url=f"{DEFAULT_HOST}/v3/clusters/cluster-1/topics/{topic_name}/configs:alter", headers=HEADERS, json={ "data": [ @@ -140,7 +128,7 @@ def test_should_call_delete_topic(self, mock_delete: MagicMock): self.proxy_wrapper.delete_topic(topic_name=topic_name) mock_delete.assert_called_with( - url=f"{HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/topics/{topic_name}", + url=f"{DEFAULT_HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/topics/{topic_name}", headers=HEADERS, ) @@ -150,7 +138,7 @@ def test_should_call_get_broker_config(self, mock_get: MagicMock): self.proxy_wrapper.get_broker_config() mock_get.assert_called_with( - url=f"{HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/brokers/-/configs", + url=f"{DEFAULT_HOST}/v3/clusters/{self.proxy_wrapper.cluster_id}/brokers/-/configs", headers=HEADERS, ) @@ -169,7 +157,7 @@ def test_should_log_topic_creation( httpx_mock.add_response( method="POST", - url=f"{HOST}/v3/clusters/cluster-1/topics", + url=f"{DEFAULT_HOST}/v3/clusters/cluster-1/topics", json=topic_spec, headers=HEADERS, status_code=201, @@ -184,7 +172,7 @@ def test_should_log_topic_deletion( httpx_mock.add_response( method="DELETE", - url=f"{HOST}/v3/clusters/cluster-1/topics/{topic_name}", + url=f"{DEFAULT_HOST}/v3/clusters/cluster-1/topics/{topic_name}", headers=HEADERS, status_code=204, ) @@ -213,7 +201,7 @@ def test_should_get_topic(self, log_debug_mock: MagicMock, httpx_mock: HTTPXMock httpx_mock.add_response( method="GET", - url=f"{HOST}/v3/clusters/cluster-1/topics/{topic_name}", + url=f"{DEFAULT_HOST}/v3/clusters/cluster-1/topics/{topic_name}", headers=HEADERS, status_code=200, json=res, @@ -231,7 +219,7 @@ def test_should_rais_topic_not_found_exception_get_topic( httpx_mock.add_response( method="GET", - url=f"{HOST}/v3/clusters/cluster-1/topics/{topic_name}", + url=f"{DEFAULT_HOST}/v3/clusters/cluster-1/topics/{topic_name}", headers=HEADERS, status_code=404, json={ @@ -251,7 +239,7 @@ def test_should_log_reset_default_topic_config_when_deleted( httpx_mock.add_response( method="POST", - url=f"{HOST}/v3/clusters/cluster-1/topics/{topic_name}/configs:alter", + 
url=f"{DEFAULT_HOST}/v3/clusters/cluster-1/topics/{topic_name}/configs:alter", headers=HEADERS, json={"data": [{"name": config_name, "operation": "DELETE"}]}, status_code=204, diff --git a/tests/components/test_base_defaults_component.py b/tests/components/test_base_defaults_component.py index 7b25e5f74..dd593f826 100644 --- a/tests/components/test_base_defaults_component.py +++ b/tests/components/test_base_defaults_component.py @@ -3,12 +3,12 @@ import pytest -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers import ComponentHandlers from kpops.components.base_components.base_defaults_component import ( BaseDefaultsComponent, load_defaults, ) +from kpops.config import KpopsConfig from kpops.utils.environment import ENV DEFAULTS_PATH = Path(__file__).parent / "resources" @@ -38,8 +38,8 @@ class EnvVarTest(BaseDefaultsComponent): @pytest.fixture -def config() -> PipelineConfig: - return PipelineConfig( +def config() -> KpopsConfig: + return KpopsConfig( defaults_path=DEFAULTS_PATH, environment="development", ) @@ -116,9 +116,7 @@ def test_load_defaults_with_environment( == defaults ) - def test_inherit_defaults( - self, config: PipelineConfig, handlers: ComponentHandlers - ): + def test_inherit_defaults(self, config: KpopsConfig, handlers: ComponentHandlers): component = Child(config=config, handlers=handlers) assert ( @@ -137,7 +135,7 @@ def test_inherit_defaults( component.hard_coded == "hard_coded_value" ), "Defaults in code should be kept for parents" - def test_inherit(self, config: PipelineConfig, handlers: ComponentHandlers): + def test_inherit(self, config: KpopsConfig, handlers: ComponentHandlers): component = Child( config=config, handlers=handlers, @@ -161,7 +159,7 @@ def test_inherit(self, config: PipelineConfig, handlers: ComponentHandlers): ), "Defaults in code should be kept for parents" def test_multiple_generations( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: KpopsConfig, handlers: ComponentHandlers ): component = GrandChild(config=config, handlers=handlers) @@ -183,7 +181,7 @@ def test_multiple_generations( assert component.grand_child == "grand-child-value" def test_env_var_substitution( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: KpopsConfig, handlers: ComponentHandlers ): ENV["pipeline_name"] = str(DEFAULTS_PATH) component = EnvVarTest(config=config, handlers=handlers) diff --git a/tests/components/test_kafka_app.py b/tests/components/test_kafka_app.py index c6527c00c..66d9daa31 100644 --- a/tests/components/test_kafka_app.py +++ b/tests/components/test_kafka_app.py @@ -4,7 +4,6 @@ import pytest from pytest_mock import MockerFixture -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import ( HelmDiffConfig, @@ -12,14 +11,15 @@ HelmUpgradeInstallFlags, ) from kpops.components.base_components import KafkaApp +from kpops.config import KpopsConfig DEFAULTS_PATH = Path(__file__).parent / "resources" class TestKafkaApp: @pytest.fixture - def config(self) -> PipelineConfig: - return PipelineConfig( + def config(self) -> KpopsConfig: + return KpopsConfig( defaults_path=DEFAULTS_PATH, environment="development", helm_diff_config=HelmDiffConfig(), @@ -33,7 +33,7 @@ def handlers(self) -> ComponentHandlers: topic_handler=MagicMock(), ) - def test_default_configs(self, config: PipelineConfig, handlers: ComponentHandlers): + def test_default_configs(self, config: KpopsConfig, 
handlers: ComponentHandlers): kafka_app = KafkaApp( name="example-name", config=config, @@ -59,7 +59,7 @@ def test_default_configs(self, config: PipelineConfig, handlers: ComponentHandle def test_should_deploy_kafka_app( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, mocker: MockerFixture, ): diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py index 912f449fb..46616cd17 100644 --- a/tests/components/test_kafka_connector.py +++ b/tests/components/test_kafka_connector.py @@ -4,11 +4,11 @@ import pytest from pytest_mock import MockerFixture -from kpops.cli.pipeline_config import PipelineConfig, TopicNameConfig from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import HelmDiffConfig from kpops.component_handlers.kafka_connect.model import KafkaConnectorConfig from kpops.components.base_components.kafka_connector import KafkaConnector +from kpops.config import KpopsConfig, TopicNameConfig DEFAULTS_PATH = Path(__file__).parent / "resources" CONNECTOR_NAME = "test-connector-with-long-name-0123456789abcdefghijklmnop" @@ -19,15 +19,15 @@ class TestKafkaConnector: @pytest.fixture - def config(self) -> PipelineConfig: - return PipelineConfig( + def config(self) -> KpopsConfig: + return KpopsConfig( defaults_path=DEFAULTS_PATH, environment="development", topic_name_config=TopicNameConfig( default_error_topic_name="${component_type}-error-topic", default_output_topic_name="${component_type}-output-topic", ), - brokers="broker:9092", + kafka_brokers="broker:9092", helm_diff_config=HelmDiffConfig(), ) @@ -62,7 +62,7 @@ def connector_config(self) -> KafkaConnectorConfig: def test_connector_config_name_override( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, connector_config: KafkaConnectorConfig, ): diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 91760e90c..6861817bd 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -3,7 +3,6 @@ import pytest from pytest_mock import MockerFixture -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import ( HelmUpgradeInstallFlags, @@ -25,6 +24,7 @@ TopicConfig, ToSection, ) +from kpops.config import KpopsConfig from kpops.utils.colorify import magentaify from tests.components.test_kafka_connector import ( CONNECTOR_CLEAN_FULL_NAME, @@ -42,7 +42,7 @@ def log_info_mock(self, mocker: MockerFixture) -> MagicMock: @pytest.fixture def connector( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, connector_config: KafkaConnectorConfig, ) -> KafkaSinkConnector: @@ -63,7 +63,7 @@ def connector( def test_connector_config_parsing( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, connector_config: KafkaConnectorConfig, ): @@ -93,7 +93,7 @@ def test_connector_config_parsing( def test_from_section_parsing_input_topic( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, connector_config: KafkaConnectorConfig, ): @@ -120,7 +120,7 @@ def test_from_section_parsing_input_topic( def test_from_section_parsing_input_pattern( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, connector_config: KafkaConnectorConfig, ): @@ -256,7 +256,7 @@ def 
test_clean_when_dry_run_is_true( def test_clean_when_dry_run_is_false( self, connector: KafkaSinkConnector, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, log_info_mock: MagicMock, @@ -334,7 +334,7 @@ def test_clean_when_dry_run_is_false( def test_clean_without_to_when_dry_run_is_true( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, dry_run_handler: MagicMock, connector_config: KafkaConnectorConfig, @@ -353,7 +353,7 @@ def test_clean_without_to_when_dry_run_is_true( def test_clean_without_to_when_dry_run_is_false( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, dry_run_handler: MagicMock, diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index db9a2dd77..82b042d0c 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -3,7 +3,6 @@ import pytest from pytest_mock import MockerFixture -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import ( HelmUpgradeInstallFlags, @@ -22,6 +21,7 @@ TopicConfig, ToSection, ) +from kpops.config import KpopsConfig from kpops.utils.environment import ENV from tests.components.test_kafka_connector import ( CONNECTOR_CLEAN_FULL_NAME, @@ -35,7 +35,7 @@ class TestKafkaSourceConnector(TestKafkaConnector): @pytest.fixture def connector( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, connector_config: KafkaConnectorConfig, ) -> KafkaSourceConnector: @@ -57,7 +57,7 @@ def connector( def test_from_section_raises_exception( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, connector_config: KafkaConnectorConfig, ): @@ -266,7 +266,7 @@ def test_clean_when_dry_run_is_false( def test_clean_without_to_when_dry_run_is_false( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, dry_run_handler: MagicMock, @@ -342,7 +342,7 @@ def test_clean_without_to_when_dry_run_is_false( def test_clean_without_to_when_dry_run_is_true( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, dry_run_handler: MagicMock, connector_config: KafkaConnectorConfig, diff --git a/tests/components/test_kubernetes_app.py b/tests/components/test_kubernetes_app.py index 46eb9795d..d89db64bd 100644 --- a/tests/components/test_kubernetes_app.py +++ b/tests/components/test_kubernetes_app.py @@ -5,7 +5,6 @@ from pytest_mock import MockerFixture from typing_extensions import override -from kpops.cli.pipeline_config import PipelineConfig from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import ( HelmDiffConfig, @@ -17,6 +16,7 @@ KubernetesApp, KubernetesAppConfig, ) +from kpops.config import KpopsConfig from kpops.utils.colorify import magentaify DEFAULTS_PATH = Path(__file__).parent / "resources" @@ -28,8 +28,8 @@ class KubernetesTestValue(KubernetesAppConfig): class TestKubernetesApp: @pytest.fixture - def config(self) -> PipelineConfig: - return PipelineConfig( + def config(self) -> KpopsConfig: + return KpopsConfig( defaults_path=DEFAULTS_PATH, environment="development", helm_diff_config=HelmDiffConfig(), @@ -64,7 +64,7 @@ def repo_config(self) -> HelmRepoConfig: @pytest.fixture def kubernetes_app( self, - config: PipelineConfig, + 
config: KpopsConfig, handlers: ComponentHandlers, app_value: KubernetesTestValue, repo_config: HelmRepoConfig, @@ -106,7 +106,7 @@ def test_should_lazy_load_helm_wrapper_and_not_repo_add( def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, mocker: MockerFixture, @@ -152,7 +152,7 @@ def test_should_lazy_load_helm_wrapper_and_call_repo_add_when_implemented( def test_should_deploy_app_with_local_helm_chart( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, helm_mock: MagicMock, app_value: KubernetesTestValue, @@ -218,7 +218,7 @@ def test_should_call_helm_uninstall_when_destroying_kubernetes_app( def test_should_raise_value_error_when_name_is_not_valid( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, app_value: KubernetesTestValue, repo_config: HelmRepoConfig, diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 56d52a68b..2c7853fb3 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -5,7 +5,6 @@ import pytest from pytest_mock import MockerFixture -from kpops.cli.pipeline_config import PipelineConfig, TopicNameConfig from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import HelmUpgradeInstallFlags from kpops.components import ProducerApp @@ -13,6 +12,7 @@ OutputTopicTypes, TopicConfig, ) +from kpops.config import KpopsConfig, TopicNameConfig DEFAULTS_PATH = Path(__file__).parent / "resources" @@ -30,8 +30,8 @@ def handlers(self) -> ComponentHandlers: ) @pytest.fixture - def config(self) -> PipelineConfig: - return PipelineConfig( + def config(self) -> KpopsConfig: + return KpopsConfig( defaults_path=DEFAULTS_PATH, environment="development", topic_name_config=TopicNameConfig( @@ -42,7 +42,7 @@ def config(self) -> PipelineConfig: @pytest.fixture def producer_app( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: KpopsConfig, handlers: ComponentHandlers ) -> ProducerApp: return ProducerApp( name=self.PRODUCER_APP_NAME, @@ -65,7 +65,7 @@ def producer_app( }, ) - def test_output_topics(self, config: PipelineConfig, handlers: ComponentHandlers): + def test_output_topics(self, config: KpopsConfig, handlers: ComponentHandlers): producer_app = ProducerApp( name=self.PRODUCER_APP_NAME, config=config, diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index dce2c7e96..50ab2c332 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -4,7 +4,6 @@ import pytest from pytest_mock import MockerFixture -from kpops.cli.pipeline_config import PipelineConfig, TopicNameConfig from kpops.component_handlers import ComponentHandlers from kpops.component_handlers.helm_wrapper.model import ( HelmDiffConfig, @@ -17,6 +16,7 @@ TopicConfig, ToSection, ) +from kpops.config import KpopsConfig, TopicNameConfig DEFAULTS_PATH = Path(__file__).parent / "resources" @@ -34,8 +34,8 @@ def handlers(self) -> ComponentHandlers: ) @pytest.fixture - def config(self) -> PipelineConfig: - return PipelineConfig( + def config(self) -> KpopsConfig: + return KpopsConfig( defaults_path=DEFAULTS_PATH, environment="development", topic_name_config=TopicNameConfig( @@ -47,7 +47,7 @@ def config(self) -> PipelineConfig: @pytest.fixture def streams_app( - self, config: PipelineConfig, handlers: ComponentHandlers 
+ self, config: KpopsConfig, handlers: ComponentHandlers ) -> StreamsApp: return StreamsApp( name=self.STREAMS_APP_NAME, @@ -68,7 +68,7 @@ def streams_app( }, ) - def test_set_topics(self, config: PipelineConfig, handlers: ComponentHandlers): + def test_set_topics(self, config: KpopsConfig, handlers: ComponentHandlers): streams_app = StreamsApp( name=self.STREAMS_APP_NAME, config=config, @@ -113,7 +113,7 @@ def test_set_topics(self, config: PipelineConfig, handlers: ComponentHandlers): assert "extraInputPatterns" in streams_config def test_no_empty_input_topic( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: KpopsConfig, handlers: ComponentHandlers ): streams_app = StreamsApp( name=self.STREAMS_APP_NAME, @@ -143,7 +143,7 @@ def test_no_empty_input_topic( assert "inputPattern" in streams_config assert "extraInputPatterns" not in streams_config - def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandlers): + def test_should_validate(self, config: KpopsConfig, handlers: ComponentHandlers): # An exception should be raised when both role and type are defined and type is input with pytest.raises(ValueError): StreamsApp( @@ -189,7 +189,7 @@ def test_should_validate(self, config: PipelineConfig, handlers: ComponentHandle ) def test_set_streams_output_from_to( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: KpopsConfig, handlers: ComponentHandlers ): streams_app = StreamsApp( name=self.STREAMS_APP_NAME, @@ -228,7 +228,7 @@ def test_set_streams_output_from_to( assert streams_app.app.streams.error_topic == "${error_topic_name}" def test_weave_inputs_from_prev_component( - self, config: PipelineConfig, handlers: ComponentHandlers + self, config: KpopsConfig, handlers: ComponentHandlers ): streams_app = StreamsApp( name=self.STREAMS_APP_NAME, @@ -265,7 +265,7 @@ def test_weave_inputs_from_prev_component( def test_deploy_order_when_dry_run_is_false( self, - config: PipelineConfig, + config: KpopsConfig, handlers: ComponentHandlers, mocker: MockerFixture, ): diff --git a/tests/pipeline/resources/custom-config/config.yaml b/tests/pipeline/resources/custom-config/config.yaml index 2707ee0fa..8a9ca81c3 100644 --- a/tests/pipeline/resources/custom-config/config.yaml +++ b/tests/pipeline/resources/custom-config/config.yaml @@ -3,9 +3,13 @@ defaults_path: ../no-topics-defaults topic_name_config: default_error_topic_name: "${component_name}-dead-letter-topic" default_output_topic_name: "${component_name}-test-topic" -brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" -kafka_connect_host: "http://localhost:8083" -kafka_rest_host: "http://localhost:8082" -schema_registry_url: "http://localhost:8081" +kafka_brokers: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092" +kafka_connect: + url: "http://localhost:8083" +kafka_rest: + url: "http://localhost:8082" +schema_registry: + enabled: true + url: "http://localhost:8081" helm_config: api_version: "2.1.1" diff --git a/tests/pipeline/resources/defaults.yaml b/tests/pipeline/resources/defaults.yaml index c4e2aa259..e1223203b 100644 --- a/tests/pipeline/resources/defaults.yaml +++ b/tests/pipeline/resources/defaults.yaml @@ -5,7 +5,7 @@ kubernetes-app: kafka-app: app: streams: - brokers: "${brokers}" + brokers: "${kafka_brokers}" schema_registry_url: "${schema_registry_url}" version: "2.4.2" diff --git a/tests/pipeline/resources/kafka-connect-sink-config/config.yaml b/tests/pipeline/resources/kafka-connect-sink-config/config.yaml index 6b7c754ab..14c488c5f 
100644 --- a/tests/pipeline/resources/kafka-connect-sink-config/config.yaml +++ b/tests/pipeline/resources/kafka-connect-sink-config/config.yaml @@ -1,10 +1,12 @@ environment: development defaults_path: .. -brokers: "broker:9092" +kafka_brokers: "broker:9092" topic_name_config: default_error_topic_name: ${component_type}-error-topic default_output_topic_name: ${component_type}-output-topic helm_diff_config: enable: false -kafka_connect_host: "kafka_connect_host:8083" -kafka_rest_host: "kafka_rest_host:8082" +kafka_connect: + url: "http://kafka_connect_url:8083" +kafka_rest: + url: "http://kafka_rest_url:8082" diff --git a/tests/pipeline/resources/no-topics-defaults/defaults.yaml b/tests/pipeline/resources/no-topics-defaults/defaults.yaml index 47de626e6..87d21d47d 100644 --- a/tests/pipeline/resources/no-topics-defaults/defaults.yaml +++ b/tests/pipeline/resources/no-topics-defaults/defaults.yaml @@ -1,7 +1,7 @@ kafka-app: app: streams: - brokers: "${brokers}" + brokers: "${kafka_brokers}" schemaRegistryUrl: "${schema_registry_url}" producer-app: diff --git a/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml b/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml index dfbe23db9..c67f869d9 100644 --- a/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml +++ b/tests/pipeline/resources/pipeline-component-should-have-prefix/defaults.yaml @@ -7,5 +7,5 @@ kubernetes-app: kafka-app: app: streams: - brokers: ${brokers} + brokers: ${kafka_brokers} schemaRegistryUrl: ${schema_registry_url} diff --git a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml index 2564e0012..77d666b1e 100644 --- a/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml +++ b/tests/pipeline/resources/pipeline-with-env-defaults/defaults.yaml @@ -4,7 +4,7 @@ kubernetes-app: kafka-app: app: streams: - brokers: "${brokers}" + brokers: "${kafka_brokers}" schemaRegistryUrl: "${schema_registry_url}" producer-app: {} # inherits from kafka-app diff --git a/tests/pipeline/resources/pipeline-with-short-topics/defaults.yaml b/tests/pipeline/resources/pipeline-with-short-topics/defaults.yaml index 00b3b2673..3b9e93eb7 100644 --- a/tests/pipeline/resources/pipeline-with-short-topics/defaults.yaml +++ b/tests/pipeline/resources/pipeline-with-short-topics/defaults.yaml @@ -5,7 +5,7 @@ kubernetes-app: kafka-app: app: streams: - brokers: "${broker}" + brokers: "${kafka_brokers}" schema_registry_url: "${schema_registry_url}" version: "2.4.2" diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index af9cde479..42992bbb7 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -462,7 +462,6 @@ def test_default_config(self, snapshot: SnapshotTest): def test_env_vars_precedence_over_config( self, monkeypatch: MonkeyPatch, - snapshot: SnapshotTest, ): monkeypatch.setenv(name="KPOPS_KAFKA_BROKERS", value="env_broker")
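
The hunks above replace KPops' free-form host strings with nested config sections whose `url` fields are typed as pydantic `AnyHttpUrl`, so a malformed endpoint now fails when the config is built rather than when the first request is sent. A minimal sketch of that behavior, assuming `KafkaConnectConfig` is exported from `kpops.config` like the `SchemaRegistryConfig` and `TopicNameConfig` imports shown in this diff:

```python
from pydantic import AnyHttpUrl, ValidationError, parse_obj_as

from kpops.config import KafkaConnectConfig, KpopsConfig

try:
    KpopsConfig(
        environment="development",
        kafka_brokers="http://broker:9092",
        # parse_obj_as raises ValidationError here: "invalid-host" is not a
        # valid AnyHttpUrl, so a bad endpoint never reaches the wrapper classes.
        kafka_connect=KafkaConnectConfig(
            url=parse_obj_as(AnyHttpUrl, "invalid-host")
        ),
    )
except ValidationError as e:
    print(e)
```

This is also why the diff can delete `test_should_through_exception_when_host_is_not_set` and `test_should_raise_exception_when_host_is_not_set`: the "host is not set" error paths no longer exist.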
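`ConnectWrapper` and `ProxyWrapper` now receive their dedicated config section instead of a host string plucked out of the pipeline config. A sketch of the new call shape, using only the constructors and attributes visible in the `test_connect_wrapper.py` and `test_proxy_wrapper.py` hunks above; the commented URLs are the defaults asserted at the top of this diff:

```python
from kpops.component_handlers.kafka_connect.connect_wrapper import ConnectWrapper
from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper
from kpops.config import KpopsConfig

config = KpopsConfig(environment="development", kafka_brokers="http://broker:9092")

# Each wrapper is handed its own section; the URL inside it has already been
# validated by pydantic, so the former None-host checks are gone.
connect_wrapper = ConnectWrapper(config.kafka_connect)  # http://localhost:8083
proxy_wrapper = ProxyWrapper(config.kafka_rest)  # http://localhost:8082
print(proxy_wrapper.url)
```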
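In `test_schema_handler.py`, Schema Registry support is now toggled by an explicit `enabled` flag instead of the presence or absence of `schema_registry_url`, and `SchemaHandler` takes the whole `KpopsConfig` (`kpops_config=...`) rather than a raw URL. A sketch of the loading behavior; the provider module path is a hypothetical placeholder, which is fine here because the tests show the provider is resolved lazily:

```python
from pydantic import AnyHttpUrl, parse_obj_as

from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler
from kpops.config import KpopsConfig, SchemaRegistryConfig

config = KpopsConfig(
    environment="development",
    kafka_brokers="http://broker:9092",
    schema_registry=SchemaRegistryConfig(
        enabled=True, url=parse_obj_as(AnyHttpUrl, "http://localhost:8081")
    ),
)

# Enabled: returns a SchemaHandler bound to the configured registry.
handler = SchemaHandler.load_schema_handler("my_project.components", config)
assert handler is not None

# Disabled: returns None, no matter which URL is configured.
config.schema_registry = SchemaRegistryConfig(enabled=False)
assert SchemaHandler.load_schema_handler("my_project.components", config) is None
```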
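The remaining test-suite changes are mechanical: `PipelineConfig` becomes `KpopsConfig` imported from `kpops.config`, and the `brokers` field becomes `kafka_brokers`, matching the renamed `${kafka_brokers}` substitution variable in the defaults files. After the rename, the fixture shared by the component tests follows this pattern (assembled from the fixtures above):

```python
from pathlib import Path

import pytest

from kpops.config import KpopsConfig, TopicNameConfig

DEFAULTS_PATH = Path(__file__).parent / "resources"


@pytest.fixture
def config() -> KpopsConfig:
    return KpopsConfig(
        defaults_path=DEFAULTS_PATH,
        environment="development",
        kafka_brokers="broker:9092",  # renamed from `brokers`
        topic_name_config=TopicNameConfig(
            default_error_topic_name="${component_type}-error-topic",
            default_output_topic_name="${component_type}-output-topic",
        ),
    )
```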
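For anyone migrating a `config.yaml` by hand, the flat keys map one-to-one onto the new nested sections, as the `custom-config/config.yaml` and `kafka-connect-sink-config/config.yaml` hunks show. Building the equivalent `KpopsConfig` in Python makes the mapping explicit (same assumption as above about the section models being exported from `kpops.config`):

```python
from pydantic import AnyHttpUrl, parse_obj_as

from kpops.config import (
    KafkaConnectConfig,
    KafkaRestConfig,
    KpopsConfig,
    SchemaRegistryConfig,
)

# brokers              -> kafka_brokers
# kafka_connect_host   -> kafka_connect.url
# kafka_rest_host      -> kafka_rest.url
# schema_registry_url  -> schema_registry.url, plus an explicit enabled flag
config = KpopsConfig(
    environment="development",
    kafka_brokers="http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092",
    kafka_connect=KafkaConnectConfig(
        url=parse_obj_as(AnyHttpUrl, "http://localhost:8083")
    ),
    kafka_rest=KafkaRestConfig(url=parse_obj_as(AnyHttpUrl, "http://localhost:8082")),
    schema_registry=SchemaRegistryConfig(
        enabled=True, url=parse_obj_as(AnyHttpUrl, "http://localhost:8081")
    ),
)
```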
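Finally, `test_env_vars_precedence_over_config` keeps exercising the `KPOPS_KAFKA_BROKERS` override after the rename. Assuming `KpopsConfig` still reads `KPOPS_`-prefixed environment variables the way `PipelineConfig` did, the precedence rule can be checked in isolation roughly like this (a hedged sketch, not the actual test body):

```python
import pytest

from kpops.config import KpopsConfig


def test_kafka_brokers_env_var_wins(monkeypatch: pytest.MonkeyPatch) -> None:
    # The environment variable should beat any value coming from config.yaml.
    monkeypatch.setenv("KPOPS_KAFKA_BROKERS", "env_broker")
    config = KpopsConfig(environment="development")
    assert config.kafka_brokers == "env_broker"
```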