From d083d9a70652a9b9a00d41609cdb714e17450a5a Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Thu, 27 Jun 2024 09:39:28 +0200 Subject: [PATCH] Call destroy from inside of reset or clean --- kpops/api/__init__.py | 2 - .../base_components/kafka_connector.py | 3 + .../base_components/pipeline_component.py | 2 + .../producer/producer_app.py | 1 + .../streams_bootstrap/streams/streams_app.py | 2 + tests/components/test_kafka_sink_connector.py | 11 +++ .../components/test_kafka_source_connector.py | 22 ++++- tests/components/test_producer_app.py | 32 ++++++- tests/components/test_streams_app.py | 83 ++++++++++++++++--- 9 files changed, 139 insertions(+), 19 deletions(-) diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py index 826307308..5c590aefd 100644 --- a/kpops/api/__init__.py +++ b/kpops/api/__init__.py @@ -180,7 +180,6 @@ def reset( ) async def reset_runner(component: PipelineComponent): - await component.destroy(dry_run) log_action("Reset", component) await component.reset(dry_run) @@ -217,7 +216,6 @@ def clean( ) async def clean_runner(component: PipelineComponent): - await component.destroy(dry_run) log_action("Clean", component) await component.clean(dry_run) diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 1d9a296b3..300d7ebcb 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -187,6 +187,7 @@ async def destroy(self, dry_run: bool) -> None: @override async def clean(self, dry_run: bool) -> None: + await super().clean(dry_run) if self.to: if self.handlers.schema_handler: await self.handlers.schema_handler.delete_schemas( @@ -229,6 +230,7 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: @override async def reset(self, dry_run: bool) -> None: + await super().reset(dry_run) await self._resetter.reset(dry_run) @override @@ -266,6 +268,7 @@ def set_error_topic(self, topic: KafkaTopic) -> None: @override async def reset(self, dry_run: bool) -> None: + await super().reset(dry_run) self._resetter.app.config.delete_consumer_group = False await self._resetter.reset(dry_run) diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index 7ce114899..ec49bce01 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -250,9 +250,11 @@ async def reset(self, dry_run: bool) -> None: :param dry_run: Whether to do a dry run of the command """ + await self.destroy(dry_run) async def clean(self, dry_run: bool) -> None: """Destroy component including related states. :param dry_run: Whether to do a dry run of the command """ + await self.destroy(dry_run) diff --git a/kpops/components/streams_bootstrap/producer/producer_app.py b/kpops/components/streams_bootstrap/producer/producer_app.py index a9c06f37e..9464a2b2c 100644 --- a/kpops/components/streams_bootstrap/producer/producer_app.py +++ b/kpops/components/streams_bootstrap/producer/producer_app.py @@ -95,4 +95,5 @@ def helm_chart(self) -> str: @override async def clean(self, dry_run: bool) -> None: + await super().clean(dry_run) await self._cleaner.clean(dry_run) diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 9bd5d87c5..0a831f430 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -123,10 +123,12 @@ def helm_chart(self) -> str: @override async def reset(self, dry_run: bool) -> None: + await super().reset(dry_run) self._cleaner.app.streams.delete_output = False await self._cleaner.clean(dry_run) @override async def clean(self, dry_run: bool) -> None: + await super().clean(dry_run) self._cleaner.app.streams.delete_output = True await self._cleaner.clean(dry_run) diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index 51b30a61d..0b73065d7 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -251,6 +251,7 @@ async def test_reset_when_dry_run_is_false( helm_mock: MagicMock, mocker: MockerFixture, ): + mock_destroy = mocker.patch.object(connector, "destroy") mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -260,6 +261,7 @@ async def test_reset_when_dry_run_is_false( mock_resetter_reset = mocker.spy(connector._resetter, "reset") mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") @@ -270,6 +272,7 @@ async def test_reset_when_dry_run_is_false( mock.assert_has_calls( [ + mocker.call.destroy_connector(dry_run), mocker.call.helm.add_repo( "bakdata-kafka-connect-resetter", "https://bakdata.github.io/kafka-connect-resetter/", @@ -335,6 +338,8 @@ async def test_clean_when_dry_run_is_false( dry_run_handler_mock: MagicMock, mocker: MockerFixture, ): + mock_destroy = mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -343,6 +348,7 @@ async def test_clean_when_dry_run_is_false( ) mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_delete_topic, "mock_delete_topic") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") @@ -366,6 +372,7 @@ async def test_clean_when_dry_run_is_false( assert connector.to assert mock.mock_calls == [ + mocker.call.destroy_connector(dry_run), *( mocker.call.mock_delete_topic(topic, dry_run=dry_run) for topic in connector.to.kafka_topics @@ -451,6 +458,8 @@ async def test_clean_without_to_when_dry_run_is_false( resetter_namespace=RESETTER_NAMESPACE, ) + mock_destroy = mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -458,6 +467,7 @@ async def test_clean_without_to_when_dry_run_is_false( connector.handlers.connector_handler, "clean_connector" ) mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_delete_topic, "mock_delete_topic") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") @@ -466,6 +476,7 @@ async def test_clean_without_to_when_dry_run_is_false( await connector.clean(dry_run) assert mock.mock_calls == [ + mocker.call.destroy_connector(dry_run), mocker.call.helm.add_repo( "bakdata-kafka-connect-resetter", "https://bakdata.github.io/kafka-connect-resetter/", diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index 46aea7645..14d7935cb 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -166,6 +166,9 @@ async def test_reset_when_dry_run_is_false( mocker: MockerFixture, ): assert connector.handlers.connector_handler + + mock_destroy = mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -174,12 +177,15 @@ async def test_reset_when_dry_run_is_false( ) mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") - await connector.reset(dry_run=False) + dry_run = False + await connector.reset(dry_run) assert mock.mock_calls == [ + mocker.call.destroy_connector(dry_run), mocker.call.helm.add_repo( "bakdata-kafka-connect-resetter", "https://bakdata.github.io/kafka-connect-resetter/", @@ -188,14 +194,14 @@ async def test_reset_when_dry_run_is_false( mocker.call.helm.uninstall( RESETTER_NAMESPACE, CONNECTOR_CLEAN_RELEASE_NAME, - False, + dry_run, ), ANY, # __bool__ ANY, # __str__ mocker.call.helm.upgrade_install( CONNECTOR_CLEAN_RELEASE_NAME, "bakdata-kafka-connect-resetter/kafka-connect-resetter", - False, + dry_run, RESETTER_NAMESPACE, { "connectorType": CONNECTOR_TYPE, @@ -215,7 +221,7 @@ async def test_reset_when_dry_run_is_false( mocker.call.helm.uninstall( RESETTER_NAMESPACE, CONNECTOR_CLEAN_RELEASE_NAME, - False, + dry_run, ), ANY, # __bool__ ANY, # __str__ @@ -245,6 +251,8 @@ async def test_clean_when_dry_run_is_false( ): assert connector.handlers.connector_handler + mock_destroy = mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -253,6 +261,7 @@ async def test_clean_when_dry_run_is_false( ) mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_delete_topic, "mock_delete_topic") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") @@ -262,6 +271,7 @@ async def test_clean_when_dry_run_is_false( assert connector.to assert mock.mock_calls == [ + mocker.call.destroy_connector(dry_run), *( mocker.call.mock_delete_topic(topic, dry_run=dry_run) for topic in connector.to.kafka_topics @@ -331,6 +341,8 @@ async def test_clean_without_to_when_dry_run_is_false( assert connector.handlers.connector_handler + mock_destroy = mocker.patch.object(connector, "destroy") + mock_delete_topic = mocker.patch.object( connector.handlers.topic_handler, "delete_topic" ) @@ -339,6 +351,7 @@ async def test_clean_without_to_when_dry_run_is_false( ) mock = mocker.MagicMock() + mock.attach_mock(mock_destroy, "destroy_connector") mock.attach_mock(mock_delete_topic, "mock_delete_topic") mock.attach_mock(mock_clean_connector, "mock_clean_connector") mock.attach_mock(helm_mock, "helm") @@ -347,6 +360,7 @@ async def test_clean_without_to_when_dry_run_is_false( await connector.clean(dry_run) assert mock.mock_calls == [ + mocker.call.destroy_connector(dry_run), mocker.call.helm.add_repo( "bakdata-kafka-connect-resetter", "https://bakdata.github.io/kafka-connect-resetter/", diff --git a/tests/components/test_producer_app.py b/tests/components/test_producer_app.py index 4f7184ead..16272e95f 100644 --- a/tests/components/test_producer_app.py +++ b/tests/components/test_producer_app.py @@ -198,11 +198,17 @@ async def test_destroy( ) @pytest.mark.asyncio() - async def test_should_not_reset_producer_app( + async def test_should_clean_producer_app( self, producer_app: ProducerApp, mocker: MockerFixture, ): + # actual component + mock_helm_uninstall_producer_app = mocker.patch.object( + producer_app.helm, "uninstall" + ) + + # cleaner mock_helm_upgrade_install = mocker.patch.object( producer_app._cleaner.helm, "upgrade_install" ) @@ -214,14 +220,22 @@ async def test_should_not_reset_producer_app( ) mock = mocker.MagicMock() - mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") + mock.attach_mock( + mock_helm_uninstall_producer_app, "helm_uninstall_producer_app" + ) mock.attach_mock(mock_helm_uninstall, "helm_uninstall") + mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_print_helm_diff, "print_helm_diff") await producer_app.clean(dry_run=True) mock.assert_has_calls( [ + mocker.call.helm_uninstall_producer_app( + "test-namespace", PRODUCER_APP_RELEASE_NAME, True + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", PRODUCER_APP_CLEAN_RELEASE_NAME, @@ -264,6 +278,12 @@ async def test_should_not_reset_producer_app( async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_with_dry_run_false( self, mocker: MockerFixture, producer_app: ProducerApp ): + # actual component + mock_helm_uninstall_producer_app = mocker.patch.object( + producer_app.helm, "uninstall" + ) + + # cleaner mock_helm_upgrade_install = mocker.patch.object( producer_app._cleaner.helm, "upgrade_install" ) @@ -272,6 +292,9 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea ) mock = mocker.MagicMock() + mock.attach_mock( + mock_helm_uninstall_producer_app, "helm_uninstall_producer_app" + ) mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") @@ -279,6 +302,11 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea mock.assert_has_calls( [ + mocker.call.helm_uninstall_producer_app( + "test-namespace", PRODUCER_APP_RELEASE_NAME, False + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", PRODUCER_APP_CLEAN_RELEASE_NAME, diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index cb340174a..7b6a8ca5a 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -77,7 +77,9 @@ def config(self) -> KpopsConfig: @pytest.fixture() def streams_app( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ) -> StreamsApp: return StreamsApp( name=STREAMS_APP_NAME, @@ -100,7 +102,9 @@ def streams_app( @pytest.fixture() def stateful_streams_app( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ) -> StreamsApp: return StreamsApp( name=STREAMS_APP_NAME, @@ -196,7 +200,11 @@ def test_cleaner_helm_name_override(self, streams_app: StreamsApp): == STREAMS_APP_CLEAN_HELM_NAME_OVERRIDE ) - def test_set_topics(self, config: KpopsConfig, handlers: ComponentHandlers): + def test_set_topics( + self, + config: KpopsConfig, + handlers: ComponentHandlers, + ): streams_app = StreamsApp( name=STREAMS_APP_NAME, config=config, @@ -245,7 +253,9 @@ def test_set_topics(self, config: KpopsConfig, handlers: ComponentHandlers): assert "extraInputPatterns" in streams_config def test_no_empty_input_topic( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ): streams_app = StreamsApp( name=STREAMS_APP_NAME, @@ -275,7 +285,11 @@ def test_no_empty_input_topic( assert "inputPattern" in streams_config assert "extraInputPatterns" not in streams_config - def test_should_validate(self, config: KpopsConfig, handlers: ComponentHandlers): + def test_should_validate( + self, + config: KpopsConfig, + handlers: ComponentHandlers, + ): # An exception should be raised when both role and type are defined and type is input with pytest.raises( ValueError, match="Define role only if `type` is `pattern` or `None`" @@ -325,7 +339,9 @@ def test_should_validate(self, config: KpopsConfig, handlers: ComponentHandlers) ) def test_set_streams_output_from_to( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ): streams_app = StreamsApp( name=STREAMS_APP_NAME, @@ -368,7 +384,9 @@ def test_set_streams_output_from_to( ) def test_weave_inputs_from_prev_component( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ): streams_app = StreamsApp( name=STREAMS_APP_NAME, @@ -452,7 +470,7 @@ async def test_deploy_order_when_dry_run_is_false( mock = mocker.AsyncMock() mock.attach_mock(mock_create_topic, "mock_create_topic") - mock.attach_mock(mock_helm_upgrade_install, "mock_helm_upgrade_install") + mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") dry_run = False await streams_app.deploy(dry_run=dry_run) @@ -493,7 +511,7 @@ async def test_deploy_order_when_dry_run_is_false( mocker.call.mock_create_topic(topic, dry_run=dry_run) for topic in streams_app.to.kafka_topics ), - mocker.call.mock_helm_upgrade_install( + mocker.call.helm_upgrade_install( STREAMS_APP_RELEASE_NAME, "bakdata-streams-bootstrap/streams-app", dry_run, @@ -526,7 +544,11 @@ async def test_deploy_order_when_dry_run_is_false( ] @pytest.mark.asyncio() - async def test_destroy(self, streams_app: StreamsApp, mocker: MockerFixture): + async def test_destroy( + self, + streams_app: StreamsApp, + mocker: MockerFixture, + ): mock_helm_uninstall = mocker.patch.object(streams_app.helm, "uninstall") await streams_app.destroy(dry_run=True) @@ -539,6 +561,11 @@ async def test_destroy(self, streams_app: StreamsApp, mocker: MockerFixture): async def test_reset_when_dry_run_is_false( self, streams_app: StreamsApp, mocker: MockerFixture ): + # actual component + mock_helm_uninstall_streams_app = mocker.patch.object( + streams_app.helm, "uninstall" + ) + cleaner = streams_app._cleaner assert isinstance(cleaner, StreamsAppCleaner) @@ -546,6 +573,9 @@ async def test_reset_when_dry_run_is_false( mock_helm_uninstall = mocker.patch.object(cleaner.helm, "uninstall") mock = mocker.MagicMock() + mock.attach_mock( + mock_helm_uninstall_streams_app, "mock_helm_uninstall_streams_app" + ) mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") @@ -554,6 +584,11 @@ async def test_reset_when_dry_run_is_false( mock.assert_has_calls( [ + mocker.call.mock_helm_uninstall_streams_app( + "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", STREAMS_APP_CLEAN_RELEASE_NAME, @@ -592,6 +627,11 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean streams_app: StreamsApp, mocker: MockerFixture, ): + # actual component + mock_helm_uninstall_streams_app = mocker.patch.object( + streams_app.helm, "uninstall" + ) + mock_helm_upgrade_install = mocker.patch.object( streams_app._cleaner.helm, "upgrade_install" ) @@ -600,6 +640,7 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean ) mock = mocker.MagicMock() + mock.attach_mock(mock_helm_uninstall_streams_app, "helm_uninstall_streams_app") mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") @@ -608,6 +649,11 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean mock.assert_has_calls( [ + mocker.call.helm_uninstall_streams_app( + "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", STREAMS_APP_CLEAN_RELEASE_NAME, @@ -642,7 +688,9 @@ async def test_should_clean_streams_app_and_deploy_clean_up_job_and_delete_clean @pytest.mark.asyncio() async def test_get_input_output_topics( - self, config: KpopsConfig, handlers: ComponentHandlers + self, + config: KpopsConfig, + handlers: ComponentHandlers, ): streams_app = StreamsApp( name="my-app", @@ -720,6 +768,10 @@ def test_raise_validation_error_when_persistence_enabled_and_size_not_set( async def test_stateful_clean_with_dry_run_false( self, stateful_streams_app: StreamsApp, mocker: MockerFixture ): + # actual component + mock_helm_uninstall_streams_app = mocker.patch.object( + stateful_streams_app.helm, "uninstall" + ) cleaner = stateful_streams_app._cleaner assert isinstance(cleaner, StreamsAppCleaner) @@ -736,6 +788,7 @@ async def test_stateful_clean_with_dry_run_false( ) mock = MagicMock() + mock.attach_mock(mock_helm_uninstall_streams_app, "helm_uninstall_streams_app") mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") mock.attach_mock(mock_helm_uninstall, "helm_uninstall") mock.attach_mock(mock_delete_pvcs, "delete_pvcs") @@ -745,6 +798,11 @@ async def test_stateful_clean_with_dry_run_false( mock.assert_has_calls( [ + mocker.call.helm_uninstall_streams_app( + "test-namespace", STREAMS_APP_RELEASE_NAME, dry_run + ), + ANY, # __bool__ + ANY, # __str__ mocker.call.helm_uninstall( "test-namespace", STREAMS_APP_CLEAN_RELEASE_NAME, @@ -790,6 +848,9 @@ async def test_stateful_clean_with_dry_run_true( caplog: pytest.LogCaptureFixture, ): caplog.set_level(logging.INFO) + # actual component + mocker.patch.object(stateful_streams_app, "destroy") + cleaner = stateful_streams_app._cleaner assert isinstance(cleaner, StreamsAppCleaner)