Skip to content

Commit

Permalink
Fix missing await on Kubernetes API (#488)
Browse files Browse the repository at this point in the history
The method needs to be awaited. See example:
https://github.com/tomplus/kubernetes_asyncio#example
  • Loading branch information
raminqaf authored May 15, 2024
1 parent e761b40 commit 134c2ae
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 29 deletions.
2 changes: 1 addition & 1 deletion examples
Submodule examples updated 1 files
+2 −0 config.yaml
8 changes: 4 additions & 4 deletions kpops/component_handlers/kubernetes/pvc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ async def create(cls, app_name: str, namespace: str) -> PVCHandler:
async def list_pvcs(self) -> list[str]:
async with ApiClient() as api:
core_v1_api = client.CoreV1Api(api)
pvc_list = core_v1_api.list_namespaced_persistent_volume_claim(
self.namespace, label_selector=f"app={self.app_name}"
pvc_list = await core_v1_api.list_namespaced_persistent_volume_claim(
namespace=self.namespace, label_selector=f"app={self.app_name}"
)

pvc_names = [pvc.metadata.name for pvc in pvc_list.items]
Expand All @@ -44,6 +44,6 @@ async def delete_pvcs(self) -> None:
f"Deleting in namespace '{self.namespace}' StatefulSet '{self.app_name}' PVCs '{pvc_names}'"
)
for pvc_name in pvc_names:
core_v1_api.delete_namespaced_persistent_volume_claim(
await core_v1_api.delete_namespaced_persistent_volume_claim(
pvc_name, self.namespace
)
) # type: ignore [reportGeneralTypeIssues]
17 changes: 7 additions & 10 deletions kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing_extensions import override

from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler
from kpops.components import HelmApp
from kpops.components.base_components.kafka_app import (
KafkaApp,
KafkaAppCleaner,
Expand All @@ -30,25 +31,21 @@ class StreamsAppCleaner(KafkaAppCleaner):
def helm_chart(self) -> str:
return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}"

@cached_property
def pvc_handler(self) -> PVCHandler:
return PVCHandler(self.full_name, self.namespace)

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
if self.app.stateful_set and self.app.persistence.enabled:
await self.clean_pvcs(dry_run)

async def clean_pvcs(self, dry_run: bool) -> None:
app_full_name = super(HelmApp, self).full_name
pvc_handler = await PVCHandler.create(app_full_name, self.namespace)
if dry_run:
pvc_names = await self.pvc_handler.list_pvcs()
log.info(
f"Deleting the PVCs {pvc_names} for StatefulSet '{self.full_name}'"
)
pvc_names = await pvc_handler.list_pvcs()
log.info(f"Deleting the PVCs {pvc_names} for StatefulSet '{app_full_name}'")
else:
log.info(f"Deleting the PVCs for StatefulSet '{self.full_name}'")
await self.pvc_handler.delete_pvcs()
log.info(f"Deleting the PVCs for StatefulSet '{app_full_name}'")
await pvc_handler.delete_pvcs()


class StreamsApp(KafkaApp, StreamsBootstrap):
Expand Down
30 changes: 25 additions & 5 deletions tests/components/test_streams_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,15 @@ async def test_stateful_clean_with_dry_run_false(

mock_helm_upgrade_install = mocker.patch.object(cleaner.helm, "upgrade_install")
mock_helm_uninstall = mocker.patch.object(cleaner.helm, "uninstall")
mock_delete_pvcs = mocker.patch.object(cleaner.pvc_handler, "delete_pvcs")

module = StreamsAppCleaner.__module__
mock_pvc_handler_instance = AsyncMock()
mock_delete_pvcs = mock_pvc_handler_instance.delete_pvcs
mock_delete_pvcs.return_value = AsyncMock()

mocker.patch(
f"{module}.PVCHandler.create", return_value=mock_pvc_handler_instance
)

mock = MagicMock()
mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install")
Expand Down Expand Up @@ -787,15 +795,27 @@ async def test_stateful_clean_with_dry_run_true(

pvc_names = ["test-pvc1", "test-pvc2", "test-pvc3"]

mock_pvc_handler_instance = AsyncMock()
mock_list_pvcs = mock_pvc_handler_instance.list_pvcs
mock_list_pvcs.return_value = pvc_names

module = StreamsAppCleaner.__module__
pvc_handler_create = mocker.patch(
f"{module}.PVCHandler.create", return_value=mock_pvc_handler_instance
)
mocker.patch.object(cleaner, "destroy")
mocker.patch.object(cleaner, "deploy")
mock_get_pvc_names = mocker.patch.object(cleaner.pvc_handler, "list_pvcs")
mock_get_pvc_names.return_value = pvc_names
mocker.patch.object(mock_list_pvcs, "list_pvcs")

dry_run = True
await stateful_streams_app.clean(dry_run=dry_run)
mock_get_pvc_names.assert_called_once()

pvc_handler_create.assert_called_once_with(
STREAMS_APP_FULL_NAME, "test-namespace"
)

mock_list_pvcs.assert_called_once()
assert (
f"Deleting the PVCs {pvc_names} for StatefulSet '{STREAMS_APP_CLEAN_FULL_NAME}'"
f"Deleting the PVCs {pvc_names} for StatefulSet '{STREAMS_APP_FULL_NAME}'"
in caplog.text
)
41 changes: 32 additions & 9 deletions tests/kubernetes/test_pvc_handler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from unittest.mock import MagicMock
from unittest.mock import AsyncMock

import pytest
from kubernetes_asyncio.client import (
V1ObjectMeta,
V1PersistentVolumeClaim,
V1PersistentVolumeClaimList,
V1PersistentVolumeClaimSpec,
V1PersistentVolumeClaimStatus,
)
from pytest_mock import MockerFixture

from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler
Expand All @@ -25,24 +32,40 @@ async def test_create(pvc_handler: PVCHandler, mocker: MockerFixture):

@pytest.mark.asyncio
async def test_pvc_names(pvc_handler: PVCHandler, mocker: MockerFixture):
mock_pvc = MagicMock()
mock_pvc.metadata.name = "test-pvc"
test_pvc1 = V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=V1ObjectMeta(name="datadir-test-app-1"),
spec=V1PersistentVolumeClaimSpec(),
status=V1PersistentVolumeClaimStatus(),
)
test_pvc2 = V1PersistentVolumeClaim(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=V1ObjectMeta(name="datadir-test-app-2"),
spec=V1PersistentVolumeClaimSpec(),
status=V1PersistentVolumeClaimStatus(),
)
volume_claim_list = V1PersistentVolumeClaimList(items=[test_pvc1, test_pvc2])

async_mock = AsyncMock()
async_mock.list_namespaced_persistent_volume_claim.return_value = volume_claim_list

mocker.patch(f"{MODULE}.client.CoreV1Api", return_value=async_mock)

mock_core_v1_api = mocker.patch(f"{MODULE}.client.CoreV1Api")
mock_core_v1_api.return_value.list_namespaced_persistent_volume_claim.return_value.items = [
mock_pvc
]
pvcs = await pvc_handler.list_pvcs()

assert pvcs == ["test-pvc"]
assert pvcs == ["datadir-test-app-1", "datadir-test-app-2"]


@pytest.mark.asyncio
async def test_delete_pvcs(pvc_handler: PVCHandler, mocker: MockerFixture):
mocker.patch.object(
pvc_handler, "list_pvcs", return_value=["test-pvc-1", "test-pvc-2"]
)
mock_core_v1_api = mocker.patch(f"{MODULE}.client.CoreV1Api")
mock_core_v1_api = mocker.patch(
f"{MODULE}.client.CoreV1Api", return_value=AsyncMock()
)
await pvc_handler.delete_pvcs()
mock_core_v1_api.return_value.assert_has_calls(
[
Expand Down

0 comments on commit 134c2ae

Please sign in to comment.