From 134c2ae9742e6a779604b323de6108dbcf6d3666 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Wed, 15 May 2024 11:40:43 +0200 Subject: [PATCH 1/2] Fix missing await on Kubernetes API (#488) The method needs to be awaited. See example: https://github.com/tomplus/kubernetes_asyncio#example --- examples | 2 +- .../kubernetes/pvc_handler.py | 8 ++-- .../streams_bootstrap/streams/streams_app.py | 17 ++++---- tests/components/test_streams_app.py | 30 +++++++++++--- tests/kubernetes/test_pvc_handler.py | 41 +++++++++++++++---- 5 files changed, 69 insertions(+), 29 deletions(-) diff --git a/examples b/examples index 95fe43b15..f7613426d 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit 95fe43b1504836d5d36688f75dfbea6598d8281a +Subproject commit f7613426dffe5a1d6332c9e3cc7f0bfb23396e68 diff --git a/kpops/component_handlers/kubernetes/pvc_handler.py b/kpops/component_handlers/kubernetes/pvc_handler.py index 0c17e9a4c..36f02e726 100644 --- a/kpops/component_handlers/kubernetes/pvc_handler.py +++ b/kpops/component_handlers/kubernetes/pvc_handler.py @@ -22,8 +22,8 @@ async def create(cls, app_name: str, namespace: str) -> PVCHandler: async def list_pvcs(self) -> list[str]: async with ApiClient() as api: core_v1_api = client.CoreV1Api(api) - pvc_list = core_v1_api.list_namespaced_persistent_volume_claim( - self.namespace, label_selector=f"app={self.app_name}" + pvc_list = await core_v1_api.list_namespaced_persistent_volume_claim( + namespace=self.namespace, label_selector=f"app={self.app_name}" ) pvc_names = [pvc.metadata.name for pvc in pvc_list.items] @@ -44,6 +44,6 @@ async def delete_pvcs(self) -> None: f"Deleting in namespace '{self.namespace}' StatefulSet '{self.app_name}' PVCs '{pvc_names}'" ) for pvc_name in pvc_names: - core_v1_api.delete_namespaced_persistent_volume_claim( + await core_v1_api.delete_namespaced_persistent_volume_claim( pvc_name, self.namespace - ) + ) # type: ignore [reportGeneralTypeIssues] diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 2f15be10b..9bd5d87c5 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -5,6 +5,7 @@ from typing_extensions import override from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler +from kpops.components import HelmApp from kpops.components.base_components.kafka_app import ( KafkaApp, KafkaAppCleaner, @@ -30,10 +31,6 @@ class StreamsAppCleaner(KafkaAppCleaner): def helm_chart(self) -> str: return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}" - @cached_property - def pvc_handler(self) -> PVCHandler: - return PVCHandler(self.full_name, self.namespace) - @override async def clean(self, dry_run: bool) -> None: await super().clean(dry_run) @@ -41,14 +38,14 @@ async def clean(self, dry_run: bool) -> None: await self.clean_pvcs(dry_run) async def clean_pvcs(self, dry_run: bool) -> None: + app_full_name = super(HelmApp, self).full_name + pvc_handler = await PVCHandler.create(app_full_name, self.namespace) if dry_run: - pvc_names = await self.pvc_handler.list_pvcs() - log.info( - f"Deleting the PVCs {pvc_names} for StatefulSet '{self.full_name}'" - ) + pvc_names = await pvc_handler.list_pvcs() + log.info(f"Deleting the PVCs {pvc_names} for StatefulSet '{app_full_name}'") else: - log.info(f"Deleting the PVCs for StatefulSet '{self.full_name}'") - await self.pvc_handler.delete_pvcs() + log.info(f"Deleting the PVCs for StatefulSet '{app_full_name}'") + await pvc_handler.delete_pvcs() class StreamsApp(KafkaApp, StreamsBootstrap): diff --git a/tests/components/test_streams_app.py b/tests/components/test_streams_app.py index e272f7f2e..7af0634d7 100644 --- a/tests/components/test_streams_app.py +++ b/tests/components/test_streams_app.py @@ -725,7 +725,15 @@ async def test_stateful_clean_with_dry_run_false( mock_helm_upgrade_install = mocker.patch.object(cleaner.helm, "upgrade_install") mock_helm_uninstall = mocker.patch.object(cleaner.helm, "uninstall") - mock_delete_pvcs = mocker.patch.object(cleaner.pvc_handler, "delete_pvcs") + + module = StreamsAppCleaner.__module__ + mock_pvc_handler_instance = AsyncMock() + mock_delete_pvcs = mock_pvc_handler_instance.delete_pvcs + mock_delete_pvcs.return_value = AsyncMock() + + mocker.patch( + f"{module}.PVCHandler.create", return_value=mock_pvc_handler_instance + ) mock = MagicMock() mock.attach_mock(mock_helm_upgrade_install, "helm_upgrade_install") @@ -787,15 +795,27 @@ async def test_stateful_clean_with_dry_run_true( pvc_names = ["test-pvc1", "test-pvc2", "test-pvc3"] + mock_pvc_handler_instance = AsyncMock() + mock_list_pvcs = mock_pvc_handler_instance.list_pvcs + mock_list_pvcs.return_value = pvc_names + + module = StreamsAppCleaner.__module__ + pvc_handler_create = mocker.patch( + f"{module}.PVCHandler.create", return_value=mock_pvc_handler_instance + ) mocker.patch.object(cleaner, "destroy") mocker.patch.object(cleaner, "deploy") - mock_get_pvc_names = mocker.patch.object(cleaner.pvc_handler, "list_pvcs") - mock_get_pvc_names.return_value = pvc_names + mocker.patch.object(mock_list_pvcs, "list_pvcs") dry_run = True await stateful_streams_app.clean(dry_run=dry_run) - mock_get_pvc_names.assert_called_once() + + pvc_handler_create.assert_called_once_with( + STREAMS_APP_FULL_NAME, "test-namespace" + ) + + mock_list_pvcs.assert_called_once() assert ( - f"Deleting the PVCs {pvc_names} for StatefulSet '{STREAMS_APP_CLEAN_FULL_NAME}'" + f"Deleting the PVCs {pvc_names} for StatefulSet '{STREAMS_APP_FULL_NAME}'" in caplog.text ) diff --git a/tests/kubernetes/test_pvc_handler.py b/tests/kubernetes/test_pvc_handler.py index c3c1b4ec8..24f1a33be 100644 --- a/tests/kubernetes/test_pvc_handler.py +++ b/tests/kubernetes/test_pvc_handler.py @@ -1,6 +1,13 @@ -from unittest.mock import MagicMock +from unittest.mock import AsyncMock import pytest +from kubernetes_asyncio.client import ( + V1ObjectMeta, + V1PersistentVolumeClaim, + V1PersistentVolumeClaimList, + V1PersistentVolumeClaimSpec, + V1PersistentVolumeClaimStatus, +) from pytest_mock import MockerFixture from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler @@ -25,16 +32,30 @@ async def test_create(pvc_handler: PVCHandler, mocker: MockerFixture): @pytest.mark.asyncio async def test_pvc_names(pvc_handler: PVCHandler, mocker: MockerFixture): - mock_pvc = MagicMock() - mock_pvc.metadata.name = "test-pvc" + test_pvc1 = V1PersistentVolumeClaim( + api_version="v1", + kind="PersistentVolumeClaim", + metadata=V1ObjectMeta(name="datadir-test-app-1"), + spec=V1PersistentVolumeClaimSpec(), + status=V1PersistentVolumeClaimStatus(), + ) + test_pvc2 = V1PersistentVolumeClaim( + api_version="v1", + kind="PersistentVolumeClaim", + metadata=V1ObjectMeta(name="datadir-test-app-2"), + spec=V1PersistentVolumeClaimSpec(), + status=V1PersistentVolumeClaimStatus(), + ) + volume_claim_list = V1PersistentVolumeClaimList(items=[test_pvc1, test_pvc2]) + + async_mock = AsyncMock() + async_mock.list_namespaced_persistent_volume_claim.return_value = volume_claim_list + + mocker.patch(f"{MODULE}.client.CoreV1Api", return_value=async_mock) - mock_core_v1_api = mocker.patch(f"{MODULE}.client.CoreV1Api") - mock_core_v1_api.return_value.list_namespaced_persistent_volume_claim.return_value.items = [ - mock_pvc - ] pvcs = await pvc_handler.list_pvcs() - assert pvcs == ["test-pvc"] + assert pvcs == ["datadir-test-app-1", "datadir-test-app-2"] @pytest.mark.asyncio @@ -42,7 +63,9 @@ async def test_delete_pvcs(pvc_handler: PVCHandler, mocker: MockerFixture): mocker.patch.object( pvc_handler, "list_pvcs", return_value=["test-pvc-1", "test-pvc-2"] ) - mock_core_v1_api = mocker.patch(f"{MODULE}.client.CoreV1Api") + mock_core_v1_api = mocker.patch( + f"{MODULE}.client.CoreV1Api", return_value=AsyncMock() + ) await pvc_handler.delete_pvcs() mock_core_v1_api.return_value.assert_has_calls( [ From 8fcf266e87e18acbc10729cb0f0276559ec081c9 Mon Sep 17 00:00:00 2001 From: bakdata-bots Date: Wed, 15 May 2024 09:41:34 +0000 Subject: [PATCH 2/2] =?UTF-8?q?Bump=20version=205.0.0=20=E2=86=92=205.0.1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/docs/user/changelog.md | 11 +++++++++++ kpops/__init__.py | 2 +- pyproject.toml | 2 +- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/docs/docs/user/changelog.md b/docs/docs/user/changelog.md index 1648da8a2..26f84d003 100644 --- a/docs/docs/user/changelog.md +++ b/docs/docs/user/changelog.md @@ -1,4 +1,15 @@ # Changelog +## [5.0.1](https://github.com/bakdata/kpops/releases/tag/5.0.1) - Release Date: [2024-05-15] + +### 🐛 Fixes + +- Fix missing await on Kubernetes API - [#488](https://github.com/bakdata/kpops/pull/488) + + + + + + ## [5.0.0](https://github.com/bakdata/kpops/releases/tag/5.0.0) - Release Date: [2024-05-02] ### 🏗️ Breaking changes diff --git a/kpops/__init__.py b/kpops/__init__.py index 3d3fa15df..34ff74a89 100644 --- a/kpops/__init__.py +++ b/kpops/__init__.py @@ -1,4 +1,4 @@ -__version__ = "5.0.0" +__version__ = "5.0.1" # export public API functions from kpops.cli.main import clean, deploy, destroy, generate, init, manifest, reset diff --git a/pyproject.toml b/pyproject.toml index 971161891..ab131104c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "kpops" -version = "5.0.0" +version = "5.0.1" description = "KPOps is a tool to deploy Kafka pipelines to Kubernetes" authors = ["bakdata "] license = "MIT"