Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call destroy from inside of reset or clean #501

Merged
merged 16 commits into from
Jul 15, 2024
2 changes: 0 additions & 2 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ def reset(
)

async def reset_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Reset", component)
await component.reset(dry_run)

Expand Down Expand Up @@ -217,7 +216,6 @@ def clean(
)

async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Clean", component)
await component.clean(dry_run)

Expand Down
3 changes: 3 additions & 0 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ async def destroy(self, dry_run: bool) -> None:

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
if self.to:
if self.handlers.schema_handler:
await self.handlers.schema_handler.delete_schemas(
Expand Down Expand Up @@ -229,6 +230,7 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:

@override
async def reset(self, dry_run: bool) -> None:
await super().reset(dry_run)
await self._resetter.reset(dry_run)

@override
Expand Down Expand Up @@ -266,6 +268,7 @@ def set_error_topic(self, topic: KafkaTopic) -> None:

@override
async def reset(self, dry_run: bool) -> None:
await super().reset(dry_run)
self._resetter.app.config.delete_consumer_group = False
await self._resetter.reset(dry_run)

Expand Down
2 changes: 2 additions & 0 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ async def reset(self, dry_run: bool) -> None:

:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

async def clean(self, dry_run: bool) -> None:
"""Destroy component including related states.

:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,5 @@ def helm_chart(self) -> str:

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
await self._cleaner.clean(dry_run)
2 changes: 2 additions & 0 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ def helm_chart(self) -> str:

@override
async def reset(self, dry_run: bool) -> None:
await super().reset(dry_run)
self._cleaner.app.streams.delete_output = False
await self._cleaner.clean(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
self._cleaner.app.streams.delete_output = True
await self._cleaner.clean(dry_run)
11 changes: 11 additions & 0 deletions tests/components/test_kafka_sink_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ async def test_reset_when_dry_run_is_false(
helm_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")
mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -260,6 +261,7 @@ async def test_reset_when_dry_run_is_false(
mock_resetter_reset = mocker.spy(connector._resetter, "reset")

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

Expand All @@ -270,6 +272,7 @@ async def test_reset_when_dry_run_is_false(

mock.assert_has_calls(
[
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down Expand Up @@ -335,6 +338,8 @@ async def test_clean_when_dry_run_is_false(
dry_run_handler_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -343,6 +348,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -366,6 +372,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -451,13 +458,16 @@ async def test_clean_without_to_when_dry_run_is_false(
resetter_namespace=RESETTER_NAMESPACE,
)

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
mock_clean_connector = mocker.patch.object(
connector.handlers.connector_handler, "clean_connector"
)
mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -466,6 +476,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
22 changes: 18 additions & 4 deletions tests/components/test_kafka_source_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ async def test_reset_when_dry_run_is_false(
mocker: MockerFixture,
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -174,12 +177,15 @@ async def test_reset_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

await connector.reset(dry_run=False)
dry_run = False
await connector.reset(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand All @@ -188,14 +194,14 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm.upgrade_install(
CONNECTOR_CLEAN_RELEASE_NAME,
"bakdata-kafka-connect-resetter/kafka-connect-resetter",
False,
dry_run,
RESETTER_NAMESPACE,
{
"connectorType": CONNECTOR_TYPE,
Expand All @@ -215,7 +221,7 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
Expand Down Expand Up @@ -245,6 +251,8 @@ async def test_clean_when_dry_run_is_false(
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -253,6 +261,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -262,6 +271,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -331,6 +341,8 @@ async def test_clean_without_to_when_dry_run_is_false(

assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -339,6 +351,7 @@ async def test_clean_without_to_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -347,6 +360,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
32 changes: 30 additions & 2 deletions tests/components/test_producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,17 @@ async def test_destroy(
)

@pytest.mark.asyncio()
async def test_should_not_reset_producer_app(
async def test_should_clean_producer_app(
self,
producer_app: ProducerApp,
mocker: MockerFixture,
):
# actual component
mock_helm_uninstall_producer_app = mocker.patch.object(
producer_app.helm, "uninstall"
)

# cleaner
mock_helm_upgrade_install = mocker.patch.object(
producer_app._cleaner.helm, "upgrade_install"
)
Expand All @@ -214,14 +220,22 @@ async def test_should_not_reset_producer_app(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
mock.attach_mock(
mock_helm_uninstall_producer_app, "helm_uninstall_producer_app"
)
mock.attach_mock(mock_helm_uninstall, "helm_uninstall")
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
mock.attach_mock(mock_helm_print_helm_diff, "print_helm_diff")

await producer_app.clean(dry_run=True)

mock.assert_has_calls(
[
mocker.call.helm_uninstall_producer_app(
"test-namespace", PRODUCER_APP_RELEASE_NAME, True
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm_uninstall(
"test-namespace",
PRODUCER_APP_CLEAN_RELEASE_NAME,
Expand Down Expand Up @@ -264,6 +278,12 @@ async def test_should_not_reset_producer_app(
async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_with_dry_run_false(
self, mocker: MockerFixture, producer_app: ProducerApp
):
# actual component
mock_helm_uninstall_producer_app = mocker.patch.object(
producer_app.helm, "uninstall"
)

# cleaner
mock_helm_upgrade_install = mocker.patch.object(
producer_app._cleaner.helm, "upgrade_install"
)
Expand All @@ -272,13 +292,21 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea
)

mock = mocker.MagicMock()
mock.attach_mock(
mock_helm_uninstall_producer_app, "helm_uninstall_producer_app"
)
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
mock.attach_mock(mock_helm_uninstall, "helm_uninstall")

await producer_app.clean(dry_run=False)

mock.assert_has_calls(
[
mocker.call.helm_uninstall_producer_app(
"test-namespace", PRODUCER_APP_RELEASE_NAME, False
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm_uninstall(
"test-namespace",
PRODUCER_APP_CLEAN_RELEASE_NAME,
Expand Down
Loading
Loading