Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call destroy from inside of reset or clean #501

Merged
merged 16 commits into from
Jul 15, 2024
37 changes: 37 additions & 0 deletions docs/docs/user/migration-guide/v6-v7.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,40 @@ As a result of the restructure, some imports need to be adjusted:
-components/__init__.py
+kpops/components/custom/__init__.py
```

## [Call destroy from inside of reset or clean](https://github.com/bakdata/kpops/pull/501)

Before v7, the KPOps CLI executed `destroy` before running `reset/clean` to ensure the component was destroyed.

This logic has changed. The `destroy` method is now called within the `PipelineComponent`'s `reset`/`clean`.

During migrating to v7, you should check your custom components and see if they override the `reset`/`clean` methods. If so, you need to call the supermethod `reset`/`clean` to trigger the `destroy` inside the parent class. Alternatively, if you are implementing the `PipelineComponent` class, you need to call the `destroy` method at the beginning of the method.

#### components.py

For example, when creating a custom `StreamsApp` or `ProducerApp` (or any other custom component), you **must** call the supermethod `reset`/`clean` to execute the `destroy` in the parent class. **Otherwise, the logic of destroy will not be executed!**

````diff
class MyStreamsApp(StreamsApp):

@override
async def clean(self, dry_run: bool) -> None:
+ await super().clean(dry_run)
# Some custom clean logic
# ...
```diff


class MyCustomComponent(PipelineComponent):

@override
async def destroy(self, dry_run: bool) -> None:
# Some custom destroy logic
# ...

@override
async def clean(self, dry_run: bool) -> None:
+ await super().clean(dry_run)
# Some custom clean logic
# ...
````
2 changes: 0 additions & 2 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ def reset(
)

async def reset_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Reset", component)
await component.reset(dry_run)

Expand Down Expand Up @@ -286,7 +285,6 @@ def clean(
)

async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Clean", component)
await component.clean(dry_run)

Expand Down
8 changes: 8 additions & 0 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def _resetter(self) -> KafkaConnectorResetter:

@override
async def deploy(self, dry_run: bool) -> None:
"""Deploy Kafka Connector (Source/Sink). Create output topics and register schemas if configured."""
if self.to:
for topic in self.to.kafka_topics:
await self.handlers.topic_handler.create_topic(topic, dry_run=dry_run)
Expand All @@ -181,12 +182,15 @@ async def deploy(self, dry_run: bool) -> None:

@override
async def destroy(self, dry_run: bool) -> None:
"""Delete Kafka Connector (Source/Sink) from the Kafka connect cluster."""
await self.handlers.connector_handler.destroy_connector(
self.full_name, dry_run=dry_run
)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete Kafka Connector. If schema handler is enabled, then remove schemas. Delete all the output topics."""
await super().clean(dry_run)
if self.to:
if self.handlers.schema_handler:
await self.handlers.schema_handler.delete_schemas(
Expand Down Expand Up @@ -229,10 +233,12 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:

@override
async def reset(self, dry_run: bool) -> None:
"""Reset state. Keep connector."""
await self._resetter.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete connector and reset state."""
await super().clean(dry_run)
await self._resetter.clean(dry_run)

Expand Down Expand Up @@ -266,11 +272,13 @@ def set_error_topic(self, topic: KafkaTopic) -> None:

@override
async def reset(self, dry_run: bool) -> None:
"""Reset state. Keep consumer group and connector."""
self._resetter.app.config.delete_consumer_group = False
await self._resetter.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete connector and consumer group."""
await super().clean(dry_run)
self._resetter.app.config.delete_consumer_group = True
await self._resetter.clean(dry_run)
2 changes: 2 additions & 0 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ async def reset(self, dry_run: bool) -> None:

:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

async def clean(self, dry_run: bool) -> None:
"""Destroy component including related states.

:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)
6 changes: 6 additions & 0 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ def add_extra_output_topic(self, topic: KafkaTopic, role: str) -> None:
def helm_chart(self) -> str:
return f"{self.repo_config.repository_name}/{AppType.PRODUCER_APP.value}"

