Skip to content

Commit

Permalink
Call destroy from inside of reset or clean (#501)
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf authored Jul 15, 2024
1 parent 8f4a997 commit bcef0e5
Show file tree
Hide file tree
Showing 14 changed files with 299 additions and 185 deletions.
37 changes: 37 additions & 0 deletions docs/docs/user/migration-guide/v6-v7.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,40 @@ As a result of the restructure, some imports need to be adjusted:
-components/__init__.py
+kpops/components/custom/__init__.py
```

## [Call destroy from inside of reset or clean](https://github.com/bakdata/kpops/pull/501)

Before v7, the KPOps CLI executed `destroy` before running `reset/clean` to ensure the component was destroyed.

This logic has changed. The `destroy` method is now called within the `PipelineComponent`'s `reset`/`clean`.

During migrating to v7, you should check your custom components and see if they override the `reset`/`clean` methods. If so, you need to call the supermethod `reset`/`clean` to trigger the `destroy` inside the parent class. Alternatively, if you are implementing the `PipelineComponent` class, you need to call the `destroy` method at the beginning of the method.

#### components.py

For example, when creating a custom `StreamsApp` or `ProducerApp` (or any other custom component), you **must** call the supermethod `reset`/`clean` to execute the `destroy` in the parent class. **Otherwise, the logic of destroy will not be executed!**

````diff
class MyStreamsApp(StreamsApp):

@override
async def clean(self, dry_run: bool) -> None:
+ await super().clean(dry_run)
# Some custom clean logic
# ...
```diff


class MyCustomComponent(PipelineComponent):

@override
async def destroy(self, dry_run: bool) -> None:
# Some custom destroy logic
# ...

@override
async def clean(self, dry_run: bool) -> None:
+ await super().clean(dry_run)
# Some custom clean logic
# ...
````
2 changes: 0 additions & 2 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ def reset(
)

async def reset_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Reset", component)
await component.reset(dry_run)

Expand Down Expand Up @@ -286,7 +285,6 @@ def clean(
)

async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Clean", component)
await component.clean(dry_run)

Expand Down
8 changes: 8 additions & 0 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def _resetter(self) -> KafkaConnectorResetter:

@override
async def deploy(self, dry_run: bool) -> None:
"""Deploy Kafka Connector (Source/Sink). Create output topics and register schemas if configured."""
if self.to:
for topic in self.to.kafka_topics:
await self.handlers.topic_handler.create_topic(topic, dry_run=dry_run)
Expand All @@ -181,12 +182,15 @@ async def deploy(self, dry_run: bool) -> None:

@override
async def destroy(self, dry_run: bool) -> None:
"""Delete Kafka Connector (Source/Sink) from the Kafka connect cluster."""
await self.handlers.connector_handler.destroy_connector(
self.full_name, dry_run=dry_run
)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete Kafka Connector. If schema handler is enabled, then remove schemas. Delete all the output topics."""
await super().clean(dry_run)
if self.to:
if self.handlers.schema_handler:
await self.handlers.schema_handler.delete_schemas(
Expand Down Expand Up @@ -229,10 +233,12 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:

@override
async def reset(self, dry_run: bool) -> None:
"""Reset state. Keep connector."""
await self._resetter.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete connector and reset state."""
await super().clean(dry_run)
await self._resetter.clean(dry_run)

Expand Down Expand Up @@ -266,11 +272,13 @@ def set_error_topic(self, topic: KafkaTopic) -> None:

@override
async def reset(self, dry_run: bool) -> None:
"""Reset state. Keep consumer group and connector."""
self._resetter.app.config.delete_consumer_group = False
await self._resetter.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete connector and consumer group."""
await super().clean(dry_run)
self._resetter.app.config.delete_consumer_group = True
await self._resetter.clean(dry_run)
2 changes: 2 additions & 0 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ async def reset(self, dry_run: bool) -> None:
:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)

async def clean(self, dry_run: bool) -> None:
"""Destroy component including related states.
:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)
6 changes: 6 additions & 0 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ def add_extra_output_topic(self, topic: KafkaTopic, role: str) -> None:
def helm_chart(self) -> str:
return f"{self.repo_config.repository_name}/{AppType.PRODUCER_APP.value}"

