diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index c97cc987f..e53886d68 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -46,6 +46,7 @@ class KafkaConnector(PipelineComponent, ABC): :param version: Helm chart version, defaults to "1.0.4" :param resetter_values: Overriding Kafka Connect Resetter Helm values. E.g. to override the Image Tag etc., defaults to dict + :param _connector_type: Defines the type of the connector (Source or Sink) """ namespace: str = Field( @@ -71,6 +72,8 @@ class KafkaConnector(PipelineComponent, ABC): description=describe_attr("resetter_values", __doc__), ) + _connector_type: KafkaConnectorType = Field(default=..., hidden_from_schema=True) + @validator("app", pre=True) def connector_config_should_have_component_name( cls, @@ -98,11 +101,15 @@ def helm(self) -> Helm: ) return helm - def _get_resetter_helm_chart(self) -> str: - """Get reseter Helm chart + @property + def _resetter_release_name(self) -> str: + suffix = "-clean" + clean_up_release_name = self.full_name + suffix + trimmed_name = trim_release_name(clean_up_release_name, suffix) + return trimmed_name - :return: returns the component resetter's helm chart - """ + @property + def _resetter_helm_chart(self) -> str: return f"{self.repo_config.repository_name}/kafka-connect-resetter" @cached_property @@ -110,11 +117,6 @@ def dry_run_handler(self) -> DryRunHandler: helm_diff = HelmDiff(self.config.helm_diff_config) return DryRunHandler(self.helm, helm_diff, self.namespace) - @property - def kafka_connect_resetter_chart(self) -> str: - """Resetter chart for this component""" - return f"{self.repo_config.repository_name}/kafka-connect-resetter" - @property def helm_flags(self) -> HelmFlags: """Return shared flags for Helm commands""" @@ -148,7 +150,9 @@ def deploy(self, dry_run: bool) -> None: @override def destroy(self, dry_run: bool) -> None: - self.handlers.connector_handler.destroy_connector(self.name, dry_run=dry_run) + self.handlers.connector_handler.destroy_connector( + self.full_name, dry_run=dry_run + ) @override def clean(self, dry_run: bool) -> None: @@ -161,8 +165,6 @@ def clean(self, dry_run: bool) -> None: def _run_connect_resetter( self, - connector_name: str, - connector_type: KafkaConnectorType, dry_run: bool, retain_clean_jobs: bool, **kwargs, @@ -173,69 +175,48 @@ def _run_connect_resetter( to make sure that there is no running clean job in the cluster. Then it releases a cleanup job. If the retain_clean_jobs flag is set to false the cleanup job will be deleted. - :param connector_name: Name of the connector - :param connector_type: Type of the connector (SINK or SOURCE) :param dry_run: If the cleanup should be run in dry run mode or not :param retain_clean_jobs: If the cleanup job should be kept :param kwargs: Other values for the KafkaConnectResetter """ - trimmed_name = self._get_kafka_resetter_release_name(connector_name) - log.info( magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {connector_name}" + f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {self.full_name}" ) ) - self.__uninstall_connect_resetter(trimmed_name, dry_run) + self.__uninstall_connect_resetter(self._resetter_release_name, dry_run) log.info( magentaify( - f"Connector Cleanup: deploy Connect {connector_type.value} resetter for {connector_name}" + f"Connector Cleanup: deploy Connect {self._connector_type.value} resetter for {self.full_name}" ) ) - stdout = self.__install_connect_resetter( - trimmed_name, connector_name, connector_type, dry_run, **kwargs - ) + stdout = self.__install_connect_resetter(dry_run, **kwargs) if dry_run: - self.dry_run_handler.print_helm_diff(stdout, trimmed_name, log) + self.dry_run_handler.print_helm_diff( + stdout, self._resetter_release_name, log + ) if not retain_clean_jobs: log.info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")) - self.__uninstall_connect_resetter(trimmed_name, dry_run) - - def _get_kafka_resetter_release_name(self, connector_name: str) -> str: - """Get connector resetter's release name - - :param connector_name: Name of the connector to be reset - :return: The name of the resetter to be used - """ - suffix = "-clean" - clean_up_release_name = connector_name + suffix - trimmed_name = trim_release_name(clean_up_release_name, suffix) - return trimmed_name + self.__uninstall_connect_resetter(self._resetter_release_name, dry_run) def __install_connect_resetter( self, - release_name: str, - connector_name: str, - connector_type: KafkaConnectorType, dry_run: bool, **kwargs, ) -> str: """Install connector resetter - :param release_name: Release name for the resetter - :param connector_name: Name of the connector-to-be-reset - :param connector_type: Type of the connector :param dry_run: Whether to dry run the command :return: The output of `helm upgrade --install` """ return self.helm.upgrade_install( - release_name=release_name, + release_name=self._resetter_release_name, namespace=self.namespace, - chart=self.kafka_connect_resetter_chart, + chart=self._resetter_helm_chart, dry_run=dry_run, flags=HelmUpgradeInstallFlags( create_namespace=self.config.create_namespace, @@ -244,33 +225,27 @@ def __install_connect_resetter( wait=True, ), values=self._get_kafka_connect_resetter_values( - connector_name, - connector_type, **kwargs, ), ) def _get_kafka_connect_resetter_values( self, - connector_name: str, - connector_type: KafkaConnectorType, **kwargs, ) -> dict: """Get connector resetter helm chart values - :param connector_name: Name of the connector - :param connector_type: Type of the connector :return: The Helm chart values of the connector resetter """ return { **KafkaConnectResetterValues( config=KafkaConnectResetterConfig( - connector=connector_name, + connector=self.full_name, brokers=self.config.brokers, **kwargs, ), - connector_type=connector_type.value, - name_override=connector_name, + connector_type=self._connector_type.value, + name_override=self.full_name, ).dict(), **self.resetter_values, } @@ -301,6 +276,8 @@ class KafkaSourceConnector(KafkaConnector): description=describe_attr("offset_topic", __doc__), ) + _connector_type = KafkaConnectorType.SOURCE + @override def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: raise NotImplementedError("Kafka source connector doesn't support FromSection") @@ -308,13 +285,11 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: @override def template(self) -> None: values = self._get_kafka_connect_resetter_values( - self.name, - KafkaConnectorType.SOURCE, offset_topic=self.offset_topic, ) stdout = self.helm.template( - self._get_kafka_resetter_release_name(self.name), - self._get_resetter_helm_chart(), + self._resetter_release_name, + self._resetter_helm_chart, self.namespace, values, self.template_flags, @@ -336,8 +311,6 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None: :param dry_run: Whether to do a dry run of the command """ self._run_connect_resetter( - connector_name=self.name, - connector_type=KafkaConnectorType.SOURCE, dry_run=dry_run, retain_clean_jobs=self.config.retain_clean_jobs, offset_topic=self.offset_topic, @@ -347,6 +320,8 @@ def __run_kafka_connect_resetter(self, dry_run: bool) -> None: class KafkaSinkConnector(KafkaConnector): """Kafka sink connector model""" + _connector_type = KafkaConnectorType.SINK + @override def add_input_topics(self, topics: list[str]) -> None: existing_topics: str | None = getattr(self.app, "topics", None) @@ -356,12 +331,10 @@ def add_input_topics(self, topics: list[str]) -> None: @override def template(self) -> None: - values = self._get_kafka_connect_resetter_values( - self.name, KafkaConnectorType.SINK - ) + values = self._get_kafka_connect_resetter_values() stdout = self.helm.template( - self._get_kafka_resetter_release_name(self.name), - self._get_resetter_helm_chart(), + self._resetter_release_name, + self._resetter_helm_chart, self.namespace, values, self.template_flags, @@ -391,10 +364,9 @@ def __run_kafka_connect_resetter( """Runs the connector resetter :param dry_run: Whether to do a dry run of the command + :param delete_consumer_group: Whether the consumer group should be deleted or not """ self._run_connect_resetter( - connector_name=self.name, - connector_type=KafkaConnectorType.SINK, dry_run=dry_run, retain_clean_jobs=self.config.retain_clean_jobs, delete_consumer_group=delete_consumer_group, diff --git a/tests/component_handlers/kafka_connect/test_connect_handler.py b/tests/component_handlers/kafka_connect/test_connect_handler.py index 907065c5e..a5a1f3246 100644 --- a/tests/component_handlers/kafka_connect/test_connect_handler.py +++ b/tests/component_handlers/kafka_connect/test_connect_handler.py @@ -250,6 +250,7 @@ def test_should_call_delete_connector_when_destroying_existing_connector_not_dry handler = self.connector_handler(connector_wrapper) handler.destroy_connector(CONNECTOR_NAME, dry_run=False) + assert connector_wrapper.mock_calls == [ mock.call.get_connector(CONNECTOR_NAME), mock.call.delete_connector(CONNECTOR_NAME), diff --git a/tests/components/test_kafka_connector.py b/tests/components/test_kafka_connector.py index 4e8424e5c..912f449fb 100644 --- a/tests/components/test_kafka_connector.py +++ b/tests/components/test_kafka_connector.py @@ -12,10 +12,8 @@ DEFAULTS_PATH = Path(__file__).parent / "resources" CONNECTOR_NAME = "test-connector-with-long-name-0123456789abcdefghijklmnop" -CONNECTOR_NAME_PREFIXED = ( - "${pipeline_name}-test-connector-with-long-name-0123456789abcdefghijklmnop" -) -CONNECTOR_CLEAN_NAME = "test-connector-with-long-name-0123456789abcdef-clean" +CONNECTOR_FULL_NAME = "${pipeline_name}-" + CONNECTOR_NAME +CONNECTOR_CLEAN_FULL_NAME = "${pipeline_name}-test-connector-with-long-name-clean" CONNECTOR_CLASS = "com.bakdata.connect.TestConnector" @@ -58,7 +56,7 @@ def connector_config(self) -> KafkaConnectorConfig: return KafkaConnectorConfig( **{ "connector.class": CONNECTOR_CLASS, - "name": CONNECTOR_NAME_PREFIXED, + "name": CONNECTOR_FULL_NAME, } ) @@ -75,7 +73,7 @@ def test_connector_config_name_override( app=connector_config, namespace="test-namespace", ) - assert connector.app.name == CONNECTOR_NAME_PREFIXED + assert connector.app.name == CONNECTOR_FULL_NAME connector = KafkaConnector( name=CONNECTOR_NAME, @@ -84,7 +82,7 @@ def test_connector_config_name_override( app={"connector.class": CONNECTOR_CLASS}, # type: ignore namespace="test-namespace", ) - assert connector.app.name == CONNECTOR_NAME_PREFIXED + assert connector.app.name == CONNECTOR_FULL_NAME with pytest.raises( ValueError, match="Connector name should be the same as component name" diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 5de354739..91760e90c 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -27,7 +27,8 @@ ) from kpops.utils.colorify import magentaify from tests.components.test_kafka_connector import ( - CONNECTOR_CLEAN_NAME, + CONNECTOR_CLEAN_FULL_NAME, + CONNECTOR_FULL_NAME, CONNECTOR_NAME, TestKafkaConnector, ) @@ -168,7 +169,9 @@ def test_destroy( connector.destroy(dry_run=True) - mock_destroy_connector.assert_called_once_with(CONNECTOR_NAME, dry_run=True) + mock_destroy_connector.assert_called_once_with( + CONNECTOR_FULL_NAME, dry_run=True + ) def test_reset_when_dry_run_is_true( self, @@ -208,11 +211,11 @@ def test_reset_when_dry_run_is_false( ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=dry_run, ), mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, namespace="test-namespace", chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", dry_run=dry_run, @@ -225,15 +228,15 @@ def test_reset_when_dry_run_is_false( "connectorType": "sink", "config": { "brokers": "broker:9092", - "connector": CONNECTOR_NAME, + "connector": CONNECTOR_FULL_NAME, "deleteConsumerGroup": False, }, - "nameOverride": CONNECTOR_NAME, + "nameOverride": CONNECTOR_FULL_NAME, }, ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=dry_run, ), ] @@ -259,23 +262,7 @@ def test_clean_when_dry_run_is_false( log_info_mock: MagicMock, dry_run_handler: MagicMock, mocker: MockerFixture, - connector_config: KafkaConnectorConfig, ): - connector = KafkaSinkConnector( - name=CONNECTOR_NAME, - config=config, - handlers=handlers, - app=connector_config, - namespace="test-namespace", - to=ToSection( - topics={ - TopicName("${output_topic_name}"): TopicConfig( - type=OutputTopicTypes.OUTPUT, partitions_count=10 - ), - } - ), - ) - mock_delete_topics = mocker.patch.object( connector.handlers.topic_handler, "delete_topics" ) @@ -294,12 +281,12 @@ def test_clean_when_dry_run_is_false( assert log_info_mock.mock_calls == [ call.log_info( magentaify( - f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_NAME}" + f"Connector Cleanup: uninstalling cleanup job Helm release from previous runs for {CONNECTOR_FULL_NAME}" ) ), call.log_info( magentaify( - f"Connector Cleanup: deploy Connect {KafkaConnectorType.SINK.value} resetter for {CONNECTOR_NAME}" + f"Connector Cleanup: deploy Connect {KafkaConnectorType.SINK.value} resetter for {CONNECTOR_FULL_NAME}" ) ), call.log_info(magentaify("Connector Cleanup: uninstall Kafka Resetter.")), @@ -314,11 +301,11 @@ def test_clean_when_dry_run_is_false( ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=dry_run, ), mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, namespace="test-namespace", chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", dry_run=dry_run, @@ -331,15 +318,15 @@ def test_clean_when_dry_run_is_false( "connectorType": "sink", "config": { "brokers": "broker:9092", - "connector": CONNECTOR_NAME, + "connector": CONNECTOR_FULL_NAME, "deleteConsumerGroup": True, }, - "nameOverride": CONNECTOR_NAME, + "nameOverride": CONNECTOR_FULL_NAME, }, ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=dry_run, ), ] @@ -408,11 +395,11 @@ def test_clean_without_to_when_dry_run_is_false( ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=dry_run, ), mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, namespace="test-namespace", chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", dry_run=dry_run, @@ -425,15 +412,15 @@ def test_clean_without_to_when_dry_run_is_false( "connectorType": "sink", "config": { "brokers": "broker:9092", - "connector": CONNECTOR_NAME, + "connector": CONNECTOR_FULL_NAME, "deleteConsumerGroup": True, }, - "nameOverride": CONNECTOR_NAME, + "nameOverride": CONNECTOR_FULL_NAME, }, ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=dry_run, ), ] diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index 72c487e74..db9a2dd77 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -24,7 +24,8 @@ ) from kpops.utils.environment import ENV from tests.components.test_kafka_connector import ( - CONNECTOR_CLEAN_NAME, + CONNECTOR_CLEAN_FULL_NAME, + CONNECTOR_FULL_NAME, CONNECTOR_NAME, TestKafkaConnector, ) @@ -112,7 +113,9 @@ def test_destroy( connector.destroy(dry_run=True) - mock_destroy_connector.assert_called_once_with(CONNECTOR_NAME, dry_run=True) + mock_destroy_connector.assert_called_once_with( + CONNECTOR_FULL_NAME, dry_run=True + ) def test_reset_when_dry_run_is_true( self, @@ -154,11 +157,11 @@ def test_reset_when_dry_run_is_false( ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=False, ), mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, namespace="test-namespace", chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", dry_run=False, @@ -171,15 +174,15 @@ def test_reset_when_dry_run_is_false( "connectorType": "source", "config": { "brokers": "broker:9092", - "connector": CONNECTOR_NAME, + "connector": CONNECTOR_FULL_NAME, "offsetTopic": "kafka-connect-offsets", }, - "nameOverride": CONNECTOR_NAME, + "nameOverride": CONNECTOR_FULL_NAME, }, ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name="test-connector-with-long-name-0123456789abcdef-clean", + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=False, ), ] @@ -229,11 +232,11 @@ def test_clean_when_dry_run_is_false( ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=False, ), mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, namespace="test-namespace", chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", dry_run=False, @@ -246,15 +249,15 @@ def test_clean_when_dry_run_is_false( "connectorType": "source", "config": { "brokers": "broker:9092", - "connector": CONNECTOR_NAME, + "connector": CONNECTOR_FULL_NAME, "offsetTopic": "kafka-connect-offsets", }, - "nameOverride": CONNECTOR_NAME, + "nameOverride": CONNECTOR_FULL_NAME, }, ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=False, ), ] @@ -304,11 +307,11 @@ def test_clean_without_to_when_dry_run_is_false( ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=False, ), mocker.call.helm.upgrade_install( - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, namespace="test-namespace", chart="bakdata-kafka-connect-resetter/kafka-connect-resetter", dry_run=False, @@ -321,15 +324,15 @@ def test_clean_without_to_when_dry_run_is_false( "connectorType": "source", "config": { "brokers": "broker:9092", - "connector": CONNECTOR_NAME, + "connector": CONNECTOR_FULL_NAME, "offsetTopic": "kafka-connect-offsets", }, - "nameOverride": CONNECTOR_NAME, + "nameOverride": CONNECTOR_FULL_NAME, }, ), mocker.call.helm.uninstall( namespace="test-namespace", - release_name=CONNECTOR_CLEAN_NAME, + release_name=CONNECTOR_CLEAN_FULL_NAME, dry_run=False, ), ]