async def reset(self, dry_run: bool) -> None:
"""Reset not necessary, since producer app has no consumer group offsets."""
await super().reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Destroy and clean."""
await super().clean(dry_run)
await self._cleaner.clean(dry_run)
14 changes: 11 additions & 3 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@ class StreamsAppCleaner(KafkaAppCleaner):
def helm_chart(self) -> str:
return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}"

@override
async def reset(self, dry_run: bool) -> None:
self.app.streams.delete_output = False
await super().clean(dry_run)
disrupted marked this conversation as resolved.
Show resolved Hide resolved

@override
async def clean(self, dry_run: bool) -> None:
self.app.streams.delete_output = True
await super().clean(dry_run)
if self.app.stateful_set and self.app.persistence.enabled:
await self.clean_pvcs(dry_run)
Expand Down Expand Up @@ -120,10 +126,12 @@ def helm_chart(self) -> str:

@override
async def reset(self, dry_run: bool) -> None:
self._cleaner.app.streams.delete_output = False
await self._cleaner.clean(dry_run)
"""Destroy and reset."""
await super().reset(dry_run)
await self._cleaner.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
self._cleaner.app.streams.delete_output = True
"""Destroy and clean."""
await super().clean(dry_run)
await self._cleaner.clean(dry_run)
36 changes: 36 additions & 0 deletions tests/components/test_helm_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,42 @@ async def test_should_call_helm_uninstall_when_destroying_helm_app(

log_info_mock.assert_called_once_with(magentaify(stdout))

@pytest.mark.asyncio()
async def test_should_call_helm_uninstall_when_resetting_helm_app(
self,
helm_app: HelmApp,
helm_mock: MagicMock,
log_info_mock: MagicMock,
):
stdout = 'HelmApp - release "test-helm-app" uninstalled'
helm_mock.uninstall.return_value = stdout

await helm_app.reset(True)

helm_mock.uninstall.assert_called_once_with(
"test-namespace", "${pipeline.name}-test-helm-app", True
)

log_info_mock.assert_called_once_with(magentaify(stdout))

@pytest.mark.asyncio()
async def test_should_call_helm_uninstall_when_cleaning_helm_app(
self,
helm_app: HelmApp,
helm_mock: MagicMock,
log_info_mock: MagicMock,
):
stdout = 'HelmApp - release "test-helm-app" uninstalled'
helm_mock.uninstall.return_value = stdout

await helm_app.clean(True)

helm_mock.uninstall.assert_called_once_with(
"test-namespace", "${pipeline.name}-test-helm-app", True
)

log_info_mock.assert_called_once_with(magentaify(stdout))

def test_helm_name_override(
self,
config: KpopsConfig,
Expand Down
11 changes: 11 additions & 0 deletions tests/components/test_kafka_sink_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ async def test_reset_when_dry_run_is_false(
helm_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")
mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -254,12 +255,14 @@ async def test_reset_when_dry_run_is_false(
mock_resetter_reset = mocker.spy(connector._resetter, "reset")

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

dry_run = False

await connector.reset(dry_run=dry_run)
mock_destroy.assert_not_called()
mock_resetter_reset.assert_called_once_with(dry_run)

mock.assert_has_calls(
Expand Down Expand Up @@ -329,6 +332,8 @@ async def test_clean_when_dry_run_is_false(
dry_run_handler_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -337,6 +342,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -360,6 +366,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -445,13 +452,16 @@ async def test_clean_without_to_when_dry_run_is_false(
resetter_namespace=RESETTER_NAMESPACE,
)

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
mock_clean_connector = mocker.patch.object(
connector.handlers.connector_handler, "clean_connector"
)
mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -460,6 +470,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
22 changes: 18 additions & 4 deletions tests/components/test_kafka_source_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ async def test_reset_when_dry_run_is_false(
mocker: MockerFixture,
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -174,10 +177,13 @@ async def test_reset_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

await connector.reset(dry_run=False)
dry_run = False
await connector.reset(dry_run)
mock_destroy.assert_not_called()

assert mock.mock_calls == [
mocker.call.helm.add_repo(
Expand All @@ -188,14 +194,14 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm.upgrade_install(
CONNECTOR_CLEAN_RELEASE_NAME,
"bakdata-kafka-connect-resetter/kafka-connect-resetter",
False,
dry_run,
RESETTER_NAMESPACE,
{
"connectorType": CONNECTOR_TYPE,
Expand All @@ -215,7 +221,7 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
Expand Down Expand Up @@ -245,6 +251,8 @@ async def test_clean_when_dry_run_is_false(
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -253,6 +261,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -262,6 +271,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -331,6 +341,8 @@ async def test_clean_without_to_when_dry_run_is_false(

assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -339,6 +351,7 @@ async def test_clean_without_to_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -347,6 +360,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
Loading
Loading