Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call destroy from inside of reset or clean #501

Merged
merged 16 commits into from
Jul 15, 2024
27 changes: 27 additions & 0 deletions docs/docs/user/migration-guide/v6-v7.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,30 @@ As a result of the restructure, some imports need to be adjusted:
-components/__init__.py
+kpops/components/custom/__init__.py
```

## [Call destroy from inside of reset or clean](https://github.com/bakdata/kpops/pull/501)

Before v7, the KPOps CLI executed `destroy` before running `reset/clean` to ensure the component was destroyed.

This logic has changed. From now on, the `destroy` method should be called within the `reset/clean` method if needed by the user.

During the migration to v7, you should check your custom components and see if they are overriding the `reset/clean` methods. If so, you would need to add the `destroy` method at the beginning of your `reset/clean` override.

#### components.py

In the following example, we must call the `destroy` method inside the `clean` method. ** Otherwise, the logic of destroy will not be executed!**
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
disrupted marked this conversation as resolved.
Show resolved Hide resolved

```diff
class MyCustomComponent(PipelineComponent):

@override
async def destroy(self, dry_run: bool) -> None:
# Some custom destroy logic
# ...

@override
async def clean(self, dry_run: bool) -> None:
+ destroy(dry_run)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
# Some custom clean logic
# ...
```
2 changes: 0 additions & 2 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ def reset(
)

async def reset_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Reset", component)
await component.reset(dry_run)

Expand Down Expand Up @@ -286,7 +285,6 @@ def clean(
)

async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Clean", component)
await component.clean(dry_run)

Expand Down
8 changes: 8 additions & 0 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def _resetter(self) -> KafkaConnectorResetter:

@override
async def deploy(self, dry_run: bool) -> None:
"""Deploy Kafka Connector (Source/Sink). Create output topics and register schemas if configured."""
if self.to:
for topic in self.to.kafka_topics:
await self.handlers.topic_handler.create_topic(topic, dry_run=dry_run)
Expand All @@ -181,12 +182,15 @@ async def deploy(self, dry_run: bool) -> None:

@override
async def destroy(self, dry_run: bool) -> None:
"""Delete Kafka Connector (Source/Sink) from the Kafka connect cluster."""
await self.handlers.connector_handler.destroy_connector(
self.full_name, dry_run=dry_run
)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete Kafka Connector. If schema handler is enabled, then remove schemas. Delete all the output topics."""
await self.destroy(dry_run)
disrupted marked this conversation as resolved.
Show resolved Hide resolved
if self.to:
if self.handlers.schema_handler:
await self.handlers.schema_handler.delete_schemas(
Expand Down Expand Up @@ -229,10 +233,12 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:

@override
async def reset(self, dry_run: bool) -> None:
"""Reset state. Keep connector."""
await self._resetter.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete connector and reset state."""
await super().clean(dry_run)
await self._resetter.clean(dry_run)

Expand Down Expand Up @@ -266,11 +272,13 @@ def set_error_topic(self, topic: KafkaTopic) -> None:

@override
async def reset(self, dry_run: bool) -> None:
"""Reset state. Keep consumer group and connector."""
self._resetter.app.config.delete_consumer_group = False
await self._resetter.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete connector and consumer group."""
await super().clean(dry_run)
self._resetter.app.config.delete_consumer_group = True
await self._resetter.clean(dry_run)
9 changes: 9 additions & 0 deletions kpops/components/common/streams_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pydantic
from pydantic import Field
from typing_extensions import override

from kpops.component_handlers.helm_wrapper.model import HelmRepoConfig
from kpops.components.base_components.helm_app import HelmApp, HelmAppValues
Expand Down Expand Up @@ -72,3 +73,11 @@ def warning_for_latest_image_tag(self) -> Self:
f"The image tag for component '{self.name}' is set or defaulted to 'latest'. Please, consider providing a stable image tag."
)
return self

@override
async def reset(self, dry_run: bool) -> None:
await self.destroy(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
await self.destroy(dry_run)
disrupted marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 8 additions & 0 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ def add_extra_output_topic(self, topic: KafkaTopic, role: str) -> None:
def helm_chart(self) -> str:
return f"{self.repo_config.repository_name}/{AppType.PRODUCER_APP.value}"

async def reset(self, dry_run: bool) -> None:
"""Not supported. Producer App cannot be reset.
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

The producer app doesn't have consumer group offsets to be reset.
"""

@override
async def clean(self, dry_run: bool) -> None:
"""Destroy and clean."""
await super().clean(dry_run)
await self._cleaner.clean(dry_run)
14 changes: 11 additions & 3 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@ class StreamsAppCleaner(KafkaAppCleaner):
def helm_chart(self) -> str:
return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}"

@override
async def reset(self, dry_run: bool) -> None:
self.app.streams.delete_output = False
await super().clean(dry_run)
disrupted marked this conversation as resolved.
Show resolved Hide resolved

@override
async def clean(self, dry_run: bool) -> None:
self.app.streams.delete_output = True
await super().clean(dry_run)
if self.app.stateful_set and self.app.persistence.enabled:
await self.clean_pvcs(dry_run)
Expand Down Expand Up @@ -120,10 +126,12 @@ def helm_chart(self) -> str:

@override
async def reset(self, dry_run: bool) -> None:
self._cleaner.app.streams.delete_output = False
await self._cleaner.clean(dry_run)
"""Destroy and reset."""
await super().reset(dry_run)
await self._cleaner.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
self._cleaner.app.streams.delete_output = True
"""Destroy and clean."""
await super().clean(dry_run)
await self._cleaner.clean(dry_run)
11 changes: 11 additions & 0 deletions tests/components/test_kafka_sink_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ async def test_reset_when_dry_run_is_false(
helm_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")
mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -254,12 +255,14 @@ async def test_reset_when_dry_run_is_false(
mock_resetter_reset = mocker.spy(connector._resetter, "reset")

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

dry_run = False

await connector.reset(dry_run=dry_run)
mock_destroy.assert_not_called()
mock_resetter_reset.assert_called_once_with(dry_run)

mock.assert_has_calls(
Expand Down Expand Up @@ -329,6 +332,8 @@ async def test_clean_when_dry_run_is_false(
dry_run_handler_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -337,6 +342,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -360,6 +366,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -445,13 +452,16 @@ async def test_clean_without_to_when_dry_run_is_false(
resetter_namespace=RESETTER_NAMESPACE,
)

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
mock_clean_connector = mocker.patch.object(
connector.handlers.connector_handler, "clean_connector"
)
mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -460,6 +470,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
22 changes: 18 additions & 4 deletions tests/components/test_kafka_source_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ async def test_reset_when_dry_run_is_false(
mocker: MockerFixture,
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -174,10 +177,13 @@ async def test_reset_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

await connector.reset(dry_run=False)
dry_run = False
await connector.reset(dry_run)
mock_destroy.assert_not_called()

assert mock.mock_calls == [
mocker.call.helm.add_repo(
Expand All @@ -188,14 +194,14 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm.upgrade_install(
CONNECTOR_CLEAN_RELEASE_NAME,
"bakdata-kafka-connect-resetter/kafka-connect-resetter",
False,
dry_run,
RESETTER_NAMESPACE,
{
"connectorType": CONNECTOR_TYPE,
Expand All @@ -215,7 +221,7 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
Expand Down Expand Up @@ -245,6 +251,8 @@ async def test_clean_when_dry_run_is_false(
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -253,6 +261,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -262,6 +271,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -331,6 +341,8 @@ async def test_clean_without_to_when_dry_run_is_false(

assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -339,6 +351,7 @@ async def test_clean_without_to_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -347,6 +360,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
Loading
Loading