Skip to content

Commit

Permalink
Extend StreamsBootstrap model (#534)
Browse files Browse the repository at this point in the history
Closes #520
  • Loading branch information
raminqaf authored Oct 25, 2024
1 parent 1db2cd4 commit ffe4072
Show file tree
Hide file tree
Showing 15 changed files with 2,491 additions and 55 deletions.
980 changes: 967 additions & 13 deletions docs/docs/schema/defaults.json

Large diffs are not rendered by default.

788 changes: 787 additions & 1 deletion docs/docs/schema/pipeline.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion kpops/component_handlers/kafka_connect/connect_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def validate_connector_config(
"""Validate connector config using the given configuration.
:param connector_config: Configuration parameters for the connector.
:raises KafkaConnectError: Kafka Konnect error
:raises KafkaConnectError: Kafka Connect error
:return: List of all found errors
"""
response = await self._client.put(
Expand Down
111 changes: 111 additions & 0 deletions kpops/components/common/kubernetes_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import enum

from pydantic import Field

from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import DescConfigModel

# Matches plain integer or numbers with valid suffixes: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory
# Shape: one or more digits, optionally followed by exactly one suffix, either
# E, P, T, G, M, k or Ei, Pi, Ti, Gi, Mi, Ki. The pattern is anchored at both
# ends, so fractional quantities such as "1.5Gi" are not matched.
MEMORY_PATTERN = r"^\d+([EPTGMk]|Ei|Pi|Ti|Gi|Mi|Ki)?$"


class ServiceType(enum.Enum):
    """Represents the different Kubernetes service types.
    https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
    """

    # Member values use the spelling Kubernetes expects for a Service `type`;
    # the member names are the Python-side constants.
    CLUSTER_IP = "ClusterIP"
    NODE_PORT = "NodePort"
    LOAD_BALANCER = "LoadBalancer"
    EXTERNAL_NAME = "ExternalName"


class ProtocolSchema(enum.Enum):
    """Represents the different Kubernetes protocols.
    https://kubernetes.io/docs/reference/networking/service-protocols/
    """

    # Network protocols selectable for a port; values are the upper-case
    # protocol names as written in Kubernetes manifests.
    TCP = "TCP"
    UDP = "UDP"
    SCTP = "SCTP"


class ImagePullPolicy(enum.Enum):
    """Represents the different Kubernetes image pull policies.
    https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy
    """

    # Values use the spelling Kubernetes expects for `imagePullPolicy`.
    ALWAYS = "Always"
    IF_NOT_PRESENT = "IfNotPresent"
    NEVER = "Never"


# Operators a toleration can use to compare against a taint; this enum is the
# type of `Toleration.operator`, which defaults to `EQUAL`.
class Operation(enum.Enum):
    EXISTS = "Exists"
    EQUAL = "Equal"


# Taint effects a toleration can apply to; this enum is the type of
# `Toleration.effect`. Values use the Kubernetes spelling of each effect.
class Effects(enum.Enum):
    NO_EXECUTE = "NoExecute"
    NO_SCHEDULE = "NoSchedule"
    PREFER_NO_SCHEDULE = "PreferNoSchedule"


# Restart policy names in Kubernetes spelling. Not referenced by any other
# definition in the visible part of this change — presumably consumed by
# components defined elsewhere; verify against callers.
class RestartPolicy(enum.Enum):
    ALWAYS = "Always"
    ON_FAILURE = "OnFailure"
    NEVER = "Never"


class Toleration(DescConfigModel):
    """Represents the different Kubernetes tolerations.
    https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/
    :param key: The key that the toleration applies to.
    :param operator: The operator ('Exists' or 'Equal').
    :param value: The value to match for the key.
    :param effect: The effect to tolerate.
    :param toleration_seconds: The duration for which the toleration is valid.
    """

    # Every field passes the class docstring to `describe_attr` to obtain its
    # description, so the `:param` entries above must match the field names.

    # Required: no default value (Ellipsis).
    key: str = Field(
        description=describe_attr("key", __doc__),
        default=...,
    )

    operator: Operation = Field(
        description=describe_attr("operator", __doc__),
        default=Operation.EQUAL,
    )

    # Required: no default value (Ellipsis).
    effect: Effects = Field(
        description=describe_attr("effect", __doc__),
        default=...,
    )

    value: str | None = Field(
        description=describe_attr("value", __doc__),
        default=None,
    )

    # NOTE(review): Kubernetes names this field `tolerationSeconds`; this model
    # only derives from DescConfigModel, so confirm that the key is emitted in
    # camelCase when the values are rendered.
    toleration_seconds: int | None = Field(
        description=describe_attr("toleration_seconds", __doc__),
        default=None,
    )


class ResourceDefinition(DescConfigModel):
    """Model representing the 'limits' or 'requests' section of Kubernetes resource specifications.
    :param cpu: The amount of CPU to request or limit for a container, expressed in milli CPUs (e.g., '300m').
    :param memory: The amount of memory to request or limit for a container, as a plain integer or with a unit suffix such as 'Mi' or 'Gi' (e.g., '500Mi', '2G').
    """

    # The same model backs both `Resources.requests` and `Resources.limits`,
    # which is why the descriptions above avoid the word "maximum".

    # NOTE(review): the pattern only describes strings ending in "m", while the
    # annotation also admits `int` — confirm how the validator treats integer
    # input for this field.
    cpu: str | int = Field(pattern=r"^\d+m$", description=describe_attr("cpu", __doc__))
    memory: str = Field(
        pattern=MEMORY_PATTERN, description=describe_attr("memory", __doc__)
    )


class Resources(DescConfigModel):
    """Model representing the resource specifications for a Kubernetes container.
    :param requests: The minimum resource requirements for the container.
    :param limits: The maximum resource limits for the container.
    """

    # Both sections are required (no default) and share the same shape.
    requests: ResourceDefinition = Field(
        description=describe_attr("requests", __doc__),
    )
    limits: ResourceDefinition = Field(
        description=describe_attr("limits", __doc__),
    )
192 changes: 192 additions & 0 deletions kpops/components/streams_bootstrap/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@
from pydantic import AliasChoices, ConfigDict, Field

from kpops.components.base_components.helm_app import HelmAppValues
from kpops.components.common.kubernetes_model import (
ImagePullPolicy,
ProtocolSchema,
ResourceDefinition,
Resources,
ServiceType,
Toleration,
)
from kpops.components.common.topic import KafkaTopic, KafkaTopicStr
from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import (
Expand All @@ -19,23 +27,207 @@
# One leading character from [a-zA-Z0-9_] followed by up to 127 characters from
# [a-zA-Z0-9._-]: 1 to 128 characters in total, never starting with "." or "-".
IMAGE_TAG_PATTERN = r"^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$"


class PortConfig(CamelCaseConfigModel, DescConfigModel):
    """Base class for the port configuration of the Kafka Streams application.
    :param container_port: Number of the port to expose.
    :param name: Services can reference port by name (optional).
    :param schema: Protocol for port. Must be UDP, TCP, or SCTP.
    :param service_port: Number of the port of the service (optional)
    """

    # Required: no default. The description is looked up under the field's own
    # name; the previous lookup key "ports" has no entry in the docstring.
    container_port: int = Field(
        description=describe_attr("container_port", __doc__),
    )
    name: str | None = Field(
        default=None,
        description=describe_attr("name", __doc__),
    )
    # NOTE(review): if the base classes derive from pydantic's BaseModel, the
    # field name `schema` shadows an attribute of the parent class — confirm
    # this is intended before renaming, since the name is part of the values
    # interface.
    schema: ProtocolSchema = Field(
        default=ProtocolSchema.TCP,
        description=describe_attr("schema", __doc__),
    )
    service_port: int | None = Field(
        default=None,
        description=describe_attr("service_port", __doc__),
    )


class ServiceConfig(CamelCaseConfigModel, DescConfigModel):
    """Base model for configuring a service for the Kafka Streams application.
    :param enabled: Whether to create a service.
    :param labels: Additional service labels.
    :param type: Service type.
    """

    # All fields are optional; an instance built without arguments describes a
    # disabled ClusterIP service with no extra labels.
    enabled: bool = Field(
        description=describe_attr("enabled", __doc__),
        default=False,
    )
    labels: dict[str, str] = Field(
        description=describe_attr("labels", __doc__),
        default_factory=dict,
    )
    type: ServiceType = Field(
        description=describe_attr("type", __doc__),
        default=ServiceType.CLUSTER_IP,
    )


class JavaOptions(CamelCaseConfigModel, DescConfigModel):
    """JVM configuration options.
    :param max_RAM_percentage: Sets the maximum amount of memory that the JVM may use for the Java heap before applying ergonomics heuristics as a percentage of the maximum amount determined as described in the -XX:MaxRAM option
    :param others: List of Java VM options passed to the streams app.
    """

    # NOTE(review): the mixed-case field name is presumably chosen so that the
    # camelCase alias matches the chart's key — confirm before renaming.
    max_RAM_percentage: int = Field(
        description=describe_attr("max_RAM_percentage", __doc__),
        default=75,
    )
    others: list[str] = Field(
        description=describe_attr("others", __doc__),
        default_factory=list,
    )


class StreamsBootstrapValues(HelmAppValues):
    """Base value class for all streams bootstrap related components.
    :param image: Docker image of the streams-bootstrap app.
    :param image_tag: Docker image tag of the streams-bootstrap app.
    :param image_pull_policy: Docker image pull policy.
    :param image_pull_secrets: Secrets to be used for private registries.
    :param kafka: Kafka configuration for the streams-bootstrap app.
    :param resources: See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
    :param ports: List of container ports to expose, each with an optional name, protocol and service port.
    :param service: Configuration of the service for the streams-bootstrap app.
    :param configuration_env_prefix: Prefix for environment variables to use that should be parsed as command line arguments.
    :param command_line: Map of command line arguments passed to the streams app.
    :param env: Custom environment variables.
    :param secrets: Custom secret environment variables. Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration.
    :param secret_refs: Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret name and key.
    :param secret_files_refs: Mount existing secrets as volumes
    :param files: Map of files to mount for the app. File will be mounted as $value.mountPath/$key. $value.content denotes file content (recommended to be used with --set-file).
    :param java_options: JVM configuration options.
    :param pod_annotations: Map of custom annotations to attach to the pod spec.
    :param pod_labels: Map of custom labels to attach to the pod spec.
    :param liveness_probe: See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core
    :param readiness_probe: See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core
    :param affinity: Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity.
    :param tolerations: Array containing taint references. When defined, pods can run on nodes, which would otherwise deny scheduling.
    """

    # Each field's description is looked up in the docstring above by name, so
    # every `describe_attr` key must match both the field name and a `:param`
    # entry. `image` and `kafka` are the only fields without a default.

    image: str = Field(
        description=describe_attr("image", __doc__),
    )

    image_tag: str = Field(
        default="latest",
        pattern=IMAGE_TAG_PATTERN,
        description=describe_attr("image_tag", __doc__),
    )

    image_pull_policy: ImagePullPolicy = Field(
        default=ImagePullPolicy.ALWAYS,
        description=describe_attr("image_pull_policy", __doc__),
    )

    # Lookup key corrected to the plural field name; the singular
    # "image_pull_secret" has no entry in the docstring.
    image_pull_secrets: list[dict[str, str]] = Field(
        default_factory=list,
        description=describe_attr("image_pull_secrets", __doc__),
    )

    kafka: KafkaConfig = Field(
        description=describe_attr("kafka", __doc__),
    )

    resources: Resources = Field(
        default=Resources(
            requests=ResourceDefinition(cpu="100m", memory="500Mi"),
            limits=ResourceDefinition(cpu="300m", memory="2G"),
        ),
        description=describe_attr("resources", __doc__),
    )

    ports: list[PortConfig] = Field(
        default_factory=list,
        description=describe_attr("ports", __doc__),
    )

    service: ServiceConfig = Field(
        default_factory=ServiceConfig,
        description=describe_attr("service", __doc__),
    )

    configuration_env_prefix: str = Field(
        default="APP",
        description=describe_attr("configuration_env_prefix", __doc__),
    )

    command_line: dict[str, str | bool | int] = Field(
        default_factory=dict,
        description=describe_attr("command_line", __doc__),
    )

    env: dict[str, str] = Field(
        default_factory=dict,
        description=describe_attr("env", __doc__),
    )

    secrets: dict[str, str] = Field(
        default_factory=dict,
        description=describe_attr("secrets", __doc__),
    )

    secret_refs: dict[str, Any] = Field(
        default_factory=dict,
        description=describe_attr("secret_refs", __doc__),
    )

    secret_files_refs: list[str] = Field(
        default_factory=list,
        description=describe_attr("secret_files_refs", __doc__),
    )

    files: dict[str, str] = Field(
        default_factory=dict,
        description=describe_attr("files", __doc__),
    )

    java_options: JavaOptions = Field(
        default_factory=JavaOptions,
        description=describe_attr("java_options", __doc__),
    )

    pod_annotations: dict[str, str] = Field(
        default_factory=dict,
        description=describe_attr("pod_annotations", __doc__),
    )

    pod_labels: dict[str, str] = Field(
        default_factory=dict,
        description=describe_attr("pod_labels", __doc__),
    )

    liveness_probe: dict[str, Any] = Field(
        default_factory=dict,
        description=describe_attr("liveness_probe", __doc__),
    )

    readiness_probe: dict[str, Any] = Field(
        default_factory=dict,
        description=describe_attr("readiness_probe", __doc__),
    )

    affinity: dict[str, Any] = Field(
        default_factory=dict,
        description=describe_attr("affinity", __doc__),
    )

    tolerations: list[Toleration] = Field(
        default_factory=list,
        description=describe_attr("tolerations", __doc__),
    )


class KafkaConfig(CamelCaseConfigModel, DescConfigModel):
"""Kafka Streams config.
Expand Down
Loading

0 comments on commit ffe4072

Please sign in to comment.