async def reset(self, dry_run: bool) -> None:
"""Reset not necessary, since producer app has no consumer group offsets."""
await super().reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Destroy and clean."""
await super().clean(dry_run)
await self._cleaner.clean(dry_run)
14 changes: 11 additions & 3 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@ class StreamsAppCleaner(KafkaAppCleaner):
def helm_chart(self) -> str:
return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}"

@override
async def reset(self, dry_run: bool) -> None:
self.app.streams.delete_output = False
await super().clean(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
self.app.streams.delete_output = True
await super().clean(dry_run)
if self.app.stateful_set and self.app.persistence.enabled:
await self.clean_pvcs(dry_run)
Expand Down Expand Up @@ -120,10 +126,12 @@ def helm_chart(self) -> str:

@override
async def reset(self, dry_run: bool) -> None:
self._cleaner.app.streams.delete_output = False
await self._cleaner.clean(dry_run)
"""Destroy and reset."""
await super().reset(dry_run)
await self._cleaner.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
self._cleaner.app.streams.delete_output = True
"""Destroy and clean."""
await super().clean(dry_run)
await self._cleaner.clean(dry_run)
36 changes: 36 additions & 0 deletions tests/components/test_helm_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,42 @@ async def test_should_call_helm_uninstall_when_destroying_helm_app(

log_info_mock.assert_called_once_with(magentaify(stdout))

@pytest.mark.asyncio()
async def test_should_call_helm_uninstall_when_resetting_helm_app(
self,
helm_app: HelmApp,
helm_mock: MagicMock,
log_info_mock: MagicMock,
):
stdout = 'HelmApp - release "test-helm-app" uninstalled'
helm_mock.uninstall.return_value = stdout

await helm_app.reset(True)

helm_mock.uninstall.assert_called_once_with(
"test-namespace", "${pipeline.name}-test-helm-app", True
)

log_info_mock.assert_called_once_with(magentaify(stdout))

@pytest.mark.asyncio()
async def test_should_call_helm_uninstall_when_cleaning_helm_app(
self,
helm_app: HelmApp,
helm_mock: MagicMock,
log_info_mock: MagicMock,
):
stdout = 'HelmApp - release "test-helm-app" uninstalled'
helm_mock.uninstall.return_value = stdout

await helm_app.clean(True)

helm_mock.uninstall.assert_called_once_with(
"test-namespace", "${pipeline.name}-test-helm-app", True
)

log_info_mock.assert_called_once_with(magentaify(stdout))

def test_helm_name_override(
self,
config: KpopsConfig,
Expand Down
11 changes: 11 additions & 0 deletions tests/components/test_kafka_sink_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ async def test_reset_when_dry_run_is_false(
helm_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")
mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -254,12 +255,14 @@ async def test_reset_when_dry_run_is_false(
mock_resetter_reset = mocker.spy(connector._resetter, "reset")

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

dry_run = False

await connector.reset(dry_run=dry_run)
mock_destroy.assert_not_called()
mock_resetter_reset.assert_called_once_with(dry_run)

mock.assert_has_calls(
Expand Down Expand Up @@ -329,6 +332,8 @@ async def test_clean_when_dry_run_is_false(
dry_run_handler_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -337,6 +342,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -360,6 +366,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -445,13 +452,16 @@ async def test_clean_without_to_when_dry_run_is_false(
resetter_namespace=RESETTER_NAMESPACE,
)

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
mock_clean_connector = mocker.patch.object(
connector.handlers.connector_handler, "clean_connector"
)
mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -460,6 +470,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
22 changes: 18 additions & 4 deletions tests/components/test_kafka_source_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ async def test_reset_when_dry_run_is_false(
mocker: MockerFixture,
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -174,10 +177,13 @@ async def test_reset_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

await connector.reset(dry_run=False)
dry_run = False
await connector.reset(dry_run)
mock_destroy.assert_not_called()

assert mock.mock_calls == [
mocker.call.helm.add_repo(
Expand All @@ -188,14 +194,14 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm.upgrade_install(
CONNECTOR_CLEAN_RELEASE_NAME,
"bakdata-kafka-connect-resetter/kafka-connect-resetter",
False,
dry_run,
RESETTER_NAMESPACE,
{
"connectorType": CONNECTOR_TYPE,
Expand All @@ -215,7 +221,7 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
Expand Down Expand Up @@ -245,6 +251,8 @@ async def test_clean_when_dry_run_is_false(
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -253,6 +261,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -262,6 +271,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -331,6 +341,8 @@ async def test_clean_without_to_when_dry_run_is_false(

assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -339,6 +351,7 @@ async def test_clean_without_to_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -347,6 +360,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
Loading

0 comments on commit bcef0e5

Please sign in to comment.