From 78001a2f4cbdd2463a45c8fa542a6c48e19bc883 Mon Sep 17 00:00:00 2001 From: Ivan Yordanov Date: Tue, 24 Oct 2023 21:23:37 +0300 Subject: [PATCH] fix: adapt to new AnyHttpUrl and pydantic v2 validation --- .../resources/variables/config_env_vars.env | 30 +++++++---- .../resources/variables/config_env_vars.md | 24 +++++---- docs/docs/schema/pipeline.json | 52 +++++++++++-------- .../kafka_connect/connect_wrapper.py | 10 ++-- .../schema_handler/schema_handler.py | 2 +- .../component_handlers/topic/proxy_wrapper.py | 14 ++--- kpops/config.py | 16 ++---- tests/cli/test_kpops_config.py | 16 +++--- .../topic/test_proxy_wrapper.py | 3 +- 9 files changed, 94 insertions(+), 73 deletions(-) diff --git a/docs/docs/resources/variables/config_env_vars.env b/docs/docs/resources/variables/config_env_vars.env index db6a641af..1e38d4d9d 100644 --- a/docs/docs/resources/variables/config_env_vars.env +++ b/docs/docs/resources/variables/config_env_vars.env @@ -13,19 +13,27 @@ defaults_path=. # The environment you want to generate and deploy the pipeline to. # Suffix your environment files with this value (e.g. # defaults_development.yaml for environment=development). -KPOPS_ENVIRONMENT # No default value, required +environment=PydanticUndefined # kafka_brokers # The comma separated Kafka brokers address. -KPOPS_KAFKA_BROKERS # No default value, required -# url -# Address of the Schema Registry. -KPOPS_SCHEMA_REGISTRY_URL=http://localhost:8081 -# url -# Address of the Kafka REST Proxy. -KPOPS_KAFKA_REST_URL=http://localhost:8082 -# url -# Address of Kafka Connect. -KPOPS_KAFKA_CONNECT_URL=http://localhost:8083 +kafka_brokers=PydanticUndefined +# defaults_filename_prefix +# The name of the defaults file and the prefix of the defaults +# environment file. +defaults_filename_prefix=defaults +# topic_name_config +# Configure the topic name variables you can use in the pipeline +# definition. +topic_name_config=default_output_topic_name='${pipeline_name}-${component_name}' default_error_topic_name='${pipeline_name}-${component_name}-error' +# schema_registry +# Configuration for Schema Registry. +schema_registry=enabled=False url=Url('http://localhost:8081/') +# kafka_rest +# Configuration for Kafka REST Proxy. +kafka_rest=url=Url('http://localhost:8082/') +# kafka_connect +# Configuration for Kafka Connect. +kafka_connect=url=Url('http://localhost:8083/') # timeout # The timeout in seconds that specifies when actions like deletion or # deploy timeout. diff --git a/docs/docs/resources/variables/config_env_vars.md b/docs/docs/resources/variables/config_env_vars.md index 0fac41eb1..9c1c704a9 100644 --- a/docs/docs/resources/variables/config_env_vars.md +++ b/docs/docs/resources/variables/config_env_vars.md @@ -1,11 +1,17 @@ These variables are a lower priority alternative to the settings in `config.yaml`. Variables marked as required can instead be set in the pipeline config. -| Name | Default Value | Required | Description | Setting name | -| ------------------------- | --------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------- | -| KPOPS_ENVIRONMENT | | True | The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development). | environment | -| KPOPS_KAFKA_BROKERS | | True | The comma separated Kafka brokers address. 
| kafka_brokers | -| KPOPS_SCHEMA_REGISTRY_URL | http://localhost:8081 | False | Address of the Schema Registry. | url | -| KPOPS_KAFKA_REST_URL | http://localhost:8082 | False | Address of the Kafka REST Proxy. | url | -| KPOPS_KAFKA_CONNECT_URL | http://localhost:8083 | False | Address of Kafka Connect. | url | -| KPOPS_TIMEOUT | 300 | False | The timeout in seconds that specifies when actions like deletion or deploy timeout. | timeout | -| KPOPS_RETAIN_CLEAN_JOBS | False | False | Whether to retain clean up jobs in the cluster or uninstall the, after completion. | retain_clean_jobs | +| Name | Default Value |Required| Description | Setting name | +|------------------------|----------------------------------------------------------------------------------------------------------------------------------|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------| +|defaults_path |. |False |The path to the folder containing the defaults.yaml file and the environment defaults files. Paths can either be absolute or relative to `config.yaml` |defaults_path | +|environment |PydanticUndefined |False |The environment you want to generate and deploy the pipeline to. Suffix your environment files with this value (e.g. defaults_development.yaml for environment=development).|environment | +|kafka_brokers |PydanticUndefined |False |The comma separated Kafka brokers address. |kafka_brokers | +|defaults_filename_prefix|defaults |False |The name of the defaults file and the prefix of the defaults environment file. |defaults_filename_prefix| +|topic_name_config |default_output_topic_name='${pipeline_name}-${component_name}' default_error_topic_name='${pipeline_name}-${component_name}-error'|False |Configure the topic name variables you can use in the pipeline definition. |topic_name_config | +|schema_registry |enabled=False url=Url('http://localhost:8081/') |False |Configuration for Schema Registry. |schema_registry | +|kafka_rest |url=Url('http://localhost:8082/') |False |Configuration for Kafka REST Proxy. |kafka_rest | +|kafka_connect |url=Url('http://localhost:8083/') |False |Configuration for Kafka Connect. |kafka_connect | +|timeout |300 |False |The timeout in seconds that specifies when actions like deletion or deploy timeout. |timeout | +|create_namespace |False |False |Flag for `helm upgrade --install`. Create the release namespace if not present. |create_namespace | +|helm_config |context=None debug=False api_version=None |False |Global flags for Helm. |helm_config | +|helm_diff_config |ignore=set() |False |Configure Helm Diff. |helm_diff_config | +|retain_clean_jobs |False |False |Whether to retain clean up jobs in the cluster or uninstall the, after completion. 
|retain_clean_jobs | diff --git a/docs/docs/schema/pipeline.json b/docs/docs/schema/pipeline.json index 468cecae0..85c03a513 100644 --- a/docs/docs/schema/pipeline.json +++ b/docs/docs/schema/pipeline.json @@ -60,6 +60,7 @@ "type": "object" }, "HelmApp": { + "additionalProperties": true, "description": "Kubernetes app managed through Helm with an associated Helm chart.", "properties": { "app": { @@ -68,15 +69,18 @@ "$ref": "#/definitions/KubernetesAppConfig" } ], - "description": "Application-specific settings", - "title": "App" + "description": "Application-specific settings" }, "from": { - "allOf": [ + "anyOf": [ { "$ref": "#/definitions/FromSection" + }, + { + "type": "null" } ], + "default": null, "description": "Topic(s) and/or components from which the component will read input", "title": "From" }, @@ -97,36 +101,41 @@ "type": "string" }, "repo_config": { - "allOf": [ + "anyOf": [ { "$ref": "#/definitions/HelmRepoConfig" + }, + { + "type": "null" } ], - "description": "Configuration of the Helm chart repo to be used for deploying the component", - "title": "Repo Config" + "default": null, + "description": "Configuration of the Helm chart repo to be used for deploying the component" }, "to": { - "allOf": [ + "anyOf": [ { "$ref": "#/definitions/ToSection" + }, + { + "type": "null" } ], - "description": "Topic(s) into which the component will write output", - "title": "To" - }, - "type": { - "default": "helm-app", - "description": "Kubernetes app managed through Helm with an associated Helm chart.", - "enum": [ - "helm-app" - ], - "title": "Component type", - "type": "string" + "default": null, + "description": "Topic(s) into which the component will write output" }, "version": { + "anyOf": [ + { + "type": "string" + }, + { + "type": "null" + } + ], + "default": null, "description": "Helm chart version", - "title": "Version", - "type": "string" + "title": "Version" } }, "required": [ @@ -424,7 +433,8 @@ "type": "object" }, "KubernetesAppConfig": { - "description": "Settings specific to Kubernetes apps.", + "additionalProperties": true, + "description": "Settings specific to Kubernetes Apps.", "properties": {}, "title": "KubernetesAppConfig", "type": "object" diff --git a/kpops/component_handlers/kafka_connect/connect_wrapper.py b/kpops/component_handlers/kafka_connect/connect_wrapper.py index 1cc5a2f9a..4d92bad03 100644 --- a/kpops/component_handlers/kafka_connect/connect_wrapper.py +++ b/kpops/component_handlers/kafka_connect/connect_wrapper.py @@ -49,7 +49,7 @@ def create_connector( config_json = connector_config.model_dump() connect_data = {"name": connector_config.name, "config": config_json} response = httpx.post( - url=f"{self.url}/connectors", headers=HEADERS, json=connect_data + url=f"{self.url}connectors", headers=HEADERS, json=connect_data ) if response.status_code == httpx.codes.CREATED: log.info(f"Connector {connector_config.name} created.") @@ -74,7 +74,7 @@ def get_connector(self, connector_name: str | None) -> KafkaConnectResponse: msg = "Connector name not set" raise Exception(msg) response = httpx.get( - url=f"{self.url}/connectors/{connector_name}", headers=HEADERS + url=f"{self.url}connectors/{connector_name}", headers=HEADERS ) if response.status_code == httpx.codes.OK: log.info(f"Connector {connector_name} exists.") @@ -105,7 +105,7 @@ def update_connector_config( connector_name = connector_config.name config_json = connector_config.model_dump() response = httpx.put( - url=f"{self.url}/connectors/{connector_name}/config", + 
url=f"{self.url}connectors/{connector_name}/config", headers=HEADERS, json=config_json, ) @@ -136,7 +136,7 @@ def validate_connector_config( :return: List of all found errors """ response = httpx.put( - url=f"{self.url}/connector-plugins/{connector_config.class_name}/config/validate", + url=f"{self.url}connector-plugins/{connector_config.class_name}/config/validate", headers=HEADERS, json=connector_config.model_dump(), ) @@ -166,7 +166,7 @@ def delete_connector(self, connector_name: str) -> None: :raises ConnectorNotFoundException: Connector not found """ response = httpx.delete( - url=f"{self.url}/connectors/{connector_name}", headers=HEADERS + url=f"{self.url}connectors/{connector_name}", headers=HEADERS ) if response.status_code == httpx.codes.NO_CONTENT: log.info(f"Connector {connector_name} deleted.") diff --git a/kpops/component_handlers/schema_handler/schema_handler.py b/kpops/component_handlers/schema_handler/schema_handler.py index e4eba9931..fae2da0e7 100644 --- a/kpops/component_handlers/schema_handler/schema_handler.py +++ b/kpops/component_handlers/schema_handler/schema_handler.py @@ -30,7 +30,7 @@ def __init__( components_module: str | None, ) -> None: self.schema_registry_client = SchemaRegistryClient( - kpops_config.schema_registry.url + str(kpops_config.schema_registry.url) ) self.components_module = components_module diff --git a/kpops/component_handlers/topic/proxy_wrapper.py b/kpops/component_handlers/topic/proxy_wrapper.py index 1b246d6f5..aa1db6283 100644 --- a/kpops/component_handlers/topic/proxy_wrapper.py +++ b/kpops/component_handlers/topic/proxy_wrapper.py @@ -46,7 +46,7 @@ def cluster_id(self) -> str: :raises KafkaRestProxyError: Kafka REST proxy error :return: The Kafka cluster ID. """ - response = httpx.get(url=f"{self._config.url}/v3/clusters") + response = httpx.get(url=f"{self._config.url!s}v3/clusters") if response.status_code == httpx.codes.OK: cluster_information = response.json() return cluster_information["data"][0]["cluster_id"] @@ -67,7 +67,7 @@ def create_topic(self, topic_spec: TopicSpec) -> None: :raises KafkaRestProxyError: Kafka REST proxy error """ response = httpx.post( - url=f"{self.url}/v3/clusters/{self.cluster_id}/topics", + url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics", headers=HEADERS, json=topic_spec.model_dump(exclude_none=True), ) @@ -88,7 +88,7 @@ def delete_topic(self, topic_name: str) -> None: :raises KafkaRestProxyError: Kafka REST proxy error """ response = httpx.delete( - url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}", + url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics/{topic_name}", headers=HEADERS, ) if response.status_code == httpx.codes.NO_CONTENT: @@ -109,7 +109,7 @@ def get_topic(self, topic_name: str) -> TopicResponse: :return: Response of the get topic API. """ response = httpx.get( - url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}", + url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics/{topic_name}", headers=HEADERS, ) if response.status_code == httpx.codes.OK: @@ -139,7 +139,7 @@ def get_topic_config(self, topic_name: str) -> TopicConfigResponse: :return: The topic configuration. 
""" response = httpx.get( - url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs", + url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics/{topic_name}/configs", headers=HEADERS, ) @@ -169,7 +169,7 @@ def batch_alter_topic_config(self, topic_name: str, json_body: list[dict]) -> No :raises KafkaRestProxyError: Kafka REST proxy error """ response = httpx.post( - url=f"{self.url}/v3/clusters/{self.cluster_id}/topics/{topic_name}/configs:alter", + url=f"{self.url!s}v3/clusters/{self.cluster_id}/topics/{topic_name}/configs:alter", headers=HEADERS, json={"data": json_body}, ) @@ -189,7 +189,7 @@ def get_broker_config(self) -> BrokerConfigResponse: :return: The broker configuration. """ response = httpx.get( - url=f"{self.url}/v3/clusters/{self.cluster_id}/brokers/-/configs", + url=f"{self.url!s}v3/clusters/{self.cluster_id}/brokers/-/configs", headers=HEADERS, ) diff --git a/kpops/config.py b/kpops/config.py index ca66d0cfb..c6c5c18b6 100644 --- a/kpops/config.py +++ b/kpops/config.py @@ -1,20 +1,18 @@ from __future__ import annotations from pathlib import Path -from typing import override -from pydantic import AliasChoices, Field, AnyHttpUrl, parse_obj_as +from pydantic import AnyHttpUrl, Field, TypeAdapter from pydantic_settings import ( BaseSettings, PydanticBaseSettingsSource, SettingsConfigDict, ) +from typing_extensions import override from kpops.cli.settings_sources import YamlConfigSettingsSource from kpops.component_handlers.helm_wrapper.model import HelmConfig, HelmDiffConfig from kpops.utils.docstring import describe_object -from kpops.utils.yaml_loading import load_yaml_file - ENV_PREFIX = "KPOPS_" @@ -42,8 +40,7 @@ class SchemaRegistryConfig(BaseSettings): url: AnyHttpUrl = Field( # For validating URLs use parse_obj_as # https://github.com/pydantic/pydantic/issues/1106 - default=parse_obj_as(AnyHttpUrl, "http://localhost:8081"), - validation_alias=f"{ENV_PREFIX}SCHEMA_REGISTRY_URL", + default=TypeAdapter(AnyHttpUrl).validate_python("http://localhost:8081"), description="Address of the Schema Registry.", ) @@ -52,8 +49,7 @@ class KafkaRestConfig(BaseSettings): """Configuration for Kafka REST Proxy.""" url: AnyHttpUrl = Field( - default=parse_obj_as(AnyHttpUrl, "http://localhost:8082"), - validation_alias=f"{ENV_PREFIX}KAFKA_REST_URL", + default=TypeAdapter(AnyHttpUrl).validate_python("http://localhost:8082"), description="Address of the Kafka REST Proxy.", ) @@ -62,8 +58,7 @@ class KafkaConnectConfig(BaseSettings): """Configuration for Kafka Connect.""" url: AnyHttpUrl = Field( - default=parse_obj_as(AnyHttpUrl, "http://localhost:8083"), - validation_alias=f"{ENV_PREFIX}KAFKA_CONNECT_URL", + default=TypeAdapter(AnyHttpUrl).validate_python("http://localhost:8083"), description="Address of Kafka Connect.", ) @@ -92,7 +87,6 @@ class KpopsConfig(BaseSettings): "broker1:9092,broker2:9092,broker3:9092", ], description="The comma separated Kafka brokers address.", - validation_alias=AliasChoices("brokers", f"{ENV_PREFIX}kafka_brokers"), ) defaults_filename_prefix: str = Field( default="defaults", diff --git a/tests/cli/test_kpops_config.py b/tests/cli/test_kpops_config.py index 33db1560a..717a67e46 100644 --- a/tests/cli/test_kpops_config.py +++ b/tests/cli/test_kpops_config.py @@ -1,7 +1,7 @@ from pathlib import Path import pytest -from pydantic import AnyHttpUrl, ValidationError, parse_obj_as +from pydantic import AnyHttpUrl, TypeAdapter, ValidationError from kpops.config import ( KafkaConnectConfig, @@ -27,9 +27,9 @@ def test_kpops_config_with_default_values(): == 
"${pipeline_name}-${component_name}-error" ) assert default_config.schema_registry.enabled is False - assert default_config.schema_registry.url == "http://localhost:8081" - assert default_config.kafka_rest.url == "http://localhost:8082" - assert default_config.kafka_connect.url == "http://localhost:8083" + assert default_config.schema_registry.url == AnyHttpUrl("http://localhost:8081") + assert default_config.kafka_rest.url == AnyHttpUrl("http://localhost:8082") + assert default_config.kafka_connect.url == AnyHttpUrl("http://localhost:8083") assert default_config.timeout == 300 assert default_config.create_namespace is False assert default_config.helm_config.context is None @@ -45,7 +45,7 @@ def test_kpops_config_with_different_invalid_urls(): environment="development", kafka_brokers="http://broker:9092", kafka_connect=KafkaConnectConfig( - url=parse_obj_as(AnyHttpUrl, "invalid-host") + url=TypeAdapter(AnyHttpUrl).validate_python("invalid-host") ), ) @@ -53,7 +53,9 @@ def test_kpops_config_with_different_invalid_urls(): KpopsConfig( environment="development", kafka_brokers="http://broker:9092", - kafka_rest=KafkaRestConfig(url=parse_obj_as(AnyHttpUrl, "invalid-host")), + kafka_rest=KafkaRestConfig( + url=TypeAdapter(AnyHttpUrl).validate_python("invalid-host") + ), ) with pytest.raises(ValidationError): @@ -62,6 +64,6 @@ def test_kpops_config_with_different_invalid_urls(): kafka_brokers="http://broker:9092", schema_registry=SchemaRegistryConfig( enabled=True, - url=parse_obj_as(AnyHttpUrl, "invalid-host"), + url=TypeAdapter(AnyHttpUrl).validate_python("invalid-host"), ), ) diff --git a/tests/component_handlers/topic/test_proxy_wrapper.py b/tests/component_handlers/topic/test_proxy_wrapper.py index bbd87bc1e..3cee5f06b 100644 --- a/tests/component_handlers/topic/test_proxy_wrapper.py +++ b/tests/component_handlers/topic/test_proxy_wrapper.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock, patch import pytest +from pydantic import AnyHttpUrl from pytest_httpx import HTTPXMock from pytest_mock import MockerFixture @@ -45,7 +46,7 @@ def _setup(self, httpx_mock: HTTPXMock): json=cluster_response, status_code=200, ) - assert self.proxy_wrapper.url == DEFAULT_HOST + assert self.proxy_wrapper.url == AnyHttpUrl(DEFAULT_HOST) assert self.proxy_wrapper.cluster_id == "cluster-1" @patch("httpx.post")