Skip to content

Commit

Permalink
Call destroy from inside of reset or clean
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf committed Jun 27, 2024
1 parent 3d79e8e commit d083d9a
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 19 deletions.
2 changes: 0 additions & 2 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ def reset(
)

async def reset_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Reset", component)
await component.reset(dry_run)

Expand Down Expand Up @@ -217,7 +216,6 @@ def clean(
)

async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Clean", component)
await component.clean(dry_run)

Expand Down
3 changes: 3 additions & 0 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ async def destroy(self, dry_run: bool) -> None:

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
if self.to:
if self.handlers.schema_handler:
await self.handlers.schema_handler.delete_schemas(
Expand Down Expand Up @@ -229,6 +230,7 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:

@override
async def reset(self, dry_run: bool) -> None:
await super().reset(dry_run)
await self._resetter.reset(dry_run)

@override
Expand Down Expand Up @@ -266,6 +268,7 @@ def set_error_topic(self, topic: KafkaTopic) -> None:

@override
async def reset(self, dry_run: bool) -> None:
await super().reset(dry_run)
self._resetter.app.config.delete_consumer_group = False
await self._resetter.reset(dry_run)

Expand Down
2 changes: 2 additions & 0 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ async def reset(self, dry_run: bool) -> None:
:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)

async def clean(self, dry_run: bool) -> None:
"""Destroy component including related states.
:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,5 @@ def helm_chart(self) -> str:

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
await self._cleaner.clean(dry_run)
2 changes: 2 additions & 0 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ def helm_chart(self) -> str:

@override
async def reset(self, dry_run: bool) -> None:
await super().reset(dry_run)
self._cleaner.app.streams.delete_output = False
await self._cleaner.clean(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
self._cleaner.app.streams.delete_output = True
await self._cleaner.clean(dry_run)
11 changes: 11 additions & 0 deletions tests/components/test_kafka_sink_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ async def test_reset_when_dry_run_is_false(
helm_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")
mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -260,6 +261,7 @@ async def test_reset_when_dry_run_is_false(
mock_resetter_reset = mocker.spy(connector._resetter, "reset")

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

Expand All @@ -270,6 +272,7 @@ async def test_reset_when_dry_run_is_false(

mock.assert_has_calls(
[
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down Expand Up @@ -335,6 +338,8 @@ async def test_clean_when_dry_run_is_false(
dry_run_handler_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -343,6 +348,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -366,6 +372,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -451,13 +458,16 @@ async def test_clean_without_to_when_dry_run_is_false(
resetter_namespace=RESETTER_NAMESPACE,
)

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
mock_clean_connector = mocker.patch.object(
connector.handlers.connector_handler, "clean_connector"
)
mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -466,6 +476,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
22 changes: 18 additions & 4 deletions tests/components/test_kafka_source_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ async def test_reset_when_dry_run_is_false(
mocker: MockerFixture,
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -174,12 +177,15 @@ async def test_reset_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

await connector.reset(dry_run=False)
dry_run = False
await connector.reset(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand All @@ -188,14 +194,14 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm.upgrade_install(
CONNECTOR_CLEAN_RELEASE_NAME,
"bakdata-kafka-connect-resetter/kafka-connect-resetter",
False,
dry_run,
RESETTER_NAMESPACE,
{
"connectorType": CONNECTOR_TYPE,
Expand All @@ -215,7 +221,7 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
Expand Down Expand Up @@ -245,6 +251,8 @@ async def test_clean_when_dry_run_is_false(
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -253,6 +261,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -262,6 +271,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -331,6 +341,8 @@ async def test_clean_without_to_when_dry_run_is_false(

assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -339,6 +351,7 @@ async def test_clean_without_to_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -347,6 +360,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
32 changes: 30 additions & 2 deletions tests/components/test_producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,17 @@ async def test_destroy(
)

@pytest.mark.asyncio()
async def test_should_not_reset_producer_app(
async def test_should_clean_producer_app(
self,
producer_app: ProducerApp,
mocker: MockerFixture,
):
# actual component
mock_helm_uninstall_producer_app = mocker.patch.object(
producer_app.helm, "uninstall"
)

# cleaner
mock_helm_upgrade_install = mocker.patch.object(
producer_app._cleaner.helm, "upgrade_install"
)
Expand All @@ -214,14 +220,22 @@ async def test_should_not_reset_producer_app(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
mock.attach_mock(
mock_helm_uninstall_producer_app, "helm_uninstall_producer_app"
)
mock.attach_mock(mock_helm_uninstall, "helm_uninstall")
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
mock.attach_mock(mock_helm_print_helm_diff, "print_helm_diff")

await producer_app.clean(dry_run=True)

mock.assert_has_calls(
[
mocker.call.helm_uninstall_producer_app(
"test-namespace", PRODUCER_APP_RELEASE_NAME, True
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm_uninstall(
"test-namespace",
PRODUCER_APP_CLEAN_RELEASE_NAME,
Expand Down Expand Up @@ -264,6 +278,12 @@ async def test_should_not_reset_producer_app(
async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_with_dry_run_false(
self, mocker: MockerFixture, producer_app: ProducerApp
):
# actual component
mock_helm_uninstall_producer_app = mocker.patch.object(
producer_app.helm, "uninstall"
)

# cleaner
mock_helm_upgrade_install = mocker.patch.object(
producer_app._cleaner.helm, "upgrade_install"
)
Expand All @@ -272,13 +292,21 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea
)

mock = mocker.MagicMock()
mock.attach_mock(
mock_helm_uninstall_producer_app, "helm_uninstall_producer_app"
)
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
mock.attach_mock(mock_helm_uninstall, "helm_uninstall")

await producer_app.clean(dry_run=False)

mock.assert_has_calls(
[
mocker.call.helm_uninstall_producer_app(
"test-namespace", PRODUCER_APP_RELEASE_NAME, False
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm_uninstall(
"test-namespace",
PRODUCER_APP_CLEAN_RELEASE_NAME,
Expand Down
Loading

0 comments on commit d083d9a

Please sign in to comment.