diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 2d0d9cd03..1f4aeb43b 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -71,6 +71,8 @@ class KafkaConnector(PipelineComponent, ABC): description=describe_attr("resetter_values", __doc__), ) + _connector_type: KafkaConnectorType = Field(default=..., hidden_from_schema=True) + @validator("app", pre=True) def connector_config_should_have_component_name( cls, @@ -172,7 +174,6 @@ def clean(self, dry_run: bool) -> None: def _run_connect_resetter( self, - connector_type: KafkaConnectorType, dry_run: bool, retain_clean_jobs: bool, **kwargs, @@ -183,7 +184,6 @@ def _run_connect_resetter( to make sure that there is no running clean job in the cluster. Then it releases a cleanup job. If the retain_clean_jobs flag is set to false the cleanup job will be deleted. - :param connector_type: Type of the connector (SINK or SOURCE) :param dry_run: If the cleanup should be run in dry run mode or not :param retain_clean_jobs: If the cleanup job should be kept :param kwargs: Other values for the KafkaConnectResetter @@ -199,11 +199,11 @@ def _run_connect_resetter( log.info( magentaify( - f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {self.full_name}" + f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}" ) ) - stdout = self.__install_connect_resetter(connector_type, dry_run, **kwargs) + stdout = self.__install_connect_resetter(dry_run, **kwargs) if dry_run: self.dry_run_handler.print_helm_diff( @@ -218,7 +218,6 @@ def _run_connect_resetter( def __install_connect_resetter( self, - connector_type: KafkaConnectorType, dry_run: bool, **kwargs, ) -> str: @@ -242,14 +241,12 @@ def __install_connect_resetter( wait=True, ), values=self._get_kafka_connect_resetter_values( - connector_type, **kwargs, ), ) def _get_kafka_connect_resetter_values( self, - connector_type: KafkaConnectorType, **kwargs, ) -> dict: """Get connector resetter helm chart values @@ -264,7 +261,7 @@ def _get_kafka_connect_resetter_values( brokers=self.config.brokers, **kwargs, ), - connector_type=connector_type.value, + connector_type=self._connector_type.value, name_override=self.full_name, ).dict(), **self.resetter_values, @@ -296,6 +293,8 @@ class KafkaSourceConnector(KafkaConnector): description=describe_attr("offset_topic", __doc__), ) + _connector_type = KafkaConnectorType.SOURCE + @override def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: raise NotImplementedError("Kafka source connector doesn't support FromSection") @@ -303,7 +302,6 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: @override def template(self) -> None: values = self._get_kafka_connect_resetter_values( - KafkaConnectorType.SOURCE, offset_topic=self.offset_topic, ) stdout = self.helm.template( @@ -330,7 +328,6 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None: :param dry_run: Whether to do a dry run of the command """ self._run_connect_resetter( - connector_type=KafkaConnectorType.SOURCE, dry_run=dry_run, retain_clean_jobs=self.config.retain_clean_jobs, offset_topic=self.offset_topic, @@ -340,6 +337,8 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None: class KafkaSinkConnector(KafkaConnector): """Kafka sink connector model""" + _connector_type = KafkaConnectorType.SINK + @override def add_input_topics(self, topics: list[str]) -> None: existing_topics: str | None = getattr(self.app, "topics", None) @@ -349,7 +348,7 @@ def add_input_topics(self, topics: list[str]) -> None: @override def template(self) -> None: - values = self._get_kafka_connect_resetter_values(KafkaConnectorType.SINK) + values = self._get_kafka_connect_resetter_values() stdout = self.helm.template( self._connector_resetter_release_name, self._resetter_helm_chart,