Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call destroy from inside of reset or clean #501

Merged
merged 16 commits into from
Jul 15, 2024
2 changes: 0 additions & 2 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ def reset(
)

async def reset_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Reset", component)
await component.reset(dry_run)

Expand Down Expand Up @@ -217,7 +216,6 @@ def clean(
)

async def clean_runner(component: PipelineComponent):
await component.destroy(dry_run)
log_action("Clean", component)
await component.clean(dry_run)

Expand Down
8 changes: 8 additions & 0 deletions kpops/components/base_components/kafka_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def _resetter(self) -> KafkaConnectorResetter:

@override
async def deploy(self, dry_run: bool) -> None:
"""Deploy Kafka Connector (Source/Sink). Create output topics and register schemas if configured."""
if self.to:
for topic in self.to.kafka_topics:
await self.handlers.topic_handler.create_topic(topic, dry_run=dry_run)
Expand All @@ -181,12 +182,15 @@ async def deploy(self, dry_run: bool) -> None:

@override
async def destroy(self, dry_run: bool) -> None:
"""Delete Kafka Connector (Source/Sink) from the Kafka connect cluster."""
await self.handlers.connector_handler.destroy_connector(
self.full_name, dry_run=dry_run
)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete Kafka Connector. If schema handler is enabled, then remove schemas. Delete all the output topics."""
await super().clean(dry_run)
if self.to:
if self.handlers.schema_handler:
await self.handlers.schema_handler.delete_schemas(
Expand Down Expand Up @@ -229,10 +233,12 @@ def apply_from_inputs(self, name: str, topic: FromTopic) -> NoReturn:

@override
async def reset(self, dry_run: bool) -> None:
"""Reset the state of a Kafka Connect source connector. The source connector is kept."""
await self._resetter.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete the source connector. Reset the state of a Kafka Connect source connector."""
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
await super().clean(dry_run)
await self._resetter.clean(dry_run)

Expand Down Expand Up @@ -266,11 +272,13 @@ def set_error_topic(self, topic: KafkaTopic) -> None:

@override
async def reset(self, dry_run: bool) -> None:
"""Reset the consumer group offsets by deploying the sink resetter. The sink connector is kept."""
self._resetter.app.config.delete_consumer_group = False
await self._resetter.reset(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Delete sink connector. Delete the consumer group offsets by deploying the sink resetter."""
await super().clean(dry_run)
self._resetter.app.config.delete_consumer_group = True
await self._resetter.clean(dry_run)
2 changes: 2 additions & 0 deletions kpops/components/base_components/pipeline_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ async def reset(self, dry_run: bool) -> None:
:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved

async def clean(self, dry_run: bool) -> None:
"""Destroy component including related states.
:param dry_run: Whether to do a dry run of the command
"""
await self.destroy(dry_run)
2 changes: 2 additions & 0 deletions kpops/components/streams_bootstrap/producer/producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,6 @@ def helm_chart(self) -> str:

@override
async def clean(self, dry_run: bool) -> None:
"""Remove the producer app from the cluster. Deploy the cleanup job."""
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
await super().clean(dry_run)
await self._cleaner.clean(dry_run)
4 changes: 4 additions & 0 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,14 @@ def helm_chart(self) -> str:

@override
async def reset(self, dry_run: bool) -> None:
"""Remove the streams app from the cluster. Deploy the cleanup job with delete output set to false."""
await super().reset(dry_run)
self._cleaner.app.streams.delete_output = False
await self._cleaner.clean(dry_run)

@override
async def clean(self, dry_run: bool) -> None:
"""Remove the streams app from the cluster. Deploy the cleanup job with delete output set to true."""
await super().clean(dry_run)
self._cleaner.app.streams.delete_output = True
await self._cleaner.clean(dry_run)
11 changes: 11 additions & 0 deletions tests/components/test_kafka_sink_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ async def test_reset_when_dry_run_is_false(
helm_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")
mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -260,12 +261,14 @@ async def test_reset_when_dry_run_is_false(
mock_resetter_reset = mocker.spy(connector._resetter, "reset")

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

dry_run = False

await connector.reset(dry_run=dry_run)
mock_destroy.assert_not_called()
mock_resetter_reset.assert_called_once_with(dry_run)

mock.assert_has_calls(
Expand Down Expand Up @@ -335,6 +338,8 @@ async def test_clean_when_dry_run_is_false(
dry_run_handler_mock: MagicMock,
mocker: MockerFixture,
):
mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -343,6 +348,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -366,6 +372,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -451,13 +458,16 @@ async def test_clean_without_to_when_dry_run_is_false(
resetter_namespace=RESETTER_NAMESPACE,
)

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
mock_clean_connector = mocker.patch.object(
connector.handlers.connector_handler, "clean_connector"
)
mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -466,6 +476,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
22 changes: 18 additions & 4 deletions tests/components/test_kafka_source_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ async def test_reset_when_dry_run_is_false(
mocker: MockerFixture,
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -174,10 +177,13 @@ async def test_reset_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")

await connector.reset(dry_run=False)
dry_run = False
await connector.reset(dry_run)
mock_destroy.assert_not_called()

assert mock.mock_calls == [
mocker.call.helm.add_repo(
Expand All @@ -188,14 +194,14 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm.upgrade_install(
CONNECTOR_CLEAN_RELEASE_NAME,
"bakdata-kafka-connect-resetter/kafka-connect-resetter",
False,
dry_run,
RESETTER_NAMESPACE,
{
"connectorType": CONNECTOR_TYPE,
Expand All @@ -215,7 +221,7 @@ async def test_reset_when_dry_run_is_false(
mocker.call.helm.uninstall(
RESETTER_NAMESPACE,
CONNECTOR_CLEAN_RELEASE_NAME,
False,
dry_run,
),
ANY, # __bool__
ANY, # __str__
Expand Down Expand Up @@ -245,6 +251,8 @@ async def test_clean_when_dry_run_is_false(
):
assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -253,6 +261,7 @@ async def test_clean_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -262,6 +271,7 @@ async def test_clean_when_dry_run_is_false(

assert connector.to
assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
*(
mocker.call.mock_delete_topic(topic, dry_run=dry_run)
for topic in connector.to.kafka_topics
Expand Down Expand Up @@ -331,6 +341,8 @@ async def test_clean_without_to_when_dry_run_is_false(

assert connector.handlers.connector_handler

mock_destroy = mocker.patch.object(connector, "destroy")

mock_delete_topic = mocker.patch.object(
connector.handlers.topic_handler, "delete_topic"
)
Expand All @@ -339,6 +351,7 @@ async def test_clean_without_to_when_dry_run_is_false(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_destroy, "destroy_connector")
mock.attach_mock(mock_delete_topic, "mock_delete_topic")
mock.attach_mock(mock_clean_connector, "mock_clean_connector")
mock.attach_mock(helm_mock, "helm")
Expand All @@ -347,6 +360,7 @@ async def test_clean_without_to_when_dry_run_is_false(
await connector.clean(dry_run)

assert mock.mock_calls == [
mocker.call.destroy_connector(dry_run),
mocker.call.helm.add_repo(
"bakdata-kafka-connect-resetter",
"https://bakdata.github.io/kafka-connect-resetter/",
Expand Down
32 changes: 30 additions & 2 deletions tests/components/test_producer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,17 @@ async def test_destroy(
)

@pytest.mark.asyncio()
async def test_should_not_reset_producer_app(
async def test_should_clean_producer_app(
self,
producer_app: ProducerApp,
mocker: MockerFixture,
):
# actual component
mock_helm_uninstall_producer_app = mocker.patch.object(
producer_app.helm, "uninstall"
)

# cleaner
mock_helm_upgrade_install = mocker.patch.object(
producer_app._cleaner.helm, "upgrade_install"
)
Expand All @@ -214,14 +220,22 @@ async def test_should_not_reset_producer_app(
)

mock = mocker.MagicMock()
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
mock.attach_mock(
mock_helm_uninstall_producer_app, "helm_uninstall_producer_app"
)
mock.attach_mock(mock_helm_uninstall, "helm_uninstall")
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
mock.attach_mock(mock_helm_print_helm_diff, "print_helm_diff")

await producer_app.clean(dry_run=True)

mock.assert_has_calls(
[
mocker.call.helm_uninstall_producer_app(
"test-namespace", PRODUCER_APP_RELEASE_NAME, True
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm_uninstall(
"test-namespace",
PRODUCER_APP_CLEAN_RELEASE_NAME,
Expand Down Expand Up @@ -264,6 +278,12 @@ async def test_should_not_reset_producer_app(
async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clean_up_with_dry_run_false(
self, mocker: MockerFixture, producer_app: ProducerApp
):
# actual component
mock_helm_uninstall_producer_app = mocker.patch.object(
producer_app.helm, "uninstall"
)

# cleaner
mock_helm_upgrade_install = mocker.patch.object(
producer_app._cleaner.helm, "upgrade_install"
)
Expand All @@ -272,13 +292,21 @@ async def test_should_clean_producer_app_and_deploy_clean_up_job_and_delete_clea
)

mock = mocker.MagicMock()
mock.attach_mock(
mock_helm_uninstall_producer_app, "helm_uninstall_producer_app"
)
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
mock.attach_mock(mock_helm_uninstall, "helm_uninstall")

await producer_app.clean(dry_run=False)

mock.assert_has_calls(
[
mocker.call.helm_uninstall_producer_app(
"test-namespace", PRODUCER_APP_RELEASE_NAME, False
),
ANY, # __bool__
ANY, # __str__
mocker.call.helm_uninstall(
"test-namespace",
PRODUCER_APP_CLEAN_RELEASE_NAME,
Expand Down
Loading
Loading