diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index ed4a720f3..1d9a296b3 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -5,6 +5,7 @@ from functools import cached_property from typing import Any, NoReturn +import pydantic from pydantic import Field, PrivateAttr, ValidationInfo, computed_field, field_validator from typing_extensions import override @@ -25,6 +26,11 @@ from kpops.utils.colorify import magentaify from kpops.utils.docstring import describe_attr +try: + from typing import Self # pyright: ignore[reportAttributeAccessIssue] +except ImportError: + from typing_extensions import Self + log = logging.getLogger("KafkaConnector") @@ -205,6 +211,12 @@ class KafkaSourceConnector(KafkaConnector): _connector_type: KafkaConnectorType = PrivateAttr(KafkaConnectorType.SOURCE) + @pydantic.model_validator(mode="after") + def populate_offset_topic(self) -> Self: + if self.offset_topic: + self._resetter.app.config.offset_topic = self.offset_topic + return self + @computed_field @cached_property def _resetter(self) -> KafkaConnectorResetter: @@ -217,13 +229,11 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: @override async def reset(self, dry_run: bool) -> None: - self._resetter.app.config.offset_topic = self.offset_topic await self._resetter.reset(dry_run) @override async def clean(self, dry_run: bool) -> None: await super().clean(dry_run) - self._resetter.app.config.offset_topic = self.offset_topic await self._resetter.clean(dry_run) diff --git a/kpops/utils/gen_schema.py b/kpops/utils/gen_schema.py index 2b7d61862..3b4ce5ad3 100644 --- a/kpops/utils/gen_schema.py +++ b/kpops/utils/gen_schema.py @@ -129,8 +129,10 @@ def gen_pipeline_schema( default=component.type, ) core_schema: DefinitionsSchema = component.__pydantic_core_schema__ # pyright:ignore[reportAssignmentType] - - model_schema: ModelFieldsSchema = core_schema["schema"]["schema"]["schema"] # pyright:ignore[reportGeneralTypeIssues,reportTypedDictNotRequiredAccess,reportAssignmentType] + schema = core_schema + while "schema" in schema: + schema = schema["schema"] + model_schema: ModelFieldsSchema = schema # pyright:ignore[reportAssignmentType] model_schema["fields"]["type"] = ModelField( type="model-field", schema=LiteralSchema( diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index 3860aeb60..46aea7645 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -72,6 +72,9 @@ def test_resetter_release_name(self, connector: KafkaSourceConnector): assert isinstance(resetter, KafkaConnectorResetter) assert connector._resetter.helm_release_name == CONNECTOR_CLEAN_RELEASE_NAME + def test_resetter_offset_topic(self, connector: KafkaSourceConnector): + assert connector._resetter.app.config.offset_topic == OFFSETS_TOPIC + def test_from_section_raises_exception( self, config: KpopsConfig,