Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for cleaning StatefulSets with PVCs #482

Merged
merged 10 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions docs/docs/schema/defaults.json
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,45 @@
"title": "OutputTopicTypes",
"type": "string"
},
"PersistenceConfig": {
"description": "streams-bootstrap persistence configurations.\n\n:param enabled: Whether to use a persistent volume to store the state of the streams app.\n:param size: The size of the PersistentVolume to allocate to each streams pod in the StatefulSet.\n:param storage_class: Storage class to use for the persistent volume.",
"properties": {
"enabled": {
"default": false,
"description": "Whether to use a persistent volume to store the state of the streams app.\t",
"title": "Enabled",
"type": "boolean"
},
"size": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The size of the PersistentVolume to allocate to each streams pod in the StatefulSet.",
"title": "Size"
},
"storage_class": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Storage class to use for the persistent volume.",
"title": "Storage Class"
}
},
"title": "PersistenceConfig",
"type": "object"
},
"PipelineComponent": {
"additionalProperties": true,
"description": "Base class for all components.",
Expand Down Expand Up @@ -1248,6 +1287,25 @@
"description": "Helm chart name override, assigned automatically",
"title": "Nameoverride"
},
"persistence": {
"allOf": [
{
"$ref": "#/$defs/PersistenceConfig"
}
],
"default": {
"enabled": false,
"size": null,
"storage_class": null
},
"description": ""
},
"statefulSet": {
"default": false,
"description": "Whether to use a Statefulset instead of a Deployment to deploy the streams app.",
"title": "Statefulset",
"type": "boolean"
},
"streams": {
"allOf": [
{
Expand Down
58 changes: 58 additions & 0 deletions docs/docs/schema/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,45 @@
"title": "OutputTopicTypes",
"type": "string"
},
"PersistenceConfig": {
"description": "streams-bootstrap persistence configurations.\n\n:param enabled: Whether to use a persistent volume to store the state of the streams app.\n:param size: The size of the PersistentVolume to allocate to each streams pod in the StatefulSet.\n:param storage_class: Storage class to use for the persistent volume.",
"properties": {
"enabled": {
"default": false,
"description": "Whether to use a persistent volume to store the state of the streams app.\t",
"title": "Enabled",
"type": "boolean"
},
"size": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The size of the PersistentVolume to allocate to each streams pod in the StatefulSet.",
"title": "Size"
},
"storage_class": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Storage class to use for the persistent volume.",
"title": "Storage Class"
}
},
"title": "PersistenceConfig",
"type": "object"
},
"ProducerApp": {
"additionalProperties": true,
"description": "Producer component.\nThis producer holds configuration to use as values for the streams-bootstrap producer Helm chart. Note that the producer does not support error topics.",
Expand Down Expand Up @@ -916,6 +955,25 @@
"description": "Helm chart name override, assigned automatically",
"title": "Nameoverride"
},
"persistence": {
"allOf": [
{
"$ref": "#/$defs/PersistenceConfig"
}
],
"default": {
"enabled": false,
"size": null,
"storage_class": null
},
"description": ""
},
"statefulSet": {
"default": false,
"description": "Whether to use a Statefulset instead of a Deployment to deploy the streams app.",
"title": "Statefulset",
"type": "boolean"
},
"streams": {
"allOf": [
{
Expand Down
49 changes: 49 additions & 0 deletions kpops/component_handlers/kubernetes/pvc_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from __future__ import annotations

import logging

from kubernetes_asyncio import client, config
from kubernetes_asyncio.client import ApiClient

log = logging.getLogger("PVC_handler")


class PVCHandler:
def __init__(self, app_name: str, namespace: str):
self.app_name = app_name
self.namespace = namespace

@classmethod
async def create(cls, app_name: str, namespace: str) -> PVCHandler:
self = cls(app_name, namespace)
await config.load_kube_config()
return self

async def list_pvcs(self) -> list[str]:
async with ApiClient() as api:
core_v1_api = client.CoreV1Api(api)
pvc_list = core_v1_api.list_namespaced_persistent_volume_claim(
self.namespace, label_selector=f"app={self.app_name}"
)

pvc_names = [pvc.metadata.name for pvc in pvc_list.items]
if not pvc_names:
log.warning(
f"No PVCs found for app '{self.app_name}', in namespace '{self.namespace}'"
)
log.debug(
f"In namespace '{self.namespace}' StatefulSet '{self.app_name}' has corresponding PVCs: '{pvc_names}'"
)
return pvc_names

async def delete_pvcs(self) -> None:
async with ApiClient() as api:
core_v1_api = client.CoreV1Api(api)
pvc_names = await self.list_pvcs()
log.debug(
f"Deleting in namespace '{self.namespace}' StatefulSet '{self.app_name}' PVCs '{pvc_names}'"
)
for pvc_name in pvc_names:
core_v1_api.delete_namespaced_persistent_volume_claim(
pvc_name, self.namespace
)
44 changes: 43 additions & 1 deletion kpops/components/streams_bootstrap/streams/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Any

import pydantic
from pydantic import ConfigDict, Field, model_validator
from pydantic import BaseModel, ConfigDict, Field, model_validator

from kpops.components.base_components.kafka_app import (
KafkaAppValues,
Expand Down Expand Up @@ -203,6 +203,40 @@ def validate_mandatory_fields_are_set(
return self


class PersistenceConfig(BaseModel):
"""streams-bootstrap persistence configurations.

:param enabled: Whether to use a persistent volume to store the state of the streams app.
:param size: The size of the PersistentVolume to allocate to each streams pod in the StatefulSet.
:param storage_class: Storage class to use for the persistent volume.
"""

enabled: bool = Field(
default=False,
description="Whether to use a persistent volume to store the state of the streams app. ",
)
size: str | None = Field(
default=None,
description="The size of the PersistentVolume to allocate to each streams pod in the StatefulSet.",
)
storage_class: str | None = Field(
default=None,
description="Storage class to use for the persistent volume.",
)

@model_validator(mode="after")
def validate_mandatory_fields_are_set(
self: PersistenceConfig,
) -> PersistenceConfig: # TODO: typing.Self for Python 3.11+
if self.enabled and self.size is None:
msg = (
"If app.persistence.enabled is set to true, "
"the field app.persistence.size needs to be set."
)
raise ValidationError(msg)
return self


class StreamsAppValues(KafkaAppValues):
"""streams-bootstrap app configurations.

Expand All @@ -220,4 +254,12 @@ class StreamsAppValues(KafkaAppValues):
default=None,
description=describe_attr("autoscaling", __doc__),
)
stateful_set: bool = Field(
default=False,
description="Whether to use a Statefulset instead of a Deployment to deploy the streams app.",
)
persistence: PersistenceConfig = Field(
default=PersistenceConfig(),
description=describe_attr("persistence", __doc__),
)
model_config = ConfigDict(extra="allow")
28 changes: 27 additions & 1 deletion kpops/components/streams_bootstrap/streams/streams_app.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import logging
from functools import cached_property

from pydantic import Field, computed_field
from typing_extensions import override

from kpops.component_handlers.kubernetes.pvc_handler import PVCHandler
from kpops.components.base_components.kafka_app import (
KafkaApp,
KafkaAppCleaner,
)
from kpops.components.base_components.models.topic import KafkaTopic
from kpops.components.streams_bootstrap import StreamsBootstrap
from kpops.components.streams_bootstrap.app_type import AppType
from kpops.components.streams_bootstrap.streams.model import StreamsAppValues
from kpops.components.streams_bootstrap.streams.model import (
StreamsAppValues,
)
from kpops.utils.docstring import describe_attr

log = logging.getLogger("StreamsApp")


class StreamsAppCleaner(KafkaAppCleaner):
from_: None = None
Expand All @@ -24,6 +30,26 @@ class StreamsAppCleaner(KafkaAppCleaner):
def helm_chart(self) -> str:
return f"{self.repo_config.repository_name}/{AppType.CLEANUP_STREAMS_APP.value}"

@cached_property
def pvc_handler(self) -> PVCHandler:
return PVCHandler(self.full_name, self.namespace)

@override
async def clean(self, dry_run: bool) -> None:
await super().clean(dry_run)
if self.app.stateful_set and self.app.persistence.enabled:
await self.clean_pvcs(dry_run)

async def clean_pvcs(self, dry_run: bool) -> None:
if dry_run:
pvc_names = await self.pvc_handler.list_pvcs()
log.info(
f"Deleting the PVCs {pvc_names} for StatefulSet '{self.full_name}'"
)
else:
log.info(f"Deleting the PVCs for StatefulSet '{self.full_name}'")
await self.pvc_handler.delete_pvcs()


class StreamsApp(KafkaApp, StreamsBootstrap):
"""StreamsApp component that configures a streams-bootstrap app.
Expand Down
Loading
Loading