diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py index b65319a35..826307308 100644 --- a/kpops/api/__init__.py +++ b/kpops/api/__init__.py @@ -217,6 +217,7 @@ def clean( ) async def clean_runner(component: PipelineComponent): + await component.destroy(dry_run) log_action("Clean", component) await component.clean(dry_run) diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 300d7ebcb..1d9a296b3 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -187,7 +187,6 @@ async def destroy(self, dry_run: bool) -> None: @override async def clean(self, dry_run: bool) -> None: - await super().clean(dry_run) if self.to: if self.handlers.schema_handler: await self.handlers.schema_handler.delete_schemas( @@ -230,7 +229,6 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: @override async def reset(self, dry_run: bool) -> None: - await super().reset(dry_run) await self._resetter.reset(dry_run) @override @@ -268,7 +266,6 @@ def set_error_topic(self, topic: KafkaTopic) -> None: @override async def reset(self, dry_run: bool) -> None: - await super().reset(dry_run) self._resetter.app.config.delete_consumer_group = False await self._resetter.reset(dry_run) diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index ec49bce01..7ce114899 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -250,11 +250,9 @@ async def reset(self, dry_run: bool) -> None: :param dry_run: Whether to do a dry run of the command """ - await self.destroy(dry_run) async def clean(self, dry_run: bool) -> None: """Destroy component including related states. :param dry_run: Whether to do a dry run of the command """ - await self.destroy(dry_run) diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index 9464a2b2c..a9c06f37e 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -95,5 +95,4 @@ def helm_chart(self) -> str: @override async def clean(self, dry_run: bool) -> None: - await super().clean(dry_run) await self._cleaner.clean(dry_run) diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index ddaf437fd..2ac396d0d 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -129,12 +129,10 @@ def helm_chart(self) -> str: @override async def reset(self, dry_run: bool) -> None: - await super().reset(dry_run) self._cleaner.app.streams.delete_output = False await self._cleaner.clean(dry_run) @override async def clean(self, dry_run: bool) -> None: - await super().reset(dry_run) self._cleaner.app.streams.delete_output = True await self._cleaner.clean(dry_run) diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 16272e95f..a855aba55 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -203,11 +203,6 @@ async def test_should_clean_producer_app( producer_app: ProducerApp, mocker: MockerFixture, ): - # actual component - mock_helm_uninstall_producer_app = mocker.patch.object( - producer_app.helm, "uninstall" - ) - # cleaner mock_helm_upgrade_install = mocker.patch.object( producer_app._cleaner.helm, "upgrade_install" @@ -220,9 +215,6 @@ async def test_should_clean_producer_app( ) mock = mocker.MagicMock() - mock.attach_mock( - mock_helm_uninstall_producer_app, "helm_uninstall_producer_app" - ) mock.attach_mock(mock_helm_uninstall, "helm_uninstall") mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_print_helm_diff, "print_helm_diff") @@ -231,11 +223,6 @@ async def test_should_clean_producer_app( mock.assert_has_calls( [ - mocker.call.helm_uninstall_producer_app( - "test-namespace", PRODUCER_APP_RELEASE_NAME, True - ), - ANY, # __bool__ - ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", PRODUCER_APP_CLEAN_RELEASE_NAME, @@ -278,12 +265,6 @@ async def test_should_clean_producer_app( async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_with_dry_run_false( self, mocker: MockerFixture, producer_app: ProducerApp ): - # actual component - mock_helm_uninstall_producer_app = mocker.patch.object( - producer_app.helm, "uninstall" - ) - - # cleaner mock_helm_upgrade_install = mocker.patch.object( producer_app._cleaner.helm, "upgrade_install" ) @@ -292,9 +273,6 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea ) mock = mocker.MagicMock() - mock.attach_mock( - mock_helm_uninstall_producer_app, "helm_uninstall_producer_app" - ) mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") @@ -302,11 +280,6 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea mock.assert_has_calls( [ - mocker.call.helm_uninstall_producer_app( - "test-namespace", PRODUCER_APP_RELEASE_NAME, False - ), - ANY, # __bool__ - ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", PRODUCER_APP_CLEAN_RELEASE_NAME, diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index 5d699f3b7..330428396 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -574,11 +574,6 @@ async def test_destroy( async def test_reset_when_dry_run_is_false( self, streams_app: StreamsApp, mocker: MockerFixture ): - # actual component - mock_helm_uninstall_streams_app = mocker.patch.object( - streams_app.helm, "uninstall" - ) - cleaner = streams_app._cleaner assert isinstance(cleaner, StreamsAppCleaner) @@ -586,9 +581,6 @@ async def test_reset_when_dry_run_is_false( mock_helm_uninstall = mocker.patch.object(cleaner.helm, "uninstall") mock = mocker.MagicMock() - mock.attach_mock( - mock_helm_uninstall_streams_app, "mock_helm_uninstall_streams_app" - ) mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") @@ -597,11 +589,6 @@ async def test_reset_when_dry_run_is_false( mock.assert_has_calls( [ - mocker.call.mock_helm_uninstall_streams_app( - "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run - ), - ANY, # __bool__ - ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", STREAMS_APP_CLEAN_RELEASE_NAME, @@ -640,11 +627,6 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean streams_app: StreamsApp, mocker: MockerFixture, ): - # actual component - mock_helm_uninstall_streams_app = mocker.patch.object( - streams_app.helm, "uninstall" - ) - mock_helm_upgrade_install = mocker.patch.object( streams_app._cleaner.helm, "upgrade_install" ) @@ -653,9 +635,6 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean ) mock = mocker.MagicMock() - mock.attach_mock( - mock_helm_uninstall_streams_app, "mock_helm_uninstall_streams_app" - ) mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") @@ -664,11 +643,6 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean mock.assert_has_calls( [ - mocker.call.mock_helm_uninstall_streams_app( - "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run - ), - ANY, # __bool__ - ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", STREAMS_APP_CLEAN_RELEASE_NAME, @@ -702,18 +676,19 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean ) @pytest.mark.asyncio() - async def test_should_clean_streams_app_with_old_image_tag( + async def test_should_deploy_clean_up_job_with_image_tag_in_cluster( self, config: KpopsConfig, handlers: ComponentHandlers, mocker: MockerFixture, ): + image_tag_in_cluster = "1.1.1" mocker.patch.object( HelmApp, "helm_values", return_value={ "image": "registry/streams-app", - "imageTag": "1.1.1", + "imageTag": image_tag_in_cluster, "nameOverride": STREAMS_APP_NAME, "replicaCount": 1, "streams": { @@ -746,67 +721,33 @@ async def test_should_clean_streams_app_with_old_image_tag( }, ) - # actual component - mock_helm_uninstall_streams_app = mocker.patch.object( - streams_app.helm, "uninstall" - ) - mock_helm_upgrade_install = mocker.patch.object( streams_app._cleaner.helm, "upgrade_install" ) - mock_helm_uninstall = mocker.patch.object( - streams_app._cleaner.helm, "uninstall" - ) + mocker.patch.object(streams_app._cleaner.helm, "uninstall") mock = mocker.MagicMock() - mock.attach_mock( - mock_helm_uninstall_streams_app, "mock_helm_uninstall_streams_app" - ) mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") - mock.attach_mock(mock_helm_uninstall, "helm_uninstall") dry_run = False await streams_app.clean(dry_run=dry_run) - mock.assert_has_calls( - [ - mocker.call.mock_helm_uninstall_streams_app( - "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run - ), - ANY, # __bool__ - ANY, # __str__ - mocker.call.helm_uninstall( - "test-namespace", - STREAMS_APP_CLEAN_RELEASE_NAME, - dry_run, - ), - ANY, # __bool__ - ANY, # __str__ - mocker.call.helm_upgrade_install( - STREAMS_APP_CLEAN_RELEASE_NAME, - "bakdata-streams-bootstrap/streams-app-cleanup-job", - dry_run, - "test-namespace", - { - "nameOverride": STREAMS_APP_CLEAN_HELM_NAME_OVERRIDE, - "imageTag": "1.1.1", - "streams": { - "brokers": "fake-broker:9092", - "inputTopics": ["test-input-topic"], - "outputTopic": "streams-app-output-topic", - "deleteOutput": True, - }, - }, - HelmUpgradeInstallFlags( - version="2.9.0", wait=True, wait_for_jobs=True - ), - ), - mocker.call.helm_uninstall( - "test-namespace", - STREAMS_APP_CLEAN_RELEASE_NAME, - dry_run, - ), - ] + mock_helm_upgrade_install.assert_called_once_with( + STREAMS_APP_CLEAN_RELEASE_NAME, + "bakdata-streams-bootstrap/streams-app-cleanup-job", + dry_run, + "test-namespace", + { + "nameOverride": STREAMS_APP_CLEAN_HELM_NAME_OVERRIDE, + "imageTag": image_tag_in_cluster, + "streams": { + "brokers": "fake-broker:9092", + "inputTopics": ["test-input-topic"], + "outputTopic": "streams-app-output-topic", + "deleteOutput": True, + }, + }, + HelmUpgradeInstallFlags(version="2.9.0", wait=True, wait_for_jobs=True), ) @pytest.mark.asyncio() @@ -892,10 +833,6 @@ def test_raise_validation_error_when_persistence_enabled_and_size_not_set( async def test_stateful_clean_with_dry_run_false( self, stateful_streams_app: StreamsApp, mocker: MockerFixture ): - # actual component - mock_helm_uninstall_streams_app = mocker.patch.object( - stateful_streams_app.helm, "uninstall" - ) cleaner = stateful_streams_app._cleaner assert isinstance(cleaner, StreamsAppCleaner) @@ -912,9 +849,6 @@ async def test_stateful_clean_with_dry_run_false( ) mock = MagicMock() - mock.attach_mock( - mock_helm_uninstall_streams_app, "mock_helm_uninstall_streams_app" - ) mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") mock.attach_mock(mock_delete_pvcs, "delete_pvcs") @@ -924,11 +858,6 @@ async def test_stateful_clean_with_dry_run_false( mock.assert_has_calls( [ - mocker.call.mock_helm_uninstall_streams_app( - "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run - ), - ANY, # __bool__ - ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", STREAMS_APP_CLEAN_RELEASE_NAME,