diff --git a/kpops/components/base_components/kafka_connector.py b/kpops/components/base_components/kafka_connector.py index 679251805..1f46d7681 100644 --- a/kpops/components/base_components/kafka_connector.py +++ b/kpops/components/base_components/kafka_connector.py @@ -233,12 +233,12 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn: @override async def reset(self, dry_run: bool) -> None: - """Reset the state of a Kafka Connect source connector. The source connector is kept.""" + """Reset state. Keep connector.""" await self._resetter.reset(dry_run) @override async def clean(self, dry_run: bool) -> None: - """Delete the source connector. Reset the state of a Kafka Connect source connector.""" + """Destroy and reset state.""" await super().clean(dry_run) await self._resetter.clean(dry_run) @@ -272,13 +272,13 @@ def set_error_topic(self, topic: KafkaTopic) -> None: @override async def reset(self, dry_run: bool) -> None: - """Reset the consumer group offsets by deploying the sink resetter. The sink connector is kept.""" + """Reset state. Keep consumer group and connector.""" self._resetter.app.config.delete_consumer_group = False await self._resetter.reset(dry_run) @override async def clean(self, dry_run: bool) -> None: - """Delete sink connector. Delete the consumer group offsets by deploying the sink resetter.""" + """Delete connector and consumer group.""" await super().clean(dry_run) self._resetter.app.config.delete_consumer_group = True await self._resetter.clean(dry_run) diff --git a/kpops/components/base_components/pipeline_component.py b/kpops/components/base_components/pipeline_component.py index ec49bce01..7ce114899 100644 --- a/kpops/components/base_components/pipeline_component.py +++ b/kpops/components/base_components/pipeline_component.py @@ -250,11 +250,9 @@ async def reset(self, dry_run: bool) -> None: :param dry_run: Whether to do a dry run of the command """ - await self.destroy(dry_run) async def clean(self, dry_run: bool) -> None: """Destroy component including related states. :param dry_run: Whether to do a dry run of the command """ - await self.destroy(dry_run) diff --git a/kpops/components/streams_bootstrap/__init__.py b/kpops/components/streams_bootstrap/__init__.py index 1b02b091b..f007d12c8 100644 --- a/kpops/components/streams_bootstrap/__init__.py +++ b/kpops/components/streams_bootstrap/__init__.py @@ -29,3 +29,11 @@ class StreamsBootstrap(HelmApp, ABC): default=STREAMS_BOOTSTRAP_VERSION, description=describe_attr("version", __doc__), ) + + async def clean(self, dry_run: bool) -> None: + await self.destroy(dry_run) + await super().clean(dry_run) + + async def reset(self, dry_run: bool) -> None: + await self.destroy(dry_run) + await super().reset(dry_run) diff --git a/tests/components/test_kafka_sink_connector.py b/tests/components/test_kafka_sink_connector.py index f307c2d0b..f42d37793 100644 --- a/tests/components/test_kafka_sink_connector.py +++ b/tests/components/test_kafka_sink_connector.py @@ -355,6 +355,7 @@ async def test_clean_when_dry_run_is_false( dry_run = False await connector.clean(dry_run=dry_run) + mock_destroy.assert_not_called() assert log_info_mock.mock_calls == [ call.log_info( @@ -372,7 +373,6 @@ async def test_clean_when_dry_run_is_false( assert connector.to assert mock.mock_calls == [ - mocker.call.destroy_connector(dry_run), *( mocker.call.mock_delete_topic(topic, dry_run=dry_run) for topic in connector.to.kafka_topics @@ -474,9 +474,9 @@ async def test_clean_without_to_when_dry_run_is_false( dry_run = False await connector.clean(dry_run) + mock_destroy.assert_not_called() assert mock.mock_calls == [ - mocker.call.destroy_connector(dry_run), mocker.call.helm.add_repo( "bakdata-kafka-connect-resetter", "https://bakdata.github.io/kafka-connect-resetter/", diff --git a/tests/components/test_kafka_source_connector.py b/tests/components/test_kafka_source_connector.py index d6127e9fc..e51195ae4 100644 --- a/tests/components/test_kafka_source_connector.py +++ b/tests/components/test_kafka_source_connector.py @@ -268,10 +268,10 @@ async def test_clean_when_dry_run_is_false( dry_run = False await connector.clean(dry_run) + mock_destroy.assert_not_called() assert connector.to assert mock.mock_calls == [ - mocker.call.destroy_connector(dry_run), *( mocker.call.mock_delete_topic(topic, dry_run=dry_run) for topic in connector.to.kafka_topics @@ -358,9 +358,9 @@ async def test_clean_without_to_when_dry_run_is_false( dry_run = False await connector.clean(dry_run) + mock_destroy.assert_not_called() assert mock.mock_calls == [ - mocker.call.destroy_connector(dry_run), mocker.call.helm.add_repo( "bakdata-kafka-connect-resetter", "https://bakdata.github.io/kafka-connect-resetter